Go 語言 errgroup 庫的使用方式和實現原理
大家好,我是 frank。
歡迎大家關注「Golang 語言開發棧」公衆號。
01 介紹
在 Go 語言中,我們可以使用 errgroup
庫處理 goroutine
中的錯誤。
errgroup
庫最近更新了,新增支持限制併發數量的功能。
本文我們介紹 errgroup
庫的使用方式和實現原理。
02 使用方式
errgroup
庫使用非常簡單,我們通過三個簡單示例代碼,分別介紹三種使用方式。
基礎使用
func main() {
eg := errgroup.Group{}
eg.Go(func() error {
fmt.Println("go1")
return nil
})
eg.Go(func() error {
fmt.Println("go2")
err := errors.New("go2 err")
return err
})
err := eg.Wait()
if err != nil {
fmt.Println("err =", err)
}
}
閱讀上面這段代碼,我們使用 errgroup
庫的 Go()
方法啓動兩個 goroutine
,分別模擬錯誤 goroutine
和正常 goroutine
。
然後,使用 errgroup
庫的 Wait()
方法判斷是否有 goroutine
返回錯誤信息。
附加 cancel 功能
func main() {
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
fmt.Println("go1 cancel, err = ", ctx.Err())
default:
fmt.Println("go1 run")
}
return nil
})
eg.Go(func() error {
err := errors.New("go2 err")
return err
})
err := eg.Wait()
if err != nil {
fmt.Println("err =", err)
}
}
閱讀上面這段代碼,我們使用 errgroup
庫的 WithContext()
函數,可以附加 cancel
功能。
我們在第一個使用 Go()
方法啓動的協程函數中,使用 select ... case ... default
監聽其他協程是否返回錯誤並做出相應的邏輯處理。
限制併發數量
func main() {
eg := errgroup.Group{}
eg.SetLimit(2)
eg.TryGo(func() error {
fmt.Println("go1 run")
return nil
})
eg.TryGo(func() error {
err := errors.New("go2 err")
return err
})
eg.TryGo(func() error {
fmt.Println("go3 run")
return nil
})
err := eg.Wait()
if err != nil {
fmt.Println("err =", err)
}
}
閱讀上面這段代碼,我們使用 errgroup
庫新增的限制併發數量的功能。
首先,使用 SetLimit()
方法設置併發數量,然後使用 TryGo()
方法替換 Go()
方法。
03 實現原理
我們通過閱讀 errgroup
庫的源碼,簡單介紹 errgroup
的實現原理。
我們先閱讀 Group
結構體的源碼。
type Group struct {
cancel func()
wg sync.WaitGroup
sem chan token
errOnce sync.Once
err error
}
在源碼中,我們可以發現 Group
結構體包含的 5 個字段,其中 sem
字段是最近爲了實現限制併發數量功能而新增的。
通過 Group
結構體的字段,我們可以看出 errgroup
實際上是對 sync
和 context
的封裝。
其中,cancel
是使用 context
的 cancel
方法;wg
是使用 sync.WairGroup
的相關方法;sem
是通過 channel
實現控制併發數量;errOnce
是使用 sync.Once
的特性,只保存第一個返回的 goroutine
錯誤;err
是 goroutine
返回的錯誤。
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
}
我們閱讀 errgroup
庫的 Go()
方法,首先,通過判斷 g.sem
的值是否是 nil
,如果 g.sem
的值不是 nil
,說明已設置併發數量,就通過向 g.sem
中發送一個空結構體 token{}
,來搶佔資源。
如果搶到資源,就啓動一個 goroutine
,否則,就阻塞,等待其他正在執行的 goroutine
釋放一個資源。
細心的讀者可能已經發現,Go()
方法除了開頭新增判斷 g.sem
的值是否爲 nil
的邏輯代碼之外,defer
也發生了變化,由之前的直接調用 sync.WaitGroup
的 Done()
方法,改爲調用 errgroup
庫新增的 done()
方法。
done()
方法源碼:
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}
通過閱讀 done()
方法的源碼,我們可以發現,在調用 sync.WaitGroup
的 Done()
方法之前,先判斷 g.sem
的值是否是 nil
,如果不是 nil
,則釋放資源。
我們再閱讀 Wait()
方法的源碼:
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
通過閱讀 Wait()
方法的源碼,我們可以發現它實際上是封裝 sync.WaitGroup
的 Wait()
方法,和 context
包的 cancel
,並且返回所有運行的 goroutine
中第一個返回的錯誤。
最後,我們閱讀新增控制併發數量的功能 TryGo()
方法和 SetLimit()
方法的源碼:
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel()
}
})
}
}()
return true
}
通過閱讀 TryGo()
方法的源碼,我們可以發現,它和 Go()
方法的區別就是在處理 g.sem
的值上,使用的邏輯不同。
TryGo()
方法在處理 g.sem
的值時,使用 select ... case ... default
語句,先嚐試一次搶佔資源,當無法搶到資源時,不再阻塞,而是直接返回 false
,表示執行失敗。
SetLimit()
方法的源碼:
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
}
通過閱讀 SetLimit()
方法的源碼,我們可以看出當入參 n
的值小於 0
時,直接給 g.sem
賦值爲 nil
,表示不限制併發數量。
在調用 SetLimit()
方法時,g.sem
必須是一個空通道,否則程序會 panic
。
除去 SetLimit()
方法的判斷邏輯代碼,實際上 SetLimit()
方法就是創建一個大小爲 n
的有緩衝 channel
。
SetLimit()
和 TryGo()
通常一起使用。
04 總結
本文我們介紹 Go 方法提供的 errgroup
庫,該庫最近新增了控制併發數量的功能。
我們先介紹了三種使用方式,然後通過閱讀源碼,分析其實現原理。
errgroup
庫的代碼不多,建議感興趣的讀者朋友們閱讀完整源碼。
參考資料:
-
https://github.com/golang/sync/blob/master/errgroup/errgroup.go
-
https://pkg.go.dev/golang.org/x/sync@v0.1.0/errgroup
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/R8f_-cl98th7ZiSVxaeUAQ