conc 使用指南

conc 使用指南

conc 是由 sourcegraph 開源的一套友好的結構化併發工具包,其中總結了 sourcegraph 內部在編寫併發代碼時反覆遇到的問題的解決方案。

conc.WaitGroup

conc 庫中的WaitGroup是作用域併發的主要構建塊。調用其Go方法能夠生成 goroutine,調用其Wait方法可以確保在生成的 goroutine 都退出後再繼續執行。這一些特性和標準庫中的sync.WaitGroup一樣。區別的地方在於conc.WaitGroup中子 goroutine 中的 panic 會被傳遞給Wait方法的調用方,這也正是我們需要conc.WaitGroup的原因,要不然我們就需要自己去 recover 子 goroutine 中的 panic。

conc.WaitGroup的使用非常簡單,可以參考以下代碼示例。

// waitGroupDemo conc.WaitGroup併發示例
func waitGroupDemo() {
 var count atomic.Int64

 var wg conc.WaitGroup
 // 開啓10個goroutine併發執行 count.Add(1)
 for i := 0; i < 10; i++ {
  wg.Go(func() {
   count.Add(1)
  })
 }
 // 等待10個goroutine都執行完
 wg.Wait()

 fmt.Println(count.Load())
}

如果想要自動 recover 子 goroutine 可能傳遞出的 panic,可以使用其WaitAndRecover方法。示例代碼如下。

// waitGroupDemo 自動recover示例
func waitGroupDemo2() {
 var count atomic.Int64

 var wg conc.WaitGroup
 // 開啓10個goroutine併發執行 count.Add(1)
 for i := 0; i < 10; i++ {
  wg.Go(func() {
   if i == 7 {
    panic("bad thing")
   }
   count.Add(1)
  })
 }
 // 等待10個goroutine都執行完
 wg.WaitAndRecover()

 fmt.Println(count.Load())
}

擴展:爲什麼要使用作用域併發?

有一種觀點認爲:Go 語言通過go關鍵字隨時隨地創建 goroutine 發起併發就像在代碼中使用goto一樣,讓程序顯得混亂和令人困惑。在併發代碼中無論是出現 panic 還是出現其他錯誤,爲了避免引發更大問題,通常的解決方式是將異常或者堆棧信息傳遞給調用方,這就需要有堆棧信息和明確的 "調用者" 爲前提,所以我們應該使用結構化的併發編程——把 goroutine 放到託兒所裏,而不是讓他們像野孩子一樣到處亂跑。

goroutine 池

Pool 是一個用於併發執行任務的 goroutines 池。conc 包中針對不同的業務場景定義了以下幾種 goroutine 池。

pool.Pool

使用New()創建一個池對象,然後通過調用Go()提交要執行的任務。提交完所有任務後,必須調用Wait()來清理任何派生的 goroutines 並傳播可能出現的 panic。

Pool中的 goroutine 是延遲啓動的(用到的時候再啓動),所以創建一個新的Pool是廉價的。產生的 goroutine 永遠不會比提交的任務多。

創建Pool是高效的,但也不是零成本。它不適用於耗時非常短的任務。啓動和拆卸的開銷約爲 1µs,每個任務的開銷約 300ns。

對於創建得到的池對象,可以使用With系列函數進行配置。其中最常用的便是通過WithMaxGoroutines()指定池中最大 goroutine 數量。

例如下面的示例中使用WithMaxGoroutines(3)配置最大 goroutine 數量爲 3。

// poolDemo goroutine池示例
func poolDemo() {
 // 創建一個最大數量爲3的goroutine池
 p := pool.New().WithMaxGoroutines(3)
 // 使用p.Go()提交5個任務
 for i := 0; i < 5; i++ {
  p.Go(func() {
   fmt.Println("Q1mi")
  })
 }
 p.Wait()
}

注意 :對Pool使用Go()提交任務後不允許再調用With系列方法進行配置。

pool.ContextPool

使用WithContext可以創建一個傳遞 Context 的Pool,通過這個父 Context 來控制池中的 goroutine。默認情況下,在取消父 Context 之前,Pool中的 Context 不會取消。

想要在任何任務返回錯誤或出現 panic 時立即取消其 Context,可以通過WithCancelOnError進行配置。

// poolWithContextDemoCancelOnError 支持context的池
// goroutine中出錯時取消context
func poolWithContextDemoCancelOnError() {
 p := pool.New().
  WithMaxGoroutines(4).
  WithContext(context.Background()).
  WithCancelOnError() // 出錯時取消所有goroutine
 // 提交3個任務
 for i := 0; i < 3; i++ {
  i := i
  p.Go(func(ctx context.Context) error {
   if i == 2 {
    return errors.New("cancel all other tasks")
   }
   <-ctx.Done()
   return nil
  })
 }
 err := p.Wait()
 fmt.Println(err)
}

pool.WithErrors

當提交的任務有可能返回錯誤時,可以使用WithErrors得到一個ErrorPool,並通過Wait()獲取可能返回的錯誤。

func poolWithError() {
 p := pool.New().WithErrors()
 for i := 0; i < 3; i++ {
  i := i
  p.Go(func() error {
   if i == 2 {
    return errors.New("oh no!")
   }
   return nil
  })
 }
 err := p.Wait()
 fmt.Println(err)
}

