Go WaitGroup 詳解

WaitGroup 概述

WaitGroup在 go 語言中,用於線程同步,單從字面意思理解,wait等待的意思,group組、團隊的意思,WaitGroup就是指等待一組,等待一個系列執行完成後纔會繼續向下執行。

WatiGroupsync包中的一個struct類型,用來收集需要等待執行完成的goroutine。下面是它的定義:

// WaitGroup用於等待一組線程的結束。 父線程調用Add方法來設定應等待的線程的數量。 
// 每個被等待的線程在結束時應調用Done方法。同時,主線程裏可以調用Wait方法阻塞至所有線程結束。 
type WaitGroup struct { 
// 包含隱藏或非導出字段 
} 
// Add方法向內部計數加上delta,delta可以是負數; 
// 如果內部計數器變爲0,Wait方法阻塞等待的所有線程都會釋放,如果計數器小於0,返回panic。 
// 注意Add加上正數的調用應在Wait之前,否則Wait可能只會等待很少的線程。 
// 一般來說本方法應在創建新的線程或者其他應等待的事件之前調用。 
func (wg *WaitGroup) Add(delta int) 

// Done方法減少WaitGroup計數器的值,應在線程的最後執行。 
func (wg *WaitGroup) Done() 

// Wait方法阻塞直到WaitGroup計數器減爲0。 
func (wg *WaitGroup) Wait()

sync.WaitGroup3 個方法

例如 Add(2) 或者兩次調用 Add(1) 都會設置等待計數器的值爲 2,表示要等待 2 個goroutine完成

綜上所述:

Add() 用來增加要等待的goroutine的數量

Done() 用來表示goroutine已經完成了,減少一次計數器

Wait() 用來等待所有需要等待的goroutine完成。

使用示例

一個常見的使用場景是:批量發出 RPC 或者 HTTP 請求:

requests := []*Request{...}
wg := sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

我們可以通過 sync.WaitGroup 將原本順序執行的代碼在多個 Goroutine 中併發,如下圖:

結構體

sync.WaitGroup 結構體中只包含兩個成員:

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

sync.noCopy 是一個特殊的私有結構體,源碼包中的分析器會在編譯期間檢查被複制的變量中是否含有 sync.noCopy 或者實現了 LockUnlock 方法,如果包含有該結構體或者實現了對應的方法,就會拋出錯誤:

func main(){
    wg := sync.WaitGorup{}
    yawg := wg
    fmt.Println(wg, yawg)
}tGorp

$ go vet proc.go
./proc.go:10:10: assignment copies lock value to yawg: sync.WaitGroup

除了 sync.noCopysync.WaitGroup 結構體中還包含一個總共佔用 12 字節的數組,該數組會存儲當前結構體的狀態,在 64 位與 32 位機器中表現也不同,如下圖:

sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出對應的狀態和信號量。

接口

sync.WaitGroup 對外總共暴露 3 個方法:

sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法中傳入了 -1,因此咱們主要分析另外兩個方法:sync.WaitGroup.Addsync.WaitGroup.Wait

func (wg *sync.WaitGroup) Add (delta int){
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if V > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的計數器 counter

雖然 sync.WaitGroup.Add 方法傳入的參數可以爲_負數_,但是計數器只能是_非負數_,一旦出現負數程序就會崩潰。

當調用的計數器歸零,即所有任務都執行完成時,纔會通過 runtime_Semrelease 喚醒處於等待狀態的 Goroutine

sync.WaitGroup 的另一個方法 sync.WaitGroup.Wait 會在計數器大於 0 並且不存在等待的 Goroutine 時,調用 runtime.sync_runtime_Semacquire 陷入睡眠狀態:

func (wg *sync.WaitGroup) Wait () {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state + 1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is resused before previous Wait has returned")
            }
            return
        }
    }
}

sync.WaitGroup 的計數器歸零時,陷入睡眠狀態的 Goroutine 會被喚醒,sync.WaitGroup.Wait 方法也會立刻放回。

小結

通過 sync.WaitGroup 的分析和研究,可以得出以下結論:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hKfZnVlsGXBtuA-bt1bCLQ