Go WaitGroup 詳解
WaitGroup 概述
WaitGroup
在 go 語言中,用於線程同步,單從字面意思理解,wait
等待的意思,group
組、團隊的意思,WaitGroup
就是指等待一組,等待一個系列執行完成後纔會繼續向下執行。
WatiGroup
是sync
包中的一個struct
類型,用來收集需要等待執行完成的goroutine
。下面是它的定義:
// WaitGroup用於等待一組線程的結束。 父線程調用Add方法來設定應等待的線程的數量。
// 每個被等待的線程在結束時應調用Done方法。同時,主線程裏可以調用Wait方法阻塞至所有線程結束。
type WaitGroup struct {
// 包含隱藏或非導出字段
}
// Add方法向內部計數加上delta,delta可以是負數;
// 如果內部計數器變爲0,Wait方法阻塞等待的所有線程都會釋放,如果計數器小於0,返回panic。
// 注意Add加上正數的調用應在Wait之前,否則Wait可能只會等待很少的線程。
// 一般來說本方法應在創建新的線程或者其他應等待的事件之前調用。
func (wg *WaitGroup) Add(delta int)
// Done方法減少WaitGroup計數器的值,應在線程的最後執行。
func (wg *WaitGroup) Done()
// Wait方法阻塞直到WaitGroup計數器減爲0。
func (wg *WaitGroup) Wait()
sync.WaitGroup 有 3 個方法
- Add():每次激活想要被等待完成的
goroutine
之前,先調用 Add(),用來設置或添加要等待完成的goroutine
數量
例如 Add(2) 或者兩次調用 Add(1) 都會設置等待計數器的值爲 2,表示要等待 2 個
goroutine
完成
-
Done():每次需要等待的
goroutine
在真正完成之前,應該調用該方法來人爲表示goroutine
完成了,該方法會對等待計數器減 1 -
Wait():在等待計數器減爲_ 0_ 之前,_Wait()_ 會一直阻塞當前的
goroutine
綜上所述:
Add() 用來增加要等待的
goroutine
的數量Done() 用來表示
goroutine
已經完成了,減少一次計數器Wait() 用來等待所有需要等待的
goroutine
完成。
使用示例
一個常見的使用場景是:批量發出 RPC 或者 HTTP 請求:
requests := []*Request{...}
wg := sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我們可以通過 sync.WaitGroup 將原本順序執行的代碼在多個 Goroutine 中併發,如下圖:
結構體
sync.WaitGroup 結構體中只包含兩個成員:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
-
noCopy —— 保證 sync.WaitGroup 不會被開發者通過再賦值的方式賦值
-
state1 —— 存儲狀態和信號量
sync.noCopy 是一個特殊的私有結構體,源碼包中的分析器會在編譯期間檢查被複制的變量中是否含有 sync.noCopy 或者實現了 Lock 和 Unlock 方法,如果包含有該結構體或者實現了對應的方法,就會拋出錯誤:
func main(){
wg := sync.WaitGorup{}
yawg := wg
fmt.Println(wg, yawg)
}tGorp
$ go vet proc.go
./proc.go:10:10: assignment copies lock value to yawg: sync.WaitGroup
除了 sync.noCopy,sync.WaitGroup 結構體中還包含一個總共佔用 12 字節的數組,該數組會存儲當前結構體的狀態,在 64 位與 32 位機器中表現也不同,如下圖:
sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出對應的狀態和信號量。
接口
sync.WaitGroup 對外總共暴露 3 個方法:
-
sync.WaitGroup.Add
-
sync.WaitGroup.Wait
-
sync.WaitGroup.Done
因 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法中傳入了 -1,因此咱們主要分析另外兩個方法:sync.WaitGroup.Add 、sync.WaitGroup.Wait
func (wg *sync.WaitGroup) Add (delta int){
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if V > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的計數器 counter 。
雖然 sync.WaitGroup.Add 方法傳入的參數可以爲_負數_,但是計數器只能是_非負數_,一旦出現負數程序就會崩潰。
當調用的計數器歸零,即所有任務都執行完成時,纔會通過 runtime_Semrelease 喚醒處於等待狀態的 Goroutine。
sync.WaitGroup 的另一個方法 sync.WaitGroup.Wait 會在計數器大於 0 並且不存在等待的 Goroutine 時,調用 runtime.sync_runtime_Semacquire 陷入睡眠狀態:
func (wg *sync.WaitGroup) Wait () {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state + 1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is resused before previous Wait has returned")
}
return
}
}
}
當 sync.WaitGroup 的計數器歸零時,陷入睡眠狀態的 Goroutine 會被喚醒,sync.WaitGroup.Wait 方法也會立刻放回。
小結
通過 sync.WaitGroup 的分析和研究,可以得出以下結論:
-
sync.WaitGroup 必須在 sync.WaitGroup.Wait 方法返回之後才能重新使用
-
sync.WaitGroup.Done 只是對 sync.WaitGroup.Add 方法的簡單封裝,我們可以通過 sync.WaitGroup.Add 方法傳入任意負數(需要保證計數器非負),快速將計數器歸零以喚醒等待的 Goroutine
-
可以同時有多個 Goroutine 等待當前的 sync.WaitGroup 計數器歸零,這些 Goroutine 會被同時喚醒
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/hKfZnVlsGXBtuA-bt1bCLQ