Golang 之 WaitGroup 源碼解析
如果我們有一個大的任務要做,我們會嘗試將這個任務分解,分解完成之後併發交由 goroutine 去做,並且我需要當全部的任務完成之後再進行下面的步驟,在 sync 包下,就有這樣一個東西適合上述情況,WaitGroup,今天我們來看看具體它是怎麼實現的。
PS:在下面我統一用 wg 來簡稱 WaitGroup
使用
它的使用非常簡單,如下:
func main () {
wg := sync.WaitGroup {}
for i := 0; i < 10; i++ {
wg.Add (1)
go func (job int) {
defer wg.Done ()
//do something
fmt.Printf ("job % d done\n", job)
}(i)
}
wg.Wait ()
fmt.Println ("all done")
}
輸出:
job 9 done
job 1 done
job 0 done
job 8 done
job 7 done
job 3 done
job 6 done
job 2 done
job 4 done
job 5 done
all done
我們可以看到,使用非常簡單,每次有一個任務就使用 Add 方法加一個,每次做完任務就使用 Done 方法告訴它已經完成了,而 Wait 就是等着所有的任務完成。
思考問題
在看 wg 的實現之前,首先來問幾個問題,來考考自己。
-
Wait 方法能否被多次調用,比如再開一個 goroutine 去 wait
-
Wait 方法調用後是否還能再繼續調用 Add 添加任務
-
每次只能 Done 一個任務,能否一次性 Done 多個任務呢
-
wg 能否被拷貝或作爲參數傳遞
-
如果讓你自己實現一個,你會如何實現
前幾個問題,如果你都能很清楚的回答,那麼你對 wg 的瞭解可以說已經非常熟悉了。首選我來說一下對於最後的一個問題的回答,因爲在看源碼之前我都會想想如果是我,我會如何去實現,那麼我想的也很簡單。
-
使用一個變量進行計數
-
每次任務數量變更時使用 atom 原子操作 + 1 或者 - 1
-
-1 時判斷任務數量是否已經爲 0
-
如果爲 0 向一個 channel 裏面發送消息
-
所有 wait 的地方監聽 channel 的消息,收到消息則證明任務全部完成
源碼分析
結構
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
//compilers do not ensure it. So we allocate 12 bytes and then use
//the aligned 8 bytes in them as state, and the other 4 as storage
//for the sema.
state1 [3] uint32
}
結構非常簡單,就只有兩個熟悉,一個 noCopy 還有一個 state1(我也很好奇爲什麼要用 1 來結尾命名,大佬的想法總是很奇妙)
noCopy:sync 包下的一個特殊標記吧,vet 檢查,如果有拷貝的變量則會報錯
func main () {
wg := sync.WaitGroup {}
w := wg
fmt.Println (w, wg)
}
你 run 肯定沒問題的,但是如果你使用 go vet 做個檢查就有警告了
➜ go vet main.go
# command-line-arguments
./main.go:10:10: assignment copies lock value to w: sync.WaitGroup contains sync.noCopy
./main.go:11:17: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
./main.go:11:20: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
state1:是用來存放任務計數器和等待者計數器的(我一看到這個結構就明白肯定後面又是位操作這樣的高端操作了)
其中 waiter 是等待者計數,counter 是任務計數,sema 是信號量
奇怪的是在 64 位還 32 位操作系統上是不一樣的,具體原因以及對於它操作請繼續看下去
state
//state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state () (statep *uint64, semap *uint32) {
if uintptr (unsafe.Pointer (&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer (&wg.state1)), &wg.state1 [2]
} else {
return (*uint64)(unsafe.Pointer (&wg.state1 [1])), &wg.state1 [0]
}
}
這個方法是一個內部方法,就是將 state1 中存儲的狀態取出來,返回值 statep 就是計數器的狀態,semap 是信號量
Done
func (wg *WaitGroup) Done () { wg.Add (-1) } 沒想到吧~居然 Done 就是調用 Add 並傳遞一個 - 1
所以其實我們完全可以再外部調用 Add 傳遞一個 - 3 一次性結束 3 個任務
Add
func (wg *WaitGroup) Add (delta int) {
// 首先獲取狀態值
statep, semap := wg.state ()
// 對於 statep 中 counter + delta
state := atomic.AddUint64 (statep, uint64 (delta)<<32)
// 獲取任務計數器的值
v := int32 (state >> 32)
// 獲取等待者計數器的值
w := uint32 (state)
// 任務計數器不能爲負數
if v < 0 {
panic ("sync: negative WaitGroup counter")
}
// 已經有人在等待,但是還在添加任務
if w != 0 && delta > 0 && v == int32 (delta) {
panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 沒有等待者或者任務還有沒做完的
if v > 0 || w == 0 {
return
}
// 有等待者,但是在這個過程中數據還在變動
if *statep != state {
panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
// 重置狀態,並用發出等同於等待者數量的信號量,告訴所有等待者任務已經完成
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease (semap, false, 0)
}
}
這裏有幾個要點我們其實已經看到了:
Wait 的 過程中 是不能 Add 的,不然就會 panic,要注意 雖然我們可以藉助 Add 一個負數來一次性結束多個任務,但是如果任務數量控制的不好,變成負數也會 panic,Done 次數多了也一樣 wg 是通過信號量來通知的,當然可以有很多人在等,wg 它都會一一通知到位的
Wait
func (wg *WaitGroup) Wait () {
// 先獲取狀態
statep, semap := wg.state ()
for {
// 這裏注意要用 atomic 的 Load 來保證一下寫操作已經完成
state := atomic.LoadUint64 (statep)
// 同樣的,這裏是任務計數
v := int32 (state >> 32)
// 這裏是等待者計數
w := uint32 (state)
// 如果沒有任務,那麼直接結束,不用等待了
if v == 0 {
return
}
// 使用 cas 操作,如果不相等,證明中間已經被其他人修改了狀態,重新走 for 循環
// 注意這裏 if 進去之後等待者的數量就 +1 了
if atomic.CompareAndSwapUint64 (statep, state, state+1) {
// 等待信號量
runtime_Semacquire (semap)
// 如果信號量來了,但是狀態還不是 0,則證明 wait 之後還是在人在 add,證明有人想充分利用 wg 但是時機不對
if *statep != 0 {
panic ("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
其實 wait 雖然簡單,也有要點
通過 load 和 cas 操作 + 循環來避免了鎖,其實這個操作可以學一下 其實這裏也說明明白了,wg 可以重用,但是你必須等到 wait 全部完成之後再說 其他注意點
func main () {
wg := sync.WaitGroup {}
for i := 0; i < 10; i++ {
wg.Add (1)
go func (job int) {
doJob (job, wg)
}(i)
}
wg.Wait ()
fmt.Println ("all done")
}
func doJob (job int, wg sync.WaitGroup) {
fmt.Printf ("job % d done\n", job)
wg.Done ()
}
上面的代碼有問題嗎?問題在哪呢?
其實很簡單,wg 作爲一個參數傳遞的時候,wg 還是一個普通的結構體,我們在函數中操作的時候還是操作的一個拷貝的變量而已,對於原來的 wg 是不會改變的,所以這裏需要傳遞指針纔是正確的
func main () {
wg := &sync.WaitGroup {}
for i := 0; i < 10; i++ {
wg.Add (1)
go func (job int) {
doJob (job, wg)
}(i)
}
wg.Wait ()
fmt.Println ("all done")
}
func doJob (job int, wg *sync.WaitGroup) {
fmt.Printf ("job % d done\n", job)
wg.Done ()
}
但是其實並不推薦這樣去傳遞 wg,因爲這樣很容易出現問題,一個不好就出問題了,個人還是建議直接在使用 goroutine 之後馬上接一個 defer wg.Done () 來的更加靠譜一些
總結
回過頭來看看,之前的問題也都有了答案:
-
Wait 可以被調用多次,並且每個都會收到完成的通知
-
Wait 之後,如果再 Wait 的過程中不能在 Add,否則會 panic,但是 Wait 結束之後可以繼續使用 Add 進行重用
-
可以使用 Add 傳遞負數的方式一次性結束多個任務,但是需要保證任務計數器非負,否則會 panic
-
wg 作爲參數傳遞的時候需要注意傳遞指針,或者儘量避免傳遞
-
官方利用位操作節約了空間,存在在同一個地方;利用信號量來實現任務結束的通知….
總的來說 wg 的實現還是非常簡單的,需要注意的就是幾個使用上的點不要出現意外即可。
轉自:LinkinStar
linkinstar.wiki/2020/03/15/golang/source-code/sync-waitgroup-source-code
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/B7DOM73svpkNNhwLLDOPeQ