從源代碼中窺探 Go 的 WaitGroup 實現和應用

sync.WaitGroup Overview

Go 作爲雲原生開發的代表,以其在併發編程中的易用性而聞名。在大多數情況下,人們會在處理併發時使用 WaitGroup。我經常想要了解它是如何工作的,所以本文主要談談我對 WaitGroup 的理解。

在 Go 語言中,sync.WaitGroup 允許主程序或其他 goroutines 在繼續執行之前等待多個 goroutines 執行完畢。

它主要用於以下情況:

例如:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var counter int64
    var mu sync.Mutex
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            for j := 0; j < 1000; j++ {
                mu.Lock()
                counter++
                mu.Unlock()
            }
            wg.Done()
        }()
    }

    wg.Wait()
    fmt.Println("Final Counter:", counter)
}

sync.WaitGroup in Go 1.17:

Go 1.20 之前的結構有一些巧妙的地方,因此本文將以 Go 1.17 爲例重點講解。

type WaitGroup struct {
 noCopy noCopy

 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
 // 64-bit atomic operations require 64-bit alignment, but 32-bit
 // compilers do not ensure it. So we allocate 12 bytes and then use
 // the aligned 8 bytes in them as state, and the other 4 as storage
 // for the sema.
 state1 [3]uint32
}

要理解註釋的內容,首先需要了解內存對齊方式,以及在 Add() 和 Wait() 中如何使用 state1。內存對齊要求數據地址必須是某個值的倍數,這可以提高 CPU 讀取內存數據的效率:

在 Add() 和 Wait() 中,計數器和等待器合併爲一個 64 位整數使用。

statep, semap := wg.state()
...
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)

當需要更改計數器和等待器的值時,64 位整數會通過原子方式進行原子操作。但原子中你有一些需要注意的使用點,golang 官方文檔 sync/atomic/PKG - note - bugs[1] 中就有這樣的內容:

在 ARM、386 和 32 位 MIPS 上,調用者有責任安排通過原始原子函數原子訪問的 64 位字的 64 位對齊(Int64 和 Uint64 類型自動對齊)。分配的結構體、數組或片段中的第一個字;全局變量中的第一個字;或局部變量中的第一個字(因爲所有原子操作的對象都會逃逸到堆中)都可以依賴於 64 位對齊。

基於這一前提,在 32 位系統中,我們需要自己保證 "count+waiter" 的 64 位對齊。那麼問題來了,如果是你來實現,該如何寫呢?

state()

讓我們來看下官方的實現:

state1 [3]uint32

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state()(statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1)) % 8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1))&wg.state1[2]
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[1]))&wg.state1[0]
    }
}

如圖所示:

在 64 位系統上,都符合 8 字節對齊要求。而在 32 位系統上,也可能是這樣。

在不符合 8 字節對齊要求的 32 位系統上,sema 字段向前移動 4 個字節,以確保狀態字段符合 8 字節對齊要求。

只需重新安排 sema 字段的位置,我們就能保證計數器 + 等待器始終對齊 64 位邊界,這確實非常聰明。

簡化實現流程

現在,讓我們考慮一下原始結構,爲簡單起見,忽略內存對齊和併發安全因素:

type WaitGroup struct {
    counter int32
    waiter uint32
    sema    uint32
}

注意,這只是一個簡化的實施過程,實際代碼可能更加複雜。

Add()、Done()、Wait()

可以先閱讀下這段代碼 cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go[2]

結合我們常見的使用場景,關鍵流程如下:

調用 WaitGroup.Add(n) 時,計數器將按 n 遞增:counter += n

state := atomic.AddUint64(statep, uint64(delta)<<32)

在調用 WaitGroup.Wait() 時,它將遞增 waiter++ 並調用 runtime_Semacquire(semap) 來增加 semaphore 並暫停當前的 goroutine。

