【Go】 errgroup 併發小工具

使用場景:微服務中的併發請求

併發編程是 Golang 語言的強大特性之一。在微服務架構中,面對用戶的請求,我們常常需要向下遊請求大量的數據繼而組裝成所需數據,不同的數據很可能會由不同的服務提供,這裏一一請求顯然是效率十分低效的,所以併發成爲提高響應效率的優選方法。

errgroup 庫

基礎版本安裝


$ go get -u golang.org/x/sync/errgroup

加強版本 https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgroup

演變歷程

channel 版本


    res_ch := make(chan interface{},3)
    go func() {
        r := funA()
        res_ch <- r
    }()
    go func() {
        r := funB()
        res_ch <- r
    }()
    go func() {
        r := funC()
        res_ch <- r
    }()
    res := make([]interface{},0,3)
    for i := 0; i < 3; i++ {
        data := <- res_ch
        res = append(res,data)
    }

此版本運用了官方推薦的用於 goroutine 通信的 channel 結構。預計完整接收 goroutine 的結果。

問題 1:goroutine 數量控制較爲繁瑣

問題 2:若 goroutine 內部發生錯誤,會導致接收程序阻塞,無法正常退出

基本版本 errgroup

源碼

    //源代碼結構
    type Group struct {
     cancel func()

     wg sync.WaitGroup

     errOnce sync.Once
     err     error
    }

    func WithContext(ctx context.Context) (*Group, context.Context) {
        ctx, cancel := context.WithCancel(ctx)
        return &Group{cancel: cancel}, ctx
    }

    func (g *Group) Wait() error {
        g.wg.Wait()
        if g.cancel != nil {
            g.cancel()
        }
        return g.err
    }

    func (g *Group) Go(f func() error) {
        g.wg.Add(1)

        go func() {
            defer g.wg.Done()

            if err := f(); err != nil {
                g.errOnce.Do(func() {
                    g.err = err
                    if g.cancel != nil {
                        g.cancel()
                    }
                })
            }
        }()
    }

閱讀源碼我們可以得知,Group 結構中使用 sync.WaitGroup 來控制 goroutine 的併發,成員變量 err 來記錄運行中發生的錯誤,這裏只記錄第一次返回的錯誤值。

使用

group,ctx := errgroup.WithContent(context.Background())
urls :=[]string{
    ...
}
for _,v := range urls {
    group.Go(func()error{
        resp,err := http.Get(v)
        if err != nil {
            resp.Body.Close()
        }
        ...
        return err
    })
}
if err := g.Wait();err != nil {
    fmt.Println(err)
}

一些說明

加強版本

下面是 kratos 的 errgroup 加強版,其針對幾個問題作出的改進。

//基礎版本
type Group struct {
 cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}    

//kratos 版本
type Group struct {
    err     error
    wg      sync.WaitGroup
    errOnce sync.Once

    workerOnce sync.Once
    ch         chan func(ctx context.Context) error
    chs        []func(ctx context.Context) error

    ctx    context.Context
    cancel func()
}

我們先從結構體定義的角度來看待加強點。

控制併發數量源碼分析

func (g *Group) Go(f func(ctx context.Context) error) {
 g.wg.Add(1)
 if g.ch != nil {
  select {
  case g.ch <- f:
  default:
   g.chs = append(g.chs, f)
  }
  return
 }
 go g.do(f)
}

func (g *Group) GOMAXPROCS(n int) {
 if n <= 0 {
  panic("errgroup: GOMAXPROCS must great than 0")
 }
 g.workerOnce.Do(func() {
  g.ch = make(chan func(context.Context) error, n)
  for i := 0; i < n; i++ {
   go func() {
    for f := range g.ch {
     g.do(f)
    }
   }()
  }
 })
}

func (g *Group) Wait() error {
 if g.ch != nil {
  for _, f := range g.chs {
   g.ch <- f
  }
 }
 g.wg.Wait()
 if g.ch != nil {
  close(g.ch) // let all receiver exit
 }
 if g.cancel != nil {
  g.cancel()
 }
 return g.err
}

從 Go 函數中我們看到,當 g.ch != nil 時,f 函數首先嚐試進入 g.ch 中,當 g.ch 滿的時候存入 g.chs 中,這就是上面提到的,利用 chan 控制併發數量,利用 slice 作爲函數指針的緩存。

GOMAXPROCE 函數初始化 g.ch 用於開啓併發數量控制的開關。並且啓動 n 個 goroutine 來消費傳入的函數。

Wait 函數中會不斷將緩存中的函數不斷壓入 chan 中進行消費。

使用案例

func sleep1s(context.Context) error {
 time.Sleep(time.Second)
 return nil
}   

{
    ...
    g := Group{}
    g.GOMAXPROCS(2)//開啓併發控制
    g.Go(sleep1s)
    g.Go(sleep1s)
    g.Go(sleep1s)
    g.Go(sleep1s)
    g.Wait()
    ....
}

總結

errgroup 在 sync.WaitGroup 的功能之上添加了錯誤傳遞,以及在發生不可恢復的錯誤時取消整個 goroutine 集合的功能 (返回值 cancel)。

kratos 的加強版 errgroup 從統一 goroutine 控制,defer 錯誤捕獲,併發數量控制等方面對 errgroup 進行了功能擴充,利用匿名函數的參數 context.Context 的參數傳遞從整體上控制 goroutine 的生命週期。

參考資料

https://github.com/golang/sync/blob/master/errgroup/errgroup.go

https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgrou

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/B9F1Ta0QWJZNpkzq8URGRQ