Go 語言 errgroup 庫的使用方式和實現原理

大家好,我是 frank。
歡迎大家關注「Golang 語言開發棧」公衆號。

01  介紹

在 Go 語言中,我們可以使用 errgroup 庫處理 goroutine 中的錯誤。

errgroup 庫最近更新了,新增支持限制併發數量的功能。

本文我們介紹 errgroup 庫的使用方式和實現原理。

02  使用方式

errgroup 庫使用非常簡單,我們通過三個簡單示例代碼,分別介紹三種使用方式。

基礎使用

func main() {
 eg := errgroup.Group{}
 eg.Go(func() error {
  fmt.Println("go1")
  return nil
 })
 eg.Go(func() error {
  fmt.Println("go2")
  err := errors.New("go2 err")
  return err
 })
 err := eg.Wait()
 if err != nil {
  fmt.Println("err =", err)
 }
}

閱讀上面這段代碼,我們使用 errgroup 庫的 Go() 方法啓動兩個 goroutine,分別模擬錯誤 goroutine 和正常 goroutine

然後,使用 errgroup 庫的 Wait() 方法判斷是否有 goroutine 返回錯誤信息。

附加 cancel 功能

func main() {
 eg, ctx := errgroup.WithContext(context.Background())
 eg.Go(func() error {
  time.Sleep(1 * time.Second)
  select {
  case <-ctx.Done():
   fmt.Println("go1 cancel, err = ", ctx.Err())
  default:
   fmt.Println("go1 run")
  }
  return nil
 })
 eg.Go(func() error {
  err := errors.New("go2 err")
  return err
 })
 err := eg.Wait()
 if err != nil {
  fmt.Println("err =", err)
 }
}

閱讀上面這段代碼,我們使用 errgroup 庫的 WithContext() 函數,可以附加 cancel 功能。

我們在第一個使用 Go() 方法啓動的協程函數中,使用 select ... case ... default 監聽其他協程是否返回錯誤並做出相應的邏輯處理。

限制併發數量

func main() {
 eg := errgroup.Group{}
 eg.SetLimit(2)
 eg.TryGo(func() error {
  fmt.Println("go1 run")
  return nil
 })
 eg.TryGo(func() error {
  err := errors.New("go2 err")
  return err
 })
 eg.TryGo(func() error {
  fmt.Println("go3 run")
  return nil
 })
 err := eg.Wait()
 if err != nil {
  fmt.Println("err =", err)
 }
}

閱讀上面這段代碼,我們使用 errgroup 庫新增的限制併發數量的功能。

首先,使用 SetLimit() 方法設置併發數量,然後使用 TryGo() 方法替換 Go() 方法。

03  實現原理

我們通過閱讀 errgroup 庫的源碼,簡單介紹 errgroup 的實現原理。

我們先閱讀 Group 結構體的源碼。

type Group struct {
 cancel func()

 wg sync.WaitGroup

 sem chan token

 errOnce sync.Once
 err     error
}

在源碼中,我們可以發現 Group 結構體包含的 5 個字段,其中 sem 字段是最近爲了實現限制併發數量功能而新增的。

通過 Group 結構體的字段,我們可以看出 errgroup 實際上是對 sync 和 context 的封裝。

其中,cancel 是使用 context 的 cancel 方法;wg 是使用 sync.WairGroup 的相關方法;sem 是通過 channel 實現控制併發數量;errOnce 是使用 sync.Once 的特性,只保存第一個返回的 goroutine 錯誤;err 是 goroutine 返回的錯誤。

func (g *Group) Go(f func() error) {
 if g.sem != nil {
  g.sem <- token{}
 }

 g.wg.Add(1)
 go func() {
  defer g.done()

  if err := f(); err != nil {
   g.errOnce.Do(func() {
    g.err = err
    if g.cancel != nil {
     g.cancel()
    }
   })
  }
 }()
}

我們閱讀 errgroup 庫的 Go() 方法,首先,通過判斷 g.sem 的值是否是 nil,如果 g.sem 的值不是 nil,說明已設置併發數量,就通過向 g.sem 中發送一個空結構體 token{},來搶佔資源。

如果搶到資源,就啓動一個 goroutine,否則,就阻塞,等待其他正在執行的 goroutine 釋放一個資源。

細心的讀者可能已經發現,Go() 方法除了開頭新增判斷 g.sem 的值是否爲 nil 的邏輯代碼之外,defer 也發生了變化,由之前的直接調用 sync.WaitGroup 的 Done() 方法,改爲調用 errgroup 庫新增的 done() 方法。

done() 方法源碼:

func (g *Group) done() {
 if g.sem != nil {
  <-g.sem
 }
 g.wg.Done()
}

通過閱讀 done() 方法的源碼,我們可以發現,在調用 sync.WaitGroup 的 Done() 方法之前,先判斷 g.sem 的值是否是 nil,如果不是 nil,則釋放資源。

我們再閱讀 Wait() 方法的源碼:

func (g *Group) Wait() error {
 g.wg.Wait()
 if g.cancel != nil {
  g.cancel()
 }
 return g.err
}

通過閱讀 Wait() 方法的源碼,我們可以發現它實際上是封裝 sync.WaitGroup 的 Wait() 方法,和 context 包的 cancel,並且返回所有運行的 goroutine 中第一個返回的錯誤。

最後,我們閱讀新增控制併發數量的功能 TryGo() 方法和 SetLimit() 方法的源碼:

func (g *Group) TryGo(f func() error) bool {
 if g.sem != nil {
  select {
  case g.sem <- token{}:
   // Note: this allows barging iff channels in general allow barging.
  default:
   return false
  }
 }

 g.wg.Add(1)
 go func() {
  defer g.done()

  if err := f(); err != nil {
   g.errOnce.Do(func() {
    g.err = err
    if g.cancel != nil {
     g.cancel()
    }
   })
  }
 }()
 return true
}

通過閱讀 TryGo() 方法的源碼,我們可以發現,它和 Go() 方法的區別就是在處理 g.sem 的值上,使用的邏輯不同。

TryGo() 方法在處理 g.sem 的值時,使用 select ... case ... default 語句,先嚐試一次搶佔資源,當無法搶到資源時,不再阻塞,而是直接返回 false,表示執行失敗。

SetLimit() 方法的源碼:

func (g *Group) SetLimit(n int) {
 if n < 0 {
  g.sem = nil
  return
 }
 if len(g.sem) != 0 {
  panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
 }
 g.sem = make(chan token, n)
}

通過閱讀 SetLimit() 方法的源碼,我們可以看出當入參 n 的值小於 0 時,直接給 g.sem 賦值爲 nil,表示不限制併發數量。

在調用 SetLimit() 方法時,g.sem 必須是一個空通道,否則程序會 panic

除去 SetLimit() 方法的判斷邏輯代碼,實際上 SetLimit() 方法就是創建一個大小爲 n 的有緩衝 channel

SetLimit() 和 TryGo() 通常一起使用。

04  總結

本文我們介紹 Go 方法提供的 errgroup 庫,該庫最近新增了控制併發數量的功能。

我們先介紹了三種使用方式,然後通過閱讀源碼,分析其實現原理。

errgroup 庫的代碼不多,建議感興趣的讀者朋友們閱讀完整源碼。

參考資料:

  1. https://github.com/golang/sync/blob/master/errgroup/errgroup.go

  2. https://pkg.go.dev/golang.org/x/sync@v0.1.0/errgroup

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/R8f_-cl98th7ZiSVxaeUAQ