【Go】 errgroup 併發小工具
使用場景:微服務中的併發請求
併發編程是 Golang 語言的強大特性之一。在微服務架構中,面對用戶的請求,我們常常需要向下遊請求大量的數據繼而組裝成所需數據,不同的數據很可能會由不同的服務提供,這裏一一請求顯然是效率十分低效的,所以併發成爲提高響應效率的優選方法。
errgroup 庫
基礎版本安裝
$ go get -u golang.org/x/sync/errgroup
加強版本 https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgroup
演變歷程
channel 版本
res_ch := make(chan interface{},3)
go func() {
r := funA()
res_ch <- r
}()
go func() {
r := funB()
res_ch <- r
}()
go func() {
r := funC()
res_ch <- r
}()
res := make([]interface{},0,3)
for i := 0; i < 3; i++ {
data := <- res_ch
res = append(res,data)
}
此版本運用了官方推薦的用於 goroutine 通信的 channel 結構。預計完整接收 goroutine 的結果。
問題 1:goroutine 數量控制較爲繁瑣
問題 2:若 goroutine 內部發生錯誤,會導致接收程序阻塞,無法正常退出
基本版本 errgroup
源碼
//源代碼結構
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
閱讀源碼我們可以得知,Group 結構中使用 sync.WaitGroup 來控制 goroutine 的併發,成員變量 err 來記錄運行中發生的錯誤,這裏只記錄第一次返回的錯誤值。
使用
group,ctx := errgroup.WithContent(context.Background())
urls :=[]string{
...
}
for _,v := range urls {
group.Go(func()error{
resp,err := http.Get(v)
if err != nil {
resp.Body.Close()
}
...
return err
})
}
if err := g.Wait();err != nil {
fmt.Println(err)
}
一些說明
-
Wait 函數在所有 goroutine 運行結束纔會返回,返回值記錄了第一個發生的錯誤。
-
WithContext 函數的第二返回值爲 ctx,Group 會在 goroutine 發生錯誤時調用與 ctx 對應的 cancel 函數,所以 ctx 不適合作爲其他調用的參數。
加強版本
下面是 kratos 的 errgroup 加強版,其針對幾個問題作出的改進。
//基礎版本
type Group struct {
cancel func()
wg sync.WaitGroup
errOnce sync.Once
err error
}
//kratos 版本
type Group struct {
err error
wg sync.WaitGroup
errOnce sync.Once
workerOnce sync.Once
ch chan func(ctx context.Context) error
chs []func(ctx context.Context) error
ctx context.Context
cancel func()
}
我們先從結構體定義的角度來看待加強點。
-
ch、chs、workerOnce 用於控制 goroutine 的併發數量, 在基礎版的代碼中我們發現在使用 Go(function()error) 函數的調用過程中是全開放的,即對於同時進行的 goroutine 數量並沒有做限制。kratos 在基礎版本的基礎上添加了一個 chan 控制併發數量,一個 slice 來緩存爲併發的函數指針。
-
kratos 將產生的 context 對象緩存,並且更改了方法 Go 的函數簽名加入了 context 參數,即 func (g *Group) Go(f func(ctx context.Context) error)。在基礎版本中,當 error 發生的是時候函數,仍然需要等到所有 goroutine 運行結束纔會返回,kratos 的 Group 可以使用成員函數 ctx 作爲參數,從而控制全部併發的生命週期。
控制併發數量源碼分析
func (g *Group) Go(f func(ctx context.Context) error) {
g.wg.Add(1)
if g.ch != nil {
select {
case g.ch <- f:
default:
g.chs = append(g.chs, f)
}
return
}
go g.do(f)
}
func (g *Group) GOMAXPROCS(n int) {
if n <= 0 {
panic("errgroup: GOMAXPROCS must great than 0")
}
g.workerOnce.Do(func() {
g.ch = make(chan func(context.Context) error, n)
for i := 0; i < n; i++ {
go func() {
for f := range g.ch {
g.do(f)
}
}()
}
})
}
func (g *Group) Wait() error {
if g.ch != nil {
for _, f := range g.chs {
g.ch <- f
}
}
g.wg.Wait()
if g.ch != nil {
close(g.ch) // let all receiver exit
}
if g.cancel != nil {
g.cancel()
}
return g.err
}
從 Go 函數中我們看到,當 g.ch != nil 時,f 函數首先嚐試進入 g.ch 中,當 g.ch 滿的時候存入 g.chs 中,這就是上面提到的,利用 chan 控制併發數量,利用 slice 作爲函數指針的緩存。
GOMAXPROCE 函數初始化 g.ch 用於開啓併發數量控制的開關。並且啓動 n 個 goroutine 來消費傳入的函數。
Wait 函數中會不斷將緩存中的函數不斷壓入 chan 中進行消費。
使用案例
func sleep1s(context.Context) error {
time.Sleep(time.Second)
return nil
}
{
...
g := Group{}
g.GOMAXPROCS(2)//開啓併發控制
g.Go(sleep1s)
g.Go(sleep1s)
g.Go(sleep1s)
g.Go(sleep1s)
g.Wait()
....
}
總結
errgroup 在 sync.WaitGroup 的功能之上添加了錯誤傳遞,以及在發生不可恢復的錯誤時取消整個 goroutine 集合的功能 (返回值 cancel)。
kratos 的加強版 errgroup 從統一 goroutine 控制,defer 錯誤捕獲,併發數量控制等方面對 errgroup 進行了功能擴充,利用匿名函數的參數 context.Context 的參數傳遞從整體上控制 goroutine 的生命週期。
參考資料
https://github.com/golang/sync/blob/master/errgroup/errgroup.go
https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgrou
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/B9F1Ta0QWJZNpkzq8URGRQ