聊聊併發庫 conc
上個月 sourcegraph 放出了 conc[1] 併發庫,目標是 better structured concurrency for go, 簡單的評價一下
每個公司都有類似的輪子,與以往的庫比起來,多了泛型,代碼寫起來更優雅,不需要 interface, 不需要運行時 assert, 性能肯定更好
我們在寫通用庫和框架的時候,都有一個原則,併發控制與業務邏輯分離,背離這個原則肯定做不出通用庫
整體介紹
1. WaitGroup 與 Panic
標準庫自帶 sync.WaitGroup
用於等待 goroutine 運行結束,缺點是我們要處理控制部分
代碼裏大量的 wg.Add
與 wg.Done
函數,所以一般封裝成右側的庫
type WaitGroup struct {
wg sync.WaitGroup
pc panics.Catcher
}
// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()) {
h.wg.Add(1)
go func() {
defer h.wg.Done()
h.pc.Try(f)
}()
}
但是如何處理 panic 呢?簡單的可以在閉包 doSomething
運行時增加一個 safeGo 函數,用於捕捉 recover
原生 Go
要生成大量無用代碼,我司 repo 運動式的清理過一波,也遇到過 goroutine 忘寫 recover 導致的事故。conc
同時提供 catcher
封裝 recover 邏輯,conc.WaitGroup
可以選擇 Wait
重新拋出 panic, 也可以 WaitAndRecover
返回捕獲到的 panic 堆棧信息
func (h *WaitGroup) Wait() {
h.wg.Wait()
// Propagate a panic if we caught one from a child goroutine.
h.pc.Repanic()
}
func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
h.wg.Wait()
// Return a recovered panic if we caught one from a child goroutine.
return h.pc.Recovered()
}
2. ForEach 與 Map
高級語言很多的基操,在 go 裏面很奢侈,只能寫很多繁瑣代碼。conc
封裝了泛型版本的 iterator 和 mapper
func process(values []int) {
iter.ForEach(values, handle)
}
func concMap(input []int, f func(int) int) []int {
return iter.Map(input, f)
}
上面是使用例子,用戶只需要寫業務函數 handle. 相比 go1.19 前的版本,泛型的引入,使得基礎庫的編寫更遊刃有餘
// Iterator is also safe for reuse and concurrent use.
type Iterator[T any] struct {
// MaxGoroutines controls the maximum number of goroutines
// to use on this Iterator's methods.
//
// If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
MaxGoroutines int
}
MaxGoroutines 默認 GOMAXPROCS 併發處理傳參 slice, 也可以自定義,個人認爲不合理,默認爲 1 最妥
// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }
ForEachIdx
在創建 Iterator[T]{}
可以自定義併發度,最終調用 iter.ForEachIdx
// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
......
var idx atomic.Int64
// Create the task outside the loop to avoid extra closure allocations.
task := func() {
i := int(idx.Add(1) - 1)
for ; i < numInput; i = int(idx.Add(1) - 1) {
f(i, &input[i])
}
}
var wg conc.WaitGroup
for i := 0; i < iter.MaxGoroutines; i++ {
wg.Go(task)
}
wg.Wait()
}
ForEachIdx
泛型函數寫得非常好,略去部分代碼。樸素的實現在 for 循環裏創建閉包,傳入 idx 參數,然後 wg.Go 去運行。但是這樣會產生大量閉包,我司遇到過大量閉包,造成 heap 內存增長很快頻繁觸發 GC 的性能問題,所以在外層只創建一個閉包,通過 atomic 控制 idx
func Map[T, R any](input []T, f func(*T) R) []R {
return Mapper[T, R]{}.Map(input, f)
}
func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error) {
return Mapper[T, R]{}.MapErr(input, f)
}
Map
與 MapErr
也只是對 ForEachIdx
的封裝,區別是處理 error
3. 各種 Pool 與 Stream
Pool
用於併發處理,同時 Wait
等待任務結束。相比我司現有 concurrency 庫
-
增加了泛型實現
-
增加了對 goroutine 的複用
-
增加併發度設置 (我司有,但 conc 實現方式更巧秒)
-
支持的函數簽名更多
先看一下支持的接口
Go(f func())
Go(f func() error)
Go(f func(ctx context.Context) error)
Go(f func(context.Context) (T, error))
Go(f func() (T, error))
Go(f func() T)
Go(f func(context.Context) (T, error))
理論上這一個足夠用了,傳參 Context
, 返回泛型類型與錯誤。
Wait() ([]T, error)
這是對應的 Wait
回收函數,返回泛型結果 []T
與錯誤。具體 Pool
實現由多種組合而來:Pool
, ErrorPool
, ContextPool
, ResultContextPool
, ResultPool
func (p *Pool) Go(f func()) {
p.init()
if p.limiter == nil {
// No limit on the number of goroutines.
select {
case p.tasks <- f:
// A goroutine was available to handle the task.
default:
// No goroutine was available to handle the task.
// Spawn a new one and send it the task.
p.handle.Go(p.worker)
p.tasks <- f
}
}
......
}
func (p *Pool) worker() {
// The only time this matters is if the task panics.
// This makes it possible to spin up new workers in that case.
defer p.limiter.release()
for f := range p.tasks {
f()
}
}
複用方式很巧妙,如果處理速度足夠快,沒必要過多創建 goroutine
Stream
用於併發處理 goroutine, 但是返回結果保持順序
type Stream struct {
pool pool.Pool
callbackerHandle conc.WaitGroup
queue chan callbackCh
initOnce sync.Once
}
實現很簡單,queue 是一個 channel, 類型 callbackCh
同樣也是 channel, 在真正派生 goroutine 前按序順生成 callbackCh 傳遞結果
Stream
命名很差,容易讓人混淆,感覺叫 OrderedResultsPool
更理想,整體非常雞肋
超時
超時永遠是最難處理的問題,目前 conc 庫 Wait
函數並沒有提供 timeout 傳參,這就要求閉包內部必須考濾超時,如果添加 timeout 傳參,又涉及 conc 內部庫併發問題題
Wait() ([]T, error)
比如這個返回值,內部 append 到 slice 時是有鎖的,如果 Wait
提前結束了會發生什麼?
[]T
拿到的部分結果只能丟棄,返回給上層 timeout error
Context 框架傳遞參數
通用庫很容易做的臃腫,我司併發庫會給閉包產生新的 context, 並繼承所需框架層的 metadata, 兩種實現無可厚非,這些細節總得要處理
參考資料
[1]
conc: https://github.com/sourcegraph/conc,
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/tU7ufjwOjlwBGhnbTNq4PA