聊聊併發庫 conc

上個月 sourcegraph 放出了 conc[1] 併發庫,目標是 better structured concurrency for go, 簡單的評價一下

每個公司都有類似的輪子,與以往的庫比起來,多了泛型,代碼寫起來更優雅,不需要 interface, 不需要運行時 assert, 性能肯定更好

我們在寫通用庫和框架的時候,都有一個原則,併發控制與業務邏輯分離,背離這個原則肯定做不出通用庫

整體介紹

1. WaitGroup 與 Panic

標準庫自帶 sync.WaitGroup 用於等待 goroutine 運行結束,缺點是我們要處理控制部分

代碼裏大量的 wg.Addwg.Done 函數,所以一般封裝成右側的庫

type WaitGroup struct {
 wg sync.WaitGroup
 pc panics.Catcher
}

// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()) {
 h.wg.Add(1)
 go func() {
  defer h.wg.Done()
  h.pc.Try(f)
 }()
}

但是如何處理 panic 呢?簡單的可以在閉包 doSomething 運行時增加一個 safeGo 函數,用於捕捉 recover

原生 Go 要生成大量無用代碼,我司 repo 運動式的清理過一波,也遇到過 goroutine 忘寫 recover 導致的事故。conc 同時提供 catcher 封裝 recover 邏輯,conc.WaitGroup 可以選擇 Wait 重新拋出 panic, 也可以 WaitAndRecover 返回捕獲到的 panic 堆棧信息

func (h *WaitGroup) Wait() {
 h.wg.Wait()

 // Propagate a panic if we caught one from a child goroutine.
 h.pc.Repanic()
}

func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
 h.wg.Wait()

 // Return a recovered panic if we caught one from a child goroutine.
 return h.pc.Recovered()
}

2. ForEach 與 Map

高級語言很多的基操,在 go 裏面很奢侈,只能寫很多繁瑣代碼。conc封裝了泛型版本的 iterator 和 mapper

func process(values []int) {
    iter.ForEach(values, handle)
}

func concMap(input []int, f func(int) int) []int {
    return iter.Map(input, f)
}

上面是使用例子,用戶只需要寫業務函數 handle. 相比 go1.19 前的版本,泛型的引入,使得基礎庫的編寫更遊刃有餘

// Iterator is also safe for reuse and concurrent use.
type Iterator[T any] struct {
 // MaxGoroutines controls the maximum number of goroutines
 // to use on this Iterator's methods.
 //
 // If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
 MaxGoroutines int
}

MaxGoroutines 默認 GOMAXPROCS 併發處理傳參 slice, 也可以自定義,個人認爲不合理,默認爲 1 最妥

// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }

ForEachIdx 在創建 Iterator[T]{} 可以自定義併發度,最終調用 iter.ForEachIdx

// ForEachIdx is the same as ForEach except it also provides the
// index of the element to the callback.
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
  ......
 var idx atomic.Int64
 // Create the task outside the loop to avoid extra closure allocations.
 task := func() {
  i := int(idx.Add(1) - 1)
  for ; i < numInput; i = int(idx.Add(1) - 1) {
   f(i, &input[i])
  }
 }

 var wg conc.WaitGroup
 for i := 0; i < iter.MaxGoroutines; i++ {
  wg.Go(task)
 }
 wg.Wait()
}

ForEachIdx 泛型函數寫得非常好,略去部分代碼。樸素的實現在 for 循環裏創建閉包,傳入 idx 參數,然後 wg.Go 去運行。但是這樣會產生大量閉包,我司遇到過大量閉包,造成 heap 內存增長很快頻繁觸發 GC 的性能問題,所以在外層只創建一個閉包,通過 atomic 控制 idx

func Map[T, R any](input []T, f func(*T) R) []{
 return Mapper[T, R]{}.Map(input, f)
}

func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error) {
 return Mapper[T, R]{}.MapErr(input, f)
}

MapMapErr 也只是對 ForEachIdx 的封裝,區別是處理 error

3. 各種 Pool 與 Stream

Pool 用於併發處理,同時 Wait 等待任務結束。相比我司現有 concurrency 庫

先看一下支持的接口

Go(f func())
Go(f func() error) 
Go(f func(ctx context.Context) error)
Go(f func(context.Context) (T, error))
Go(f func() (T, error)) 
Go(f func() T)
Go(f func(context.Context) (T, error))

理論上這一個足夠用了,傳參 Context, 返回泛型類型與錯誤。

Wait() ([]T, error)

這是對應的 Wait 回收函數,返回泛型結果 []T 與錯誤。具體 Pool 實現由多種組合而來:Pool, ErrorPool, ContextPool, ResultContextPool, ResultPool

func (p *Pool) Go(f func()) {
 p.init()

 if p.limiter == nil {
  // No limit on the number of goroutines.
  select {
  case p.tasks <- f:
   // A goroutine was available to handle the task.
  default:
   // No goroutine was available to handle the task.
   // Spawn a new one and send it the task.
   p.handle.Go(p.worker)
   p.tasks <- f
  }
 }
  ......
}

func (p *Pool) worker() {
 // The only time this matters is if the task panics.
 // This makes it possible to spin up new workers in that case.
 defer p.limiter.release()

 for f := range p.tasks {
  f()
 }
}

複用方式很巧妙,如果處理速度足夠快,沒必要過多創建 goroutine

Stream 用於併發處理 goroutine, 但是返回結果保持順序

type Stream struct {
 pool             pool.Pool
 callbackerHandle conc.WaitGroup
 queue            chan callbackCh

 initOnce sync.Once
}

實現很簡單,queue 是一個 channel, 類型 callbackCh 同樣也是 channel, 在真正派生 goroutine 前按序順生成 callbackCh 傳遞結果

Stream 命名很差,容易讓人混淆,感覺叫 OrderedResultsPool 更理想,整體非常雞肋

超時

超時永遠是最難處理的問題,目前 conc 庫 Wait 函數並沒有提供 timeout 傳參,這就要求閉包內部必須考濾超時,如果添加 timeout 傳參,又涉及 conc 內部庫併發問題題

Wait() ([]T, error)

比如這個返回值,內部 append 到 slice 時是有鎖的,如果 Wait 提前結束了會發生什麼?

[]T 拿到的部分結果只能丟棄,返回給上層 timeout error

Context 框架傳遞參數

通用庫很容易做的臃腫,我司併發庫會給閉包產生新的 context, 並繼承所需框架層的 metadata, 兩種實現無可厚非,這些細節總得要處理

參考資料

[1]

conc: https://github.com/sourcegraph/conc,

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/tU7ufjwOjlwBGhnbTNq4PA