conc 使用指南
conc 使用指南
conc 是由 sourcegraph 開源的一套友好的結構化併發工具包,其中總結了 sourcegraph 內部在編寫併發代碼時反覆遇到的問題的解決方案。
conc.WaitGroup
conc 庫中的WaitGroup
是作用域併發的主要構建塊。調用其Go
方法能夠生成 goroutine,調用其Wait
方法可以確保在生成的 goroutine 都退出後再繼續執行。這一些特性和標準庫中的sync.WaitGroup
一樣。區別的地方在於conc.WaitGroup
中子 goroutine 中的 panic 會被傳遞給Wait
方法的調用方,這也正是我們需要conc.WaitGroup
的原因,要不然我們就需要自己去 recover 子 goroutine 中的 panic。
conc.WaitGroup
的使用非常簡單,可以參考以下代碼示例。
// waitGroupDemo conc.WaitGroup併發示例
func waitGroupDemo() {
var count atomic.Int64
var wg conc.WaitGroup
// 開啓10個goroutine併發執行 count.Add(1)
for i := 0; i < 10; i++ {
wg.Go(func() {
count.Add(1)
})
}
// 等待10個goroutine都執行完
wg.Wait()
fmt.Println(count.Load())
}
如果想要自動 recover 子 goroutine 可能傳遞出的 panic,可以使用其WaitAndRecover
方法。示例代碼如下。
// waitGroupDemo 自動recover示例
func waitGroupDemo2() {
var count atomic.Int64
var wg conc.WaitGroup
// 開啓10個goroutine併發執行 count.Add(1)
for i := 0; i < 10; i++ {
wg.Go(func() {
if i == 7 {
panic("bad thing")
}
count.Add(1)
})
}
// 等待10個goroutine都執行完
wg.WaitAndRecover()
fmt.Println(count.Load())
}
擴展:爲什麼要使用作用域併發?
有一種觀點認爲:Go 語言通過
go
關鍵字隨時隨地創建 goroutine 發起併發就像在代碼中使用goto
一樣,讓程序顯得混亂和令人困惑。在併發代碼中無論是出現 panic 還是出現其他錯誤,爲了避免引發更大問題,通常的解決方式是將異常或者堆棧信息傳遞給調用方,這就需要有堆棧信息和明確的 "調用者" 爲前提,所以我們應該使用結構化的併發編程——把 goroutine 放到託兒所裏,而不是讓他們像野孩子一樣到處亂跑。
goroutine 池
Pool
是一個用於併發執行任務的 goroutines 池。conc 包中針對不同的業務場景定義了以下幾種 goroutine 池。
pool.Pool
使用New()
創建一個池對象,然後通過調用Go()
提交要執行的任務。提交完所有任務後,必須調用Wait()
來清理任何派生的 goroutines 並傳播可能出現的 panic。
Pool
中的 goroutine 是延遲啓動的(用到的時候再啓動),所以創建一個新的Pool
是廉價的。產生的 goroutine 永遠不會比提交的任務多。
創建Pool
是高效的,但也不是零成本。它不適用於耗時非常短的任務。啓動和拆卸的開銷約爲 1µs,每個任務的開銷約 300ns。
對於創建得到的池對象,可以使用With
系列函數進行配置。其中最常用的便是通過WithMaxGoroutines()
指定池中最大 goroutine 數量。
例如下面的示例中使用WithMaxGoroutines(3)
配置最大 goroutine 數量爲 3。
// poolDemo goroutine池示例
func poolDemo() {
// 創建一個最大數量爲3的goroutine池
p := pool.New().WithMaxGoroutines(3)
// 使用p.Go()提交5個任務
for i := 0; i < 5; i++ {
p.Go(func() {
fmt.Println("Q1mi")
})
}
p.Wait()
}
注意 :對Pool
使用Go()
提交任務後不允許再調用With
系列方法進行配置。
pool.ContextPool
使用WithContext
可以創建一個傳遞 Context 的Pool
,通過這個父 Context 來控制池中的 goroutine。默認情況下,在取消父 Context 之前,Pool
中的 Context 不會取消。
想要在任何任務返回錯誤或出現 panic 時立即取消其 Context,可以通過WithCancelOnError
進行配置。
// poolWithContextDemoCancelOnError 支持context的池
// goroutine中出錯時取消context
func poolWithContextDemoCancelOnError() {
p := pool.New().
WithMaxGoroutines(4).
WithContext(context.Background()).
WithCancelOnError() // 出錯時取消所有goroutine
// 提交3個任務
for i := 0; i < 3; i++ {
i := i
p.Go(func(ctx context.Context) error {
if i == 2 {
return errors.New("cancel all other tasks")
}
<-ctx.Done()
return nil
})
}
err := p.Wait()
fmt.Println(err)
}
pool.WithErrors
當提交的任務有可能返回錯誤時,可以使用WithErrors
得到一個ErrorPool
,並通過Wait()
獲取可能返回的錯誤。
func poolWithError() {
p := pool.New().WithErrors()
for i := 0; i < 3; i++ {
i := i
p.Go(func() error {
if i == 2 {
return errors.New("oh no!")
}
return nil
})
}
err := p.Wait()
fmt.Println(err)
}
pool.ResultPool
ResultPool
是一個執行返回泛型結果的任務池。使用Go()
在池中執行任務,然後由Wait()
返回任務的結果。
// poolWithResult 執行返回結果的任務池
func poolWithResult() {
// 創建一個任務池,其中任務返回的結果爲int
p := pool.NewWithResults[int]()
for i := 0; i < 10; i++ {
i := i
p.Go(func() int {
return i * 2
})
}
res := p.Wait()
// 結果的順序是不確定的, 所以這裏先排序再打印
sort.Ints(res)
fmt.Println(res)
}
pool.ResultContextPool
ResultContextPool
中執行的任務接受一個 context 參數並返回結果。
pool.ResultErrorPool
ResultErrorPool
中執行的任務會返回一個泛型結果和錯誤。
Stream
在Pool
中併發執行任務後返回結果是無序的,conc 中有一個 stream 包提供任務併發、結果有序的實現,適用於在維護結果順序的同時併發執行任務流。
使用Stream
時提交的每個任務都返回一個回調函數。每個任務都將在任務池中同時執行,但是回調函數將按照任務提交的順序依次執行。
等到所有任務都提交後,必須調用 Wait()
來等待正在運行的 goroutine 運行完。當任務執行過程中或回調函數執行期間出現 panic 時,所有其他任務和回調仍將執行。當調用 Wait()
時,panic 將傳給調用方。
同Pool
一樣,Stream
也不適用於非常短的任務。啓動和拆卸會增加幾微秒的開銷,每個任務的開銷大約是 500ns。對於任何需要網絡通話的任務來說,這性能都足夠好了。
// streamDemo 併發的流式任務示例
func streamDemo() {
times := []int{20, 52, 16, 45, 4, 80}
s := stream.New()
for _, millis := range times {
dur := time.Duration(millis) * time.Millisecond
// 提交任務
s.Go(func() stream.Callback {
time.Sleep(dur)
// 雖然上一行通過sleep增加了時間
// 但最終結果仍按任務提交(s.Go)的順序打印
return func() { fmt.Println(dur) }
})
}
s.Wait()
}
輸出結果:
20ms
52ms
16ms
45ms
4ms
80ms
iter
conc 包還提供了一個 iter。在iter
包中提供了基於泛型實現的iterator
和mapper
,封裝了以下四種日常開發中常用的迭代方法。
func ForEach[T any](input []T, f func(*T))
func ForEachIdx[T any](input []T, f func(int, *T))
func Map[T, R any](input []T, f func(*T) R) []R
func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error)
初始狀態下可直接調用iter
包的上述函數。
// iterDemo 迭代器ForEach示例
func iterDemo() {
input := []int{1, 2, 3, 4}
// 可直接調用iter包的ForEach函數
iter.ForEach(input, func(v *int) {
if *v%2 != 0 {
*v = -1
}
})
fmt.Println(input)
}
如果想要更定製化的功能,可以自行構造iterator
或mapper
。
iterator 示例
iterator
適用於對序列中的每個元素執行統一操作的場景。
// iteratorDemo 迭代器示例
func iteratorDemo() {
input := []int{1, 2, 3, 4}
// 創建一個最大goroutine個數爲輸入元素一半的迭代器
iterator := iter.Iterator[int]{
MaxGoroutines: len(input) / 2,
}
iterator.ForEach(input, func(v *int) {
if *v%2 != 0 {
*v = -1
}
})
fmt.Println(input)
}
輸出:
[-1 2 -1 4]
mapper 示例
mapper
是一個帶有結果類型的迭代器,適用於遍歷序列中的每個元素執行統一操作後拿到返回結果的場景。
// mapperDemo mapper示例
func mapperDemo() {
input := []int{1, 2, 3, 4}
// 創建一個最大goroutine個數爲輸入元素一半的映射器
mapper := iter.Mapper[int, bool]{
MaxGoroutines: len(input) / 2,
}
results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
fmt.Println(results)
}
輸出:
[false true false true]
panics
conc 下提供了一個處理 panic 相關的 panics 包。其中panics.Catcher
是用來捕獲任務運行時可能出現的 panic。具體使用方法是通過 Try()
執行任務函數,該函數將捕獲任何產生的 panic。Try()
可以在任意 goroutine 中被調用任意次數。一旦所有調用完成後,使用 Recovered()
獲取第一個 panic(如果有的話) 的值,或者使用 Repanic()
傳遞 panic(重新 panic)。
// panicDemo recover可能出現的異常
func panicDemo() {
var pc panics.Catcher
i := 0
pc.Try(func() { i += 1 })
pc.Try(func() { panic("abort!") })
pc.Try(func() { i += 1 })
// recover可能出現的panic
rc := pc.Recovered()
// 重新panic
// pc.Repanic()
fmt.Println(i)
fmt.Println(rc.Value.(string))
}
總結
對於不太熟悉併發編程的初學者來說 conc 提供了一套簡單易用的併發工具,同時其代碼實現也比較簡潔,所以很適合用來學習源碼。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Kil56G3YtE2qQf0aKSZd0A