一個例子,給你講透典型的 Go 併發控制
Go 中可以使用一個go關鍵字讓程序異步執行
一個比較常見的場景:逐個異步調用多個函數,或者循環中異步調用
func main() {
go do1()
go do2()
go do3()
}
// 或者
func main() {
for i := range []int{1,2,3}{
go do(i)
}
}
如果瞭解 Go 併發機制,就知道main在其他 goroutine 運行完成之前就已經結束了,所以上面代碼的運行結果是不符合預期的。我們需要使用一種叫做併發控制的手段,來保證程序正確運行
爲了更容易理解,我們虛擬一個🌰
已知有一個現成的函數search,能夠按照關鍵詞執行搜索
期望實現一個新的函數coSearch能夠進行批量查詢
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結果
}
func coSearch(ctx context.Context, words []string) (results []string, err error) {
//tbd
return
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
可以先暫停想想該如何實現coSearch函數
併發控制基礎
sync.WaitGroup是 Go 標準庫中用來控制併發的結構,這裏放一個使用WaitGroup實現coSearch的示例
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
if word == "Go" {
return "", errors.New("error: Go") // 模擬結果
}
return fmt.Sprintf("result: %s", word), nil // 模擬結果
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
err error
)
for i, word := range words {
wg.Add(1)
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
func main() {
words := []string{"Go", "Rust", "PHP", "JavaScript", "Java"}
results, err := coSearch(context.Background(), words)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(results)
}
上面的代碼中有非常多的細節,來逐個聊一聊
🌲 sync.WaitGroup{}併發控制
sync.WaitGroup{}的用法非常簡潔
-
當新運行一個 goroutine 時,我們需要調用
wg.Add(1) -
當一個 goroutine 運行完成的時候,我們需要調用
wg.Done() -
wg.Wait()讓程序阻塞在此處,直到所有的 goroutine 運行完畢。
對於coSearch來說,等待所有 goroutine 運行完成,也就完成了函數的任務,返回最終的結果
var (
wg = sync.WaitGroup{}
//...省略其他代碼
)
for i, word := range words {
wg.Add(1)
go func(word string, i int) {
defer wg.Done()
//...省略其他代碼
}(word, i)
}
wg.Wait()
🌲 for循環中的 goroutine!
這是一個 Go 經典錯誤,如果 goroutine 中使用了for迭代的變量,所有 goroutine 都會獲得最後一次循環的值。例如下面的示例,並不會輸出 "a", "b", "c" 而是輸出 "c", "c", "c"
func main() {
done := make(chan bool)
values := []string{"a", "b", "c"}
for _, v := range values {
go func() {
fmt.Println(v)
done <- true
}()
}
// wait for all goroutines to complete before exiting
for _ = range values {
<-done
}
}
正確的做法就是像上文示例一樣,將迭代的變量賦值給函數參數,或者賦值給新的變量
for i, word := range words {
// ...
go func(word string, i int) {
// fmt.Println(word, i)
}(word, i)
}
for i, word := range words {
i, word := i, word
go func() {
// fmt.Println(word, i)
}()
}
由於這個錯誤實在太常見,從 Go 1.22 開始 Go 已經修正了這個經典的錯誤:Fixing For Loops in Go 1.22。
不過 Go 1.22 默認不會開啓修正,需要設置環境變量
GOEXPERIMENT=loopvar纔會 開啓
🌲 併發安全
簡單理解:當多個 goroutine 對同一個內存區域進行讀寫時,就會產生併發安全的問題,它會導致程序運行的結果不符合預期
上面的示例把最終的結果放入了results = make([]string, len(words))中。雖然我們在 goroutine 中併發的對於results變量進行寫入,但因爲每一個 goroutine 都寫在了獨立的位置,且沒有任何讀取的操作,因此results[i] = result是併發安全的
results = [ xxxxxxxx, xxxxxxxx, xxxxxxxx, .... ]
^ ^ ^
| | |
goroutine1 goroutine2 goroutine3
這也意味着如果使用results = append(results, result)的方式併發賦值,因爲會涉及到 slice 的擴容等操作,所以並不是併發安全的,需要利用sync.Mutex{}進行加鎖
如果想盡可能的提高程序的併發性能,推薦使用 results[i] = result這種方式賦值
🌲 sync.Once{}單次賦值
示例coSearch中,會返回第一個出錯的search的error。err是一個全局變量,在併發 goroutine 中賦值是併發不安全的操作
//...省略其他代碼
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil && err == nil {
err = e
return
}
results[i] = result
}(word, i)
//...省略其他代碼
對於全局變量的賦值比較常規做法就是利用sync.Mutex{}進行加鎖。但示例的邏輯爲單次賦值,我們剛好可以利用同在sync庫的sync.Once{}來簡化代碼
sync.Once{}功能如其名,將我們要執行的邏輯放到它的Do()方法中,無論多少併發都只會執行一次
//...省略其他代碼
go func(word string, i int) {
defer wg.Done()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
//...省略其他代碼
Further more
上面的示例coSearch已經是一個比較完善的函數了,但我們還可以做得更多
🌲 goroutine 數量控制
coSearch入參的數組可能非常大,如果不加以控制可能導致我們的服務器資源耗盡,我們需要控制併發的數量
利用帶緩衝 channel 可以實現
tokens := make(chan struct{}, 10)
for i, word := range words {
tokens <- struct{}{} // 新增
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens // 新增
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
如上,代碼中創建了 10 個緩衝區的 channel,當 channel 被填滿時,繼續寫入會被阻塞;當 goroutine 運行完成之後,除了原有的wg.Done(),我們需要從 channel 讀取走一個數據,來允許新的 goroutine 運行
通過這種方式,我們控制了coSearch最多隻能運行 10 個 goroutine,當超過 10 個時需要等待前面運行的 goroutine 結束
🌲 context.Context
併發執行的 goroutine 只要有一個出錯,其他 goroutine 就可以停止,沒有必要繼續執行下去了。如何把取消的事件傳導到其他 goroutine 呢?context.Context就是用來傳遞類似上下文信息的結構
ctx, cancel := context.WithCancelCause(ctx) // 新增
defer cancel(nil) // 新增
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e) // 新增
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
完整的代碼
最終完成的效果如下
package main
import (
"context"
"errors"
"fmt"
"sync"
)
func search(ctx context.Context, word string) (string, error) {
select {
case <-ctx.Done():
return "", ctx.Err()
default:
if word == "Go" || word == "Java" {
return "", errors.New("Go or Java")
}
return fmt.Sprintf("result: %s", word), nil // 模擬結果
}
}
func coSearch(ctx context.Context, words []string) ([]string, error) {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
var (
wg = sync.WaitGroup{}
once = sync.Once{}
results = make([]string, len(words))
tokens = make(chan struct{}, 2)
err error
)
for i, word := range words {
tokens <- struct{}{}
wg.Add(1)
go func(word string, i int) {
defer func() {
wg.Done()
<-tokens
}()
result, e := search(ctx, word)
if e != nil {
once.Do(func() {
err = e
cancel(e)
})
return
}
results[i] = result
}(word, i)
}
wg.Wait()
return results, err
}
併發控制庫 errgroup
可以看到要實現一個較爲完備的併發控制,需要做的工作非常多。不過 Go 官方團隊爲大家準備了 golang.org/x/sync/errgroup
errgroup提供的能力和上文的示例類似,實現方式也類似,包含併發控制,錯誤傳遞,context.Context傳遞等
package main
import (
"context"
"fmt"
"sync"
"golang.org/x/sync/errgroup"
)
func coSearch(ctx context.Context, words []string) ([]string, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10)
results := make([]string, len(words))
for i, word := range words {
i, word := i, word
g.Go(func() error {
result, err := search(ctx, word)
if err != nil {
return err
}
results[i] = result
return nil
})
}
err := g.Wait()
return results, err
}
errgroup的用法也很簡單
-
使用
g, ctx := errgroup.WithContext(ctx)來創建 goroutine 的管理器 -
g.SetLimit()可以設置允許的最大的 goroutine 數量 -
類似於
go關鍵詞,g.Go異步執行函數 -
g.Wait()和sync.WaitGroup{}的wg.Wait()類似,會阻塞直到所有 goroutine 都運行完成,並返回其中一個 goroutine 的錯誤
利用golang.org/x/sync/errgroup大幅簡化了進行併發控制的邏輯,真是一個併發控制的利器啊!
總結
本篇從基礎的sync.WaitGroup{}庫出發,涉及到了併發安全、sync.Once等內容。最後介紹了併發控制的利器:golang.org/x/sync/errgroup。
雖然使用 Go 語言能夠非常簡單的編寫併發程序,但其中要注意的細節非常多,忽略這些細節不僅沒有提升程序運行的效率,還會產生錯誤的結果
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/qxs6ikRy_Km5JlJesYmH4w