從源代碼中窺探 Go 的 WaitGroup 實現和應用
sync.WaitGroup Overview
Go 作爲雲原生開發的代表,以其在併發編程中的易用性而聞名。在大多數情況下,人們會在處理併發時使用 WaitGroup。我經常想要了解它是如何工作的,所以本文主要談談我對 WaitGroup 的理解。
在 Go 語言中,sync.WaitGroup 允許主程序或其他 goroutines 在繼續執行之前等待多個 goroutines 執行完畢。
它主要用於以下情況:
-
等待一組執行程序完成:當我們有多個併發任務需要執行,並希望在所有這些任務完成後繼續執行後續操作時。
-
確保資源釋放:在併發操作中,爲了避免資源競爭和數據不一致,有必要在釋放資源前確保所有 goroutine 都已執行完畢。
例如:
package main
import (
"fmt"
"sync"
)
func main() {
var counter int64
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := 0; j < 1000; j++ {
mu.Lock()
counter++
mu.Unlock()
}
wg.Done()
}()
}
wg.Wait()
fmt.Println("Final Counter:", counter)
}
sync.WaitGroup in Go 1.17:
Go 1.20 之前的結構有一些巧妙的地方,因此本文將以 Go 1.17 爲例重點講解。
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
-
nocopy 是一種防止結構被複制的保護機制,將在後面介紹。
-
state1 主要存儲計數狀態和 semaphore,我們接下來將重點討論。
要理解註釋的內容,首先需要了解內存對齊方式,以及在 Add() 和 Wait() 中如何使用 state1。內存對齊要求數據地址必須是某個值的倍數,這可以提高 CPU 讀取內存數據的效率:
-
32 位對齊:數據的起始地址必須是 4 的倍數
-
64 位對齊:數據的起始地址必須是 8 的倍數
在 Add() 和 Wait() 中,計數器和等待器合併爲一個 64 位整數使用。
statep, semap := wg.state()
...
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
當需要更改計數器和等待器的值時,64 位整數會通過原子方式進行原子操作。但原子中你有一些需要注意的使用點,golang 官方文檔 sync/atomic/PKG - note - bugs[1] 中就有這樣的內容:
在 ARM、386 和 32 位 MIPS 上,調用者有責任安排通過原始原子函數原子訪問的 64 位字的 64 位對齊(Int64 和 Uint64 類型自動對齊)。分配的結構體、數組或片段中的第一個字;全局變量中的第一個字;或局部變量中的第一個字(因爲所有原子操作的對象都會逃逸到堆中)都可以依賴於 64 位對齊。
基於這一前提,在 32 位系統中,我們需要自己保證 "count+waiter" 的 64 位對齊。那麼問題來了,如果是你來實現,該如何寫呢?
state()
讓我們來看下官方的實現:
state1 [3]uint32
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state()(statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1)) % 8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
在 64 位系統上,都符合 8 字節對齊要求。而在 32 位系統上,也可能是這樣。
在不符合 8 字節對齊要求的 32 位系統上,sema 字段向前移動 4 個字節,以確保狀態字段符合 8 字節對齊要求。
只需重新安排 sema 字段的位置,我們就能保證計數器 + 等待器始終對齊 64 位邊界,這確實非常聰明。
簡化實現流程
現在,讓我們考慮一下原始結構,爲簡單起見,忽略內存對齊和併發安全因素:
type WaitGroup struct {
counter int32
waiter uint32
sema uint32
}
-
計數器表示尚未完成的任務數。WaitGroup.Add(n) 將導致計數器 += n,而 WaitGroup.Done() 將導致計數器 --。
-
waiter 表示調用了 WaitGroup.Wait 的程序數目。
-
sema 對應 Go 運行時的內部信號實現。在 WaitGroup 中,我們使用了兩個相關函數:runtime_Semacquire 和 runtime_Semrelease。runtime_Semacquire 會增加一個 semaphore 並暫停當前的 goroutine。
注意,這只是一個簡化的實施過程,實際代碼可能更加複雜。
Add()、Done()、Wait()
可以先閱讀下這段代碼 cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go[2]
結合我們常見的使用場景,關鍵流程如下:
調用 WaitGroup.Add(n) 時,計數器將按 n 遞增:counter += n
state := atomic.AddUint64(statep, uint64(delta)<<32)
在調用 WaitGroup.Wait() 時,它將遞增 waiter++ 並調用 runtime_Semacquire(semap) 來增加 semaphore 並暫停當前的 goroutine。
if atomic.CompareAndSwapUint64(statep, state, state+1) {
...
runtime_Semacquire(semap)
...
當調用 WaitGroup.Done() 時,它將遞減計數器 --。如果遞減後的計數器等於 0,則表示 WaitGroup 的等待進程已經結束,我們需要調用 runtime_Semrelease 來釋放 semaphore,並喚醒 WaitGroup.Wait 上等待的程序。
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
Go 1.20 中的 WaitGroup
cs.opensource.google/go/go/+/refs/tags/go1.20:src/sync/waitgroup.go[3]
相信有人已經注意到了一個問題,即計數器和等待器在更改時需要確保併發安全。爲什麼不直接使用 atomic.Uint64 呢?
這是因爲 atomic.Uint64 只在 1.17 以後的版本中才受支持。
在 Go 1.20 中,我們可以注意到內存對齊邏輯被 atomic.Uint64 所取代,雖然在 Go 1.20 的發佈說明中沒有提及,但我們可以從中學習到很多東西。
Reference: sync: use atomic.Uint64 for WaitGroup state[4]
noCopy
在 waitGroup 結構中,我們看到了 noCopy。爲什麼需要 noCopy?讓我們來看一個例子:
package main
import "fmt"
// Define a struct type
type Person struct {
Name string
Age int
}
func main() {
// Create a struct instance
person := Person{Name: "Alice", Age: 30}
// Create a pointer to the struct
p := &person
// Access and modify the struct's fields through the pointer
fmt.Println(p.Name) // Output: Alice
fmt.Println((*p).Name) // Output: Alice
p1 := p
p.Age = 32
fmt.Println(p.Age) // Output: 32
fmt.Println(p1.Age) // Output: 32
}
在 Go 中,指針複製是一種淺層複製,即只複製頂層結構。如果原始結構及其副本都指向相同的底層數據,這可能會導致意想不到的行爲。如果一個結構的數據被修改,可能會影響到另一個結構。
使用 noCopy 字段有助於進行靜態編譯檢查。使用 go vet,可以檢查對象或對象中的字段是否已被複制。
關於 WaitGroup 的說明
探索使用 WaitGroup 時的一些限制和潛在隱患,並學習如何避免這些問題。如果你看過 Go 源代碼,可能會注意到下面這些總結要點的經典註釋:
-
Add() 操作必須在 Wait() 操作之前執行。
-
調用 Done() 的次數必須與 Add() 設置的計數器值一致。
-
如果計數器的值小於 0,就會出現 panic
-
不能同時調用 Add() 和 Wait();例如,在兩個不同的程序中調用這兩個函數會導致 panic。
-
必須等到 Wait() 完成後,才能對 WaitGroup 進行後續調用。
Semaphores
在上一節中,我們提到了 semaphores,它是一種保護共享資源和防止多個線程同時訪問同一資源的機制。讓我們來看看 Semaphores 在 Unix/Linux 系統中是如何工作的:
一個 Semaphore 包含一個非負整數變量和兩個原子操作:等待(下)和信號(上)。等待操作也可稱爲 P 或 down,它將值遞減 1;而信號操作也稱爲 V 或 up,它將值遞增 1。Semaphores 使用原子操作來實現對併發資源的控制。
-
等待(P,向下)操作:如果 semaphore 的非負整數變量 S > 0,wait 將遞減它;如果 S = 0,wait 將阻塞線程。
-
信號(V,向上)操作:遞增後,如果遞增前的值爲負數(表示有進程在等待資源),則被阻塞的進程將從 semaphore 的等待隊列移到就緒隊列;如果沒有線程被阻塞在 semaphore 上,則 signal 會簡單地在 S 上加 1。
這與 Go 中使用 WaitGroup 的常見情況一致:
-
首先使用 runtime_Semacquire(semap) 執行 Wait(),這樣會將 semap 設置爲 0,並增加 semaphore 和暫停當前程序。
-
當所有運行程序都完成了 Done() 執行後,執行 runtime_Semrelease 以釋放寄存器,並喚醒 WaitGroup.Wait 上等待的運行程序。
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
例如,讓我們來看看 semacquire1(等待、P、向下):
- 嘗試獲取信號:
if cansemacquire(addr) {
return
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
- 阻止並等待:
for {
...
if cansemacquire(addr) {
root.nwait.Add(-1)
unlock(&root.lock)
break
}
root.queue(addr, s, lifo)
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
...
}
參考資料
[1]
pkg-note-BUG: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
[2]
waitgroup.go: https://cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go
[3]
waitgroup-1.20.go: https://cs.opensource.google/go/go/+/refs/tags/go1.20:src/sync/waitgroup.go
[4]
sync: use atomic.Uint64 for WaitGroup state: https://groups.google.com/g/golang-checkins/c/2jn_9xpsah8
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/zeEl6vvTVElpMGAqiCF3Uw