pool.ResultPool

ResultPool是一個執行返回泛型結果的任務池。使用Go()在池中執行任務,然後由Wait()返回任務的結果。

// poolWithResult 執行返回結果的任務池
func poolWithResult() {
 // 創建一個任務池,其中任務返回的結果爲int
 p := pool.NewWithResults[int]()
 for i := 0; i < 10; i++ {
  i := i
  p.Go(func() int {
   return i * 2
  })
 }
 res := p.Wait()
 // 結果的順序是不確定的, 所以這裏先排序再打印
 sort.Ints(res)
 fmt.Println(res)
}

pool.ResultContextPool

ResultContextPool中執行的任務接受一個 context 參數並返回結果。

pool.ResultErrorPool

ResultErrorPool中執行的任務會返回一個泛型結果和錯誤。

Stream

Pool中併發執行任務後返回結果是無序的,conc 中有一個 stream 包提供任務併發、結果有序的實現,適用於在維護結果順序的同時併發執行任務流。

使用Stream時提交的每個任務都返回一個回調函數。每個任務都將在任務池中同時執行,但是回調函數將按照任務提交的順序依次執行。

等到所有任務都提交後,必須調用 Wait()來等待正在運行的 goroutine 運行完。當任務執行過程中或回調函數執行期間出現 panic 時,所有其他任務和回調仍將執行。當調用 Wait()時,panic 將傳給調用方。

Pool一樣,Stream也不適用於非常短的任務。啓動和拆卸會增加幾微秒的開銷,每個任務的開銷大約是 500ns。對於任何需要網絡通話的任務來說,這性能都足夠好了。

// streamDemo 併發的流式任務示例
func streamDemo() {
 times := []int{20, 52, 16, 45, 4, 80}

 s := stream.New()
 for _, millis := range times {
  dur := time.Duration(millis) * time.Millisecond
  // 提交任務
  s.Go(func() stream.Callback {
   time.Sleep(dur)
   // 雖然上一行通過sleep增加了時間
   // 但最終結果仍按任務提交(s.Go)的順序打印
   return func() { fmt.Println(dur) }
  })
 }
 s.Wait()
}

輸出結果:

20ms
52ms
16ms
45ms
4ms
80ms

iter

conc 包還提供了一個 iter。在iter包中提供了基於泛型實現的iteratormapper,封裝了以下四種日常開發中常用的迭代方法。

func ForEach[T any](input []T, f func(*T))
func ForEachIdx[T any](input []T, f func(int, *T))
func Map[T, R any](input []T, f func(*T) R) []R
func MapErr[T, R any](input []T, f func(*T) (R, error)) ([]R, error)

初始狀態下可直接調用iter包的上述函數。

// iterDemo 迭代器ForEach示例
func iterDemo() {
 input := []int{1, 2, 3, 4}
 // 可直接調用iter包的ForEach函數
 iter.ForEach(input, func(v *int) {
  if *v%2 != 0 {
   *v = -1
  }
 })
 fmt.Println(input)
}

如果想要更定製化的功能,可以自行構造iteratormapper

iterator 示例

iterator適用於對序列中的每個元素執行統一操作的場景。

// iteratorDemo 迭代器示例
func iteratorDemo() {
 input := []int{1, 2, 3, 4}
 // 創建一個最大goroutine個數爲輸入元素一半的迭代器
 iterator := iter.Iterator[int]{
  MaxGoroutines: len(input) / 2,
 }

 iterator.ForEach(input, func(v *int) {
  if *v%2 != 0 {
   *v = -1
  }
 })

 fmt.Println(input)
}

輸出:

[-1 2 -1 4]

mapper 示例

mapper是一個帶有結果類型的迭代器,適用於遍歷序列中的每個元素執行統一操作後拿到返回結果的場景。

// mapperDemo mapper示例
func mapperDemo() {
 input := []int{1, 2, 3, 4}
 // 創建一個最大goroutine個數爲輸入元素一半的映射器
 mapper := iter.Mapper[int, bool]{
  MaxGoroutines: len(input) / 2,
 }

 results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
 fmt.Println(results)
}

輸出:

[false true false true]

panics

conc 下提供了一個處理 panic 相關的 panics 包。其中panics.Catcher是用來捕獲任務運行時可能出現的 panic。具體使用方法是通過 Try() 執行任務函數,該函數將捕獲任何產生的 panic。Try()可以在任意 goroutine 中被調用任意次數。一旦所有調用完成後,使用 Recovered()獲取第一個 panic(如果有的話) 的值,或者使用 Repanic()傳遞 panic(重新 panic)。

// panicDemo recover可能出現的異常
func panicDemo() {
 var pc panics.Catcher
 i := 0
 pc.Try(func() { i += 1 })
 pc.Try(func() { panic("abort!") })
 pc.Try(func() { i += 1 })

 // recover可能出現的panic
 rc := pc.Recovered()
 // 重新panic
 // pc.Repanic()

 fmt.Println(i)
 fmt.Println(rc.Value.(string))
}

總結

對於不太熟悉併發編程的初學者來說 conc 提供了一套簡單易用的併發工具,同時其代碼實現也比較簡潔,所以很適合用來學習源碼。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Kil56G3YtE2qQf0aKSZd0A