Golang 之 WaitGroup 源碼解析

如果我們有一個大的任務要做,我們會嘗試將這個任務分解,分解完成之後併發交由 goroutine 去做,並且我需要當全部的任務完成之後再進行下面的步驟,在 sync 包下,就有這樣一個東西適合上述情況,WaitGroup,今天我們來看看具體它是怎麼實現的。

PS:在下面我統一用 wg 來簡稱 WaitGroup

使用

它的使用非常簡單,如下:

func main () {
    wg := sync.WaitGroup {}
    for i := 0; i < 10; i++ {
        wg.Add (1)
        go func (job int) {
            defer wg.Done ()
            //do something
            fmt.Printf ("job % d done\n", job)
        }(i)
    }
    wg.Wait ()
    fmt.Println ("all done")
}

輸出:

job 9 done
job 1 done
job 0 done
job 8 done
job 7 done
job 3 done
job 6 done
job 2 done
job 4 done
job 5 done
all done

我們可以看到,使用非常簡單,每次有一個任務就使用 Add 方法加一個,每次做完任務就使用 Done 方法告訴它已經完成了,而 Wait 就是等着所有的任務完成。

思考問題

在看 wg 的實現之前,首先來問幾個問題,來考考自己。

  1. Wait 方法能否被多次調用,比如再開一個 goroutine 去 wait

  2. Wait 方法調用後是否還能再繼續調用 Add 添加任務

  3. 每次只能 Done 一個任務,能否一次性 Done 多個任務呢

  4. wg 能否被拷貝或作爲參數傳遞

  5. 如果讓你自己實現一個,你會如何實現

前幾個問題,如果你都能很清楚的回答,那麼你對 wg 的瞭解可以說已經非常熟悉了。首選我來說一下對於最後的一個問題的回答,因爲在看源碼之前我都會想想如果是我,我會如何去實現,那麼我想的也很簡單。

源碼分析

結構

type WaitGroup struct {
 noCopy noCopy
 // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
 // 64-bit atomic operations require 64-bit alignment, but 32-bit
 //compilers do not ensure it. So we allocate 12 bytes and then use
 //the aligned 8 bytes in them as state, and the other 4 as storage
 //for the sema.
 state1 [3] uint32
}

結構非常簡單,就只有兩個熟悉,一個 noCopy 還有一個 state1(我也很好奇爲什麼要用 1 來結尾命名,大佬的想法總是很奇妙)

noCopy:sync 包下的一個特殊標記吧,vet 檢查,如果有拷貝的變量則會報錯

func main () {
    wg := sync.WaitGroup {}
    w := wg
    fmt.Println (w, wg)
}

你 run 肯定沒問題的,但是如果你使用 go vet 做個檢查就有警告了

➜  go vet main.go
# command-line-arguments
./main.go:10:10: assignment copies lock value to w: sync.WaitGroup contains sync.noCopy
./main.go:11:17: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy
./main.go:11:20: call of fmt.Println copies lock value: sync.WaitGroup contains sync.noCopy

state1:是用來存放任務計數器和等待者計數器的(我一看到這個結構就明白肯定後面又是位操作這樣的高端操作了)

其中 waiter 是等待者計數,counter 是任務計數,sema 是信號量

奇怪的是在 64 位還 32 位操作系統上是不一樣的,具體原因以及對於它操作請繼續看下去

state
//state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state () (statep *uint64, semap *uint32) {
 if uintptr (unsafe.Pointer (&wg.state1))%8 == 0 {
  return (*uint64)(unsafe.Pointer (&wg.state1))&wg.state1 [2]
 } else {
  return (*uint64)(unsafe.Pointer (&wg.state1 [1]))&wg.state1 [0]
 }
}

這個方法是一個內部方法,就是將 state1 中存儲的狀態取出來,返回值 statep 就是計數器的狀態,semap 是信號量

Done

func (wg *WaitGroup) Done () { wg.Add (-1) } 沒想到吧~居然 Done 就是調用 Add 並傳遞一個 - 1

所以其實我們完全可以再外部調用 Add 傳遞一個 - 3 一次性結束 3 個任務

Add