if atomic.CompareAndSwapUint64(statep, state, state+1) {
    ...
    runtime_Semacquire(semap)
    ...

當調用 WaitGroup.Done() 時,它將遞減計數器 --。如果遞減後的計數器等於 0,則表示 WaitGroup 的等待進程已經結束,我們需要調用 runtime_Semrelease 來釋放 semaphore,並喚醒 WaitGroup.Wait 上等待的程序。

for ; w != 0; w-- {
    runtime_Semrelease(semap, false, 0)
}

Go 1.20 中的 WaitGroup

cs.opensource.google/go/go/+/refs/tags/go1.20:src/sync/waitgroup.go[3]

相信有人已經注意到了一個問題,即計數器和等待器在更改時需要確保併發安全。爲什麼不直接使用 atomic.Uint64 呢?

這是因爲 atomic.Uint64 只在 1.17 以後的版本中才受支持。

在 Go 1.20 中,我們可以注意到內存對齊邏輯被 atomic.Uint64 所取代,雖然在 Go 1.20 的發佈說明中沒有提及,但我們可以從中學習到很多東西。

Reference: sync: use atomic.Uint64 for WaitGroup state[4]

noCopy

在 waitGroup 結構中,我們看到了 noCopy。爲什麼需要 noCopy?讓我們來看一個例子:

package main
import "fmt"

// Define a struct type
type Person struct {
    Name string
    Age int
}

func main() {
    // Create a struct instance
    person := Person{Name: "Alice", Age: 30}

    // Create a pointer to the struct
    p := &person

    // Access and modify the struct's fields through the pointer
    fmt.Println(p.Name)      // Output: Alice
    fmt.Println((*p).Name)   // Output: Alice
    
    p1 := p
    p.Age = 32
    fmt.Println(p.Age)   // Output: 32
    fmt.Println(p1.Age)   // Output: 32
}

在 Go 中,指針複製是一種淺層複製,即只複製頂層結構。如果原始結構及其副本都指向相同的底層數據,這可能會導致意想不到的行爲。如果一個結構的數據被修改,可能會影響到另一個結構。

使用 noCopy 字段有助於進行靜態編譯檢查。使用 go vet,可以檢查對象或對象中的字段是否已被複制。

關於 WaitGroup 的說明

探索使用 WaitGroup 時的一些限制和潛在隱患,並學習如何避免這些問題。如果你看過 Go 源代碼,可能會注意到下面這些總結要點的經典註釋:

Semaphores

在上一節中,我們提到了 semaphores,它是一種保護共享資源和防止多個線程同時訪問同一資源的機制。讓我們來看看 Semaphores 在 Unix/Linux 系統中是如何工作的:

一個 Semaphore 包含一個非負整數變量和兩個原子操作:等待(下)和信號(上)。等待操作也可稱爲 P 或 down,它將值遞減 1;而信號操作也稱爲 V 或 up,它將值遞增 1。Semaphores 使用原子操作來實現對併發資源的控制。

這與 Go 中使用 WaitGroup 的常見情況一致:

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
}

例如,讓我們來看看 semacquire1(等待、P、向下):

if cansemacquire(addr) {
    return
}

func cansemacquire(addr *uint32) bool {
    for {
        v := atomic.Load(addr)
        if v == 0 {
            return false
        }
        if atomic.Cas(addr, v, v-1) {
            return true
        }
    }
}
for {
    ...
    if cansemacquire(addr) {
        root.nwait.Add(-1)
        unlock(&root.lock)
        break
    }
    root.queue(addr, s, lifo)
    goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
    if s.ticket != 0 || cansemacquire(addr) {
        break
    }
    ...
}

參考資料

[1]

pkg-note-BUG: https://golang.org/pkg/sync/atomic/#pkg-note-BUG

[2]

waitgroup.go: https://cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go

[3]

waitgroup-1.20.go: https://cs.opensource.google/go/go/+/refs/tags/go1.20:src/sync/waitgroup.go

[4]

sync: use atomic.Uint64 for WaitGroup state: https://groups.google.com/g/golang-checkins/c/2jn_9xpsah8

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/zeEl6vvTVElpMGAqiCF3Uw