Golang 中如何正確使用條件變量 sync-Con

【導讀】sync 包中 Cond 在什麼場景下使用?它能解決什麼問題?本文對 sync.Cond 的使用做了詳細解讀。

Golang 的 sync 包中的 Cond 實現了一種條件變量,可以使用在多個 Reader 等待共享資源 ready 的場景(如果只有一讀一寫,一個鎖或者 channel 就搞定了)。

Cond 的匯合點:多個 goroutines 等待、1 個 goroutine 通知事件發生。

每個 Cond 都會關聯一個Lock(*sync.Mutex or *sync.RWMutex),當修改條件或者調用 Wait 方法時,必須加鎖,保護 condition

type Cond struct {
        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}

NewCond

func NewCond(l Locker) *Cond

新建一個 Cond 條件變量。

Broadcast

func (c *Cond) Broadcast()

Broadcast 會喚醒所有等待 c 的 goroutine。

調用 Broadcast 的時候,可以加鎖,也可以不加鎖。

Signal

func (c *Cond) Signal()

Signal 只喚醒 1 個等待 c 的 goroutine。

調用 Signal 的時候,可以加鎖,也可以不加鎖。

Wait

func (c *Cond) Wait()

Wait() 會自動釋放 c.L,並掛起調用者的 goroutine。之後恢復執行,Wait() 會在返回時對 c.L 加鎖。

除非被 Signal 或者 Broadcast 喚醒,否則 Wait() 不會返回。

由於 Wait() 第一次恢復時,C.L 並沒有加鎖,所以當 Wait 返回時,調用者通常並不能假設條件爲真。

取而代之的是, 調用者應該在循環中調用 Wait。(簡單來說,只要想使用 condition,就必須加鎖。)

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

舉個例子

下面這個例子,可以比較好的說明 Cond 的使用方法。

package main

import (
 "fmt"
 "sync"
 "time"
)

var sharedRsc = false

func main() {
 var wg sync.WaitGroup
 wg.Add(2)
 m := sync.Mutex{}
 c := sync.NewCond(&m)
 go func() {
  // this go routine wait for changes to the sharedRsc
  c.L.Lock()
  for sharedRsc == false {
   fmt.Println("goroutine1 wait")
   c.Wait()
  }
  fmt.Println("goroutine1", sharedRsc)
  c.L.Unlock()
  wg.Done()
 }()

 go func() {
  // this go routine wait for changes to the sharedRsc
  c.L.Lock()
  for sharedRsc == false {
   fmt.Println("goroutine2 wait")
   c.Wait()
  }
  fmt.Println("goroutine2", sharedRsc)
  c.L.Unlock()
  wg.Done()
 }()

 // this one writes changes to sharedRsc
 time.Sleep(2 * time.Second)
 c.L.Lock()
 fmt.Println("main goroutine ready")
 sharedRsc = true
 c.Broadcast()
 fmt.Println("main goroutine broadcast")
 c.L.Unlock()
 wg.Wait()
}

執行結果如下。

goroutine1 wait
goroutine2 wait
main goroutine ready
main goroutine broadcast
goroutine2 true
goroutine1 true

goroutine1 和 goroutine2 進入 Wait 狀態,在 main goroutine 在 2s 後資源滿足,發出 broadcast 信號後,從 Wait 中恢復並判斷條件是否確實已經滿足 (sharedRsc 不爲空),滿足則消費條件,並解鎖、wg.Done()

修改 1

我們做個修改,刪除main goroutine中的 2s 延時。

執行結果如下。

main goroutine ready
main goroutine broadcast
goroutine2 true
goroutine1 true

很有意思,兩個 goroutine 都沒有進入 Wait 狀態。

原因是,main goroutine執行的更快,在 goroutine1/goroutine2 加鎖之前就已經獲得了鎖,並完成了修改sharedRsc、發出Broadcast信號。

當子 goroutine 調用 Wait 之前檢驗 condition 時,條件已經滿足,因此就沒有必要再去調用 Wait 了。

修改 2

如果我們在子 goroutine 中不做校驗呢?

我們會得到 1 個死鎖。

main goroutine ready
main goroutine broadcast
goroutine2 wait
goroutine1 true
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x414028, 0x19)
 /usr/local/go/src/runtime/sema.go:56 +0x40
sync.(*WaitGroup).Wait(0x414020, 0x40c108)
 /usr/local/go/src/sync/waitgroup.go:130 +0x60
main.main()
 /tmp/sandbox947808816/prog.go:44 +0x2c0

goroutine 6 [sync.Cond.Wait]:
runtime.goparkunlock(...)
 /usr/local/go/src/runtime/proc.go:307
sync.runtime_notifyListWait(0x43e268, 0x0)
 /usr/local/go/src/runtime/sema.go:510 +0x120
sync.(*Cond).Wait(0x43e260, 0x40c108)
 /usr/local/go/src/sync/cond.go:56 +0xe0
main.main.func2(0x43e260, 0x414020)
 /tmp/sandbox947808816/prog.go:31 +0xc0
created by main.main
 /tmp/sandbox947808816/prog.go:27 +0x140

爲什麼呢?

main goroutine(goroutine 1)先執行,並停留在 wg.Wait()中,等待子 goroutine 的wg.Done();而子goroutine(goroutine 6)沒有判斷條件直接調用了 cond.Wait。

我們知道cond.Wait會釋放鎖並等待其他 goroutine 調用 Broadcast 或者 Signal 來通知其恢復執行,除此之外沒有其他的恢復途徑。但此時 main goroutine 已經調用了 Broadcast 並進入了等待狀態,沒有任何 goroutine 會去拯救還在cond.Wait中的子 goroutine 了,而該子 goroutine 也沒有機會調用wg.Done()去恢復 main goroutine,造成了死鎖。

因此,一定要注意,Broadcast 必須要在所有的 Wait 之後(當然了,可以通過條件判斷來決定要不要進 Wait)。

一個真實的例子

我們來看看 k8s 中使用 Cond 實現的 FIFO,它是如何處理條件的消費的。

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
 f.lock.Lock()
 defer f.lock.Unlock()
 for {
  for len(f.queue) == 0 {
   // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
   // When Close() is called, the f.closed is set and the condition is broadcasted.
   // Which causes this loop to continue and return from the Pop().
   if f.IsClosed() {
    return nil, FIFOClosedError
   }

   f.cond.Wait()
  }
  id := f.queue[0]
    f.queue = f.queue[1:]
    ...
 }
}

func NewFIFO(keyFunc KeyFunc) *FIFO {
 f := &FIFO{
  items:   map[string]interface{}{},
  queue:   []string{},
  keyFunc: keyFunc,
 }
 f.cond.L = &f.lock
 return f
}

Cond 共用了 FIFO 的 lock,在 Pop 時,會先加鎖 f.lock.Lock(),而在f.cond.Wait()前,會先檢查len(f.queue)是否爲 0,防止 2 種情況:

如上面的例子 3,條件已經滿足,不需要 wait 喚醒時滿足,但被其他 goroutine 捷足先登,阻塞在 f.lock 的加鎖中;當獲得了鎖,加鎖成功以後,f.queue 已經被消費爲空,直接訪問f.queue[0]會訪問越界。

轉自:伊布

ieevee.com/tech/2019/06/15/cond.html

Go 開發大全

參與維護一個非常全面的 Go 開源技術資源庫。日常分享 Go, 雲原生、k8s、Docker 和微服務方面的技術文章和行業動態。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/7g1zC4dJrckZEHXYSua6Ug