func (wg *WaitGroup) Add (delta int) {
 // 首先獲取狀態值
 statep, semap := wg.state ()
 // 對於 statep 中 counter + delta
 state := atomic.AddUint64 (statep, uint64 (delta)<<32)
 // 獲取任務計數器的值
 v := int32 (state >> 32)
 // 獲取等待者計數器的值
 w := uint32 (state)
 
 // 任務計數器不能爲負數
 if v < 0 {
  panic ("sync: negative WaitGroup counter")
 }
 // 已經有人在等待,但是還在添加任務
 if w != 0 && delta > 0 && v == int32 (delta) {
  panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
 }
 // 沒有等待者或者任務還有沒做完的
 if v > 0 || w == 0 {
  return
 }
 // 有等待者,但是在這個過程中數據還在變動
 if *statep != state {
  panic ("sync: WaitGroup misuse: Add called concurrently with Wait")
 }

 // Reset waiters count to 0.
 // 重置狀態,並用發出等同於等待者數量的信號量,告訴所有等待者任務已經完成
 *statep = 0
 for ; w != 0; w-- {
  runtime_Semrelease (semap, false, 0)
 }
}

這裏有幾個要點我們其實已經看到了:

Wait 的 過程中 是不能 Add 的,不然就會 panic,要注意 雖然我們可以藉助 Add 一個負數來一次性結束多個任務,但是如果任務數量控制的不好,變成負數也會 panic,Done 次數多了也一樣 wg 是通過信號量來通知的,當然可以有很多人在等,wg 它都會一一通知到位的

Wait

func (wg *WaitGroup) Wait () {
 // 先獲取狀態
 statep, semap := wg.state ()
  
 for {
  // 這裏注意要用 atomic 的 Load 來保證一下寫操作已經完成
  state := atomic.LoadUint64 (statep)
  // 同樣的,這裏是任務計數
  v := int32 (state >> 32)
  // 這裏是等待者計數
  w := uint32 (state)
  // 如果沒有任務,那麼直接結束,不用等待了
  if v == 0 {
   return
  }
  // 使用 cas 操作,如果不相等,證明中間已經被其他人修改了狀態,重新走 for 循環
  // 注意這裏 if 進去之後等待者的數量就 +1 了
  if atomic.CompareAndSwapUint64 (statep, state, state+1) {
   // 等待信號量
   runtime_Semacquire (semap)
   // 如果信號量來了,但是狀態還不是 0,則證明 wait 之後還是在人在 add,證明有人想充分利用 wg 但是時機不對
   if *statep != 0 {
    panic ("sync: WaitGroup is reused before previous Wait has returned")
   }
   return
  }
 }
}

其實 wait 雖然簡單,也有要點

通過 load 和 cas 操作 + 循環來避免了鎖,其實這個操作可以學一下 其實這裏也說明明白了,wg 可以重用,但是你必須等到 wait 全部完成之後再說 其他注意點

func main () {
    wg := sync.WaitGroup {}
    for i := 0; i < 10; i++ {
        wg.Add (1)
        go func (job int) {
            doJob (job, wg)
        }(i)
    }
    wg.Wait ()
    fmt.Println ("all done")
}

func doJob (job int, wg sync.WaitGroup) {
    fmt.Printf ("job % d done\n", job)
    wg.Done ()
}

上面的代碼有問題嗎?問題在哪呢?

其實很簡單,wg 作爲一個參數傳遞的時候,wg 還是一個普通的結構體,我們在函數中操作的時候還是操作的一個拷貝的變量而已,對於原來的 wg 是不會改變的,所以這裏需要傳遞指針纔是正確的

func main () {
    wg := &sync.WaitGroup {}
        for i := 0; i < 10; i++ {
            wg.Add (1)
            go func (job int) {
                doJob (job, wg)
            }(i)
    }
    wg.Wait ()
    fmt.Println ("all done")
}

func doJob (job int, wg *sync.WaitGroup) {
    fmt.Printf ("job % d done\n", job)
    wg.Done ()
}

但是其實並不推薦這樣去傳遞 wg,因爲這樣很容易出現問題,一個不好就出問題了,個人還是建議直接在使用 goroutine 之後馬上接一個 defer wg.Done () 來的更加靠譜一些

總結

回過頭來看看,之前的問題也都有了答案:

  1. Wait 可以被調用多次,並且每個都會收到完成的通知

  2. Wait 之後,如果再 Wait 的過程中不能在 Add,否則會 panic,但是 Wait 結束之後可以繼續使用 Add 進行重用

  3. 可以使用 Add 傳遞負數的方式一次性結束多個任務,但是需要保證任務計數器非負,否則會 panic

  4. wg 作爲參數傳遞的時候需要注意傳遞指針,或者儘量避免傳遞

  5. 官方利用位操作節約了空間,存在在同一個地方;利用信號量來實現任務結束的通知….

總的來說 wg 的實現還是非常簡單的,需要注意的就是幾個使用上的點不要出現意外即可。

轉自:LinkinStar

linkinstar.wiki/2020/03/15/golang/source-code/sync-waitgroup-source-code

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/B7DOM73svpkNNhwLLDOPeQ