條件變量 Cond 實現

Cond 是什麼

下面是 wikipedia 對條件變量的定義,大體是說條件變量總的來說是等待特定條件的線程的容器。

A condition variable is basically a container of threads that are waiting for a certain condition.

Cond 是 Go 標準庫 sync 包提供的條件變量原語,目的是爲等待通知場景下的併發問題提供解決方法。Cond 通常應用於等待某個條件的一個或一組 goroutine, 當等待條件變爲 true 時,其中一個或一組所有的 goroutine 都被喚醒執行。通俗來說,Cond 和某個條件相關,這個條件可以是一個表達式、一個 bool 變量或是一個函數調用,只要它們的結果是 bool 類型的值就行。一個或一組 goroutine 需要這個條件才能協同完成,在條件還沒有滿足的時候,所有等待該條件的 goroutine 都會被阻塞,當條件滿足的時候,等待的 goroutine 才能夠繼續運行。舉個例子,在奧運會 100 米短跑比賽中,將每個運動員看作一個個 goroutine,只有在發令槍響之後,運動員才能開始跑,這裏的發令槍響就是條件變量,只有槍響之後,也就是條件滿足之後,goroutine 才能運行,在槍響之前,運動員處於等待狀態。

Cond 使用場景

我們先通過一個例子來了解 Cond 解決的是什麼問題,該例子來至於文末的引用 1。下面的程序啓動了一個 goroutine,當 rec.data 有內容的時候,打印內容退出,沒有內容的時候進行空轉。main goroutine 休息 2 秒鐘後更新 rec 的值。編譯運行下面的程序,可以看到 CPU 使用率高達接近 100%。

type Record struct {
 sync.Mutex
 data string
}

func main() {
 var wg sync.WaitGroup

 rec := &Record{}
 wg.Add(1)
 go func(rec *Record) {
  defer wg.Done()
  for {
   rec.Lock()
   if rec.data != "" {
    fmt.Println("Data: ", rec.data)
    rec.Unlock()
    return
   }
   rec.Unlock()
  }
 }(rec)

 time.Sleep(2 * time.Second)
 rec.Lock()
 rec.data = "gopher"
 rec.Unlock()

 wg.Wait()
}
type Record struct {
 sync.Mutex
 data string

 cond *sync.Cond
}

func NewRecord() *Record {
 r := Record{}
 r.cond = sync.NewCond(&r)

 return &r
}

func main() {
 var wg sync.WaitGroup

 rec := NewRecord()
 wg.Add(1)
 go func(rec *Record) {
  defer wg.Done()

  rec.Lock()
  rec.cond.Wait()
  rec.Unlock()
  fmt.Println("Data: ", rec.data)
  return
 }(rec)

 time.Sleep(2 * time.Second)
 rec.Lock()
 rec.data = "gopher"
 rec.Unlock()
 rec.cond.Signal()

 wg.Wait()
}

上面的程序使用了 Cond 的 3 個接口,分別是構造函數 NewCond、等待函數 Wait, 通知函數 Signal. 啓動的 goroutine 會阻塞等待在 rec.cond.Wait() 這裏,直到有人發信號給他,它纔會繼續運行。main goroutine 休眠 2 秒後,更新 rec.data 的值,然後調用 rec.cond.Signal 發送信號,收到信號後,啓動的 goroutine 繼續運行,最後打印 data 的內容並退出。通過信號機制,goroutine 在條件不滿足時休眠,滿足時被喚醒繼續執行,非常完美。

Cond 實現原理

下面分析的源碼是 Go1.14 版本,Cond 實現在 sync 包下的 cond.go 文件中,代碼加註釋不到 100 行,非常簡單,關鍵的邏輯調用了運行時中的信號量代碼,本文只分析與 Cond 相關的代碼,詳細信號量代碼源碼分析準備專門寫一篇文章。

結構體定義

Cond 結構定義如下,核心字段是 L 和 notify。noCopy 和 checker 是輔助字段,用於檢查 Cond 對象是否被複制使用了,因爲 Cond 同 Mutex 一樣,也是不能被複制的。L 是一個接口,定義的有兩個方法 Lock 和 Unlock, 一般將 Mutex 或 RWMutex 對象賦值給 L,因爲它們都實現了 Locker 的方法。notify 是一個等待隊列,調用用 Wait 方法後,goroutine 會掛起等待在 notify 上。

等待隊列類型爲 notifyList,它裏面的 5 個字段可以分爲 3 部分理解,lock 是加鎖用的。wait 和 notify 都是一個計數器,它們的初始值都爲 0,每次調用 Wait 操作,wait 的值都會增加 1.wait 的值可以理解爲調用 Wait 操作程序所在的 goroutine 的編號,notify 值表示小於它的阻塞的 goroutine 已經喚醒處理過,調用 Signal 或者 Broadcast 時喚醒阻塞在 [notify,wait) 範圍編號上的 goroutine。head 和 tail 是一個單鏈表的頭尾指針節點。

通俗理解,notifyList 爲一個隊列,它裏面存儲是 goroutine。wait 和 notify 分別表示生產者和消費者的位置。這個隊列是一個單鏈表,裏面的 goroutine 按照 wait 值從小到大排列。

type Cond struct {
 noCopy noCopy

 // 當觀察或修改等待條件的時候需要加鎖
 L Locker
 // 等待隊列
 notify  notifyList
 checker copyChecker
}

type Locker interface {
 Lock()
 Unlock()
}

type notifyList struct {
 // wait是一個計數器,它的值爲調用Wait的goroutine的編號
 wait uint32
 // notify也是一個計數器,它的值表示調用Signal或者Broadcast時喚醒阻塞在[notify,wait)
 // 範圍編號上的goroutine
 notify uint32
 lock   uintptr
 // head是一個指針,指向sudog單鏈表的頭節點,阻塞g的隊列中第一個g的位置
 head unsafe.Pointer
 // tail也是一個指針,指向sudog單鏈表的尾節點,阻塞g的隊列中最後一個g的位置
 tail unsafe.Pointer
}
Wait 方法

Wait 方法的核心功能就是將當前的 goroutine 掛起,等待 Signal 或者 Broadcast 喚醒。需要注意,Wait 方法中會先進行釋放鎖操作,後面又會執行加鎖操作。這意味用戶程序在調用 Wait 操作之前必須加鎖,Wait 操作完成之後需要釋放鎖,否則會存在釋放未加鎖的鎖,引發 panic 等問題。

現在來分析從用戶調用 Wait 操作之前加鎖到這裏 c.L.Unlock 過程中,鎖在保護哪些內容。下面 t 肯定是受保護了,確保了每個 ticket 與關聯的 goroutine 的唯一性。還有就是在這個過程中如果有併發操作的對象也是受保護的。

Wait 方法中調用的 runtime_notifyListAdd 和 runtime_notifyListWait 函數是在 runtime 包中的 sema.go 文件實現的。執行 runtime_notifyListWait 操作將當前的 goroutine 掛起阻塞等待在 notify 隊列上,收到喚醒信號之後會恢復運行。最後執行了 c.L.Lock(),與我們在用戶程序調用 Wait 之後的釋放鎖操作配對。

func (c *Cond) Wait() {
 // 檢查Cond對象是否被複制過
 c.checker.check()
 // wait自增1
 t := runtime_notifyListAdd(&c.notify)
 // 釋放鎖,所以在調用Wait前,必須進行加鎖
 c.L.Unlock()
 // 將當前的goroutine掛起阻塞等待在notify隊列上,收到喚醒信號之後
 // 恢復運行
 runtime_notifyListWait(&c.notify, t)
 // 加鎖,在執行Wait之後的代碼前,Cond又被加鎖了
 c.L.Lock()
}

notifyListAdd 原子性的將等待隊列中的 wait 值加 1

func notifyListAdd(l *notifyList) uint32 {
 // 將wait的值原子性操作自增1,wait的初始值爲0
 return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait 會創建一個 sudog 對象 s,並設置 s 的 ticket 值,將它和當前的 goroutine 關聯起來。然後加入到隊尾。最後調用 gopark 將當前的 goroutine 掛起。

func notifyListWait(l *notifyList, t uint32) {
 lock(&l.lock)

 // 小於notify的值的對應編號阻塞的goroutine之前已經喚醒過了,直接返回
 if less(t, l.notify) {
  unlock(&l.lock)
  return
 }

 // 獲取一個sudog對象s
 s := acquireSudog()
 // 設置s中的g爲當前的goroutine
 s.g = getg()
 // 設置ticket值爲傳入的t,可以理解爲ticket與當前阻塞的goroutine(s.g)對應
 s.ticket = t
 s.releasetime = 0
 t0 := int64(0)
 if blockprofilerate > 0 {
  t0 = cputicks()
  s.releasetime = -1
 }
 // 將新創建的sudog對象s加入到隊列的尾部,這個過程是在lock加鎖的條件下進行的
 // 不用擔心併發將s加入到l.tail衝突問題
 if l.tail == nil {
  l.head = s
 } else {
  l.tail.next = s
 }
 l.tail = s
 // 調用gopark阻塞當前的goroutine運行
 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
 if t0 != 0 {
  blockevent(s.releasetime-t0, 2)
 }
 releaseSudog(s)
}
Signal 方法

Signal 方法將喚醒等待隊列中隊頭的 goroutine,真正的實現在 notifyListNotifyOne 函數,此函數實現也在 runtime 包中的 sema.go 文件。

func (c *Cond) Signal() {
 c.checker.check()
 // 喚醒notifyList隊列中隊頭的goroutine
 runtime_notifyListNotifyOne(&c.notify)
}

notifyListNotifyOne 函數找到隊頭中 ticket 爲 l.notify 的對象,並將該對象關聯的 goroutine 喚醒恢復運行。

func notifyListNotifyOne(l *notifyList) {
 // 如果wait和notify值相等,說明沒有阻塞等待的goroutine,也就沒有要喚醒的g了,這裏直接返回
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }
 // 加鎖執行下面操作
 lock(&l.lock)

 // 加鎖後再次檢查wait的值跟notify是否相等,如果相等同上直接釋放鎖返回
 t := l.notify
 if t == atomic.Load(&l.wait) {
  unlock(&l.lock)
  return
 }

 // notify加1,相當於消費者消費一個數據(g),下面會將隊列頭的goroutine喚醒
 atomic.Store(&l.notify, t+1)
 // 執行循環操作,從隊列中找出ticket等於notify(l.notify-1,因爲此時l.notify已加1)的sudog對象
 // 從sudog對象中獲取到綁定的g,然後執行readyWithTime,readyWithTime會調用goread將g喚醒
 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  if s.ticket == t {
   n := s.next
   if p != nil {
    p.next = n
   } else {
    l.head = n
   }
   if n == nil {
    l.tail = p
   }
   unlock(&l.lock)
   s.next = nil
   readyWithTime(s, 4)
   return
  }
 }
 // 釋放鎖
 unlock(&l.lock)
}
Broadcast 方法

Broadcast 方法會喚醒隊列中所有的 goroutine.

func (c *Cond) Broadcast() {
 c.checker.check()
 // 喚醒notifyList隊列中所有的goroutine
 runtime_notifyListNotifyAll(&c.notify)
}

notifyListNotifyAll 函數也在 sema.go 文件,將等待隊列中所有的 goroutine 執行 goready 進行喚醒。在實現的時候,通過拷貝的方法將當前鏈表拷貝到臨時變量 s 中,達到了快速釋放鎖。這裏鎖的粒度比 Signal 還要小,處理的非常優雅。

func notifyListNotifyAll(l *notifyList) {
 // 如果wait和notify值相等,說明沒有阻塞等待的goroutine,也就沒有要喚醒的g了,這裏直接返回
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }

 // 加鎖,將當前鏈表拷貝到臨時變量s中,然後將原鏈表釋放
 // 之後就可以解鎖了。通過是拷貝方式達到快速解鎖,這裏比
 // 鎖的粒度比Signal還要小。
 lock(&l.lock)
 s := l.head
 l.head = nil
 l.tail = nil

 // 原子將wait的值賦值給notify,表示[notify,wait)範圍內阻塞的goroutine都將被喚醒了
 atomic.Store(&l.notify, atomic.Load(&l.wait))
 unlock(&l.lock)

 // 遍歷鏈表中每一個sudog對象,將綁定在sudog對象上的goroutine喚醒
 for s != nil {
  next := s.next
  s.next = nil
  // readyWithTime會調用goready將goroutine喚醒
  readyWithTime(s, 4)
  s = next
 }
}

Cond 使用注意事項

我們先來看 stackoverflow 網站討論 Cond 錯誤使用的例子。分析錯誤的原因,總結經驗。

下面的程序會出現:fatal error: all goroutines are asleep - deadlock! 爲什麼會這樣呢?程序中有 main goroutine 和一個 main 函數中啓動的 goroutine. 執行 go func 之後會啓動的 goroutine 並不一定執行而是先放入可以運行隊列中,本例中會放入 p.runnext 中。然後 main goroutine 繼續執行,執行完 m.Lock 後,睡眠了,這時會切換執行 go func 程序,睡眠一秒。然後會執行 c.Broadcast,因爲 main goroutine 睡眠的是 2 秒時間還沒到。好傢伙,執行 Broadcast 時 wati 和 notify 是相等的,直接退出了,次 goroutine 也就退出了。2 秒後 main goroutine 執行 c.Wait,將 main goroutine 掛起,此時沒有可運行的 goroutine 了,所以打印上面的 deadlock.

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}
  1. sync.Cond 不能複製使用 sync.Cond 結構中有一個 notifyList 隊列,如果複製 Cond,相當於複製了 notifyList 值,在併發場景下不同 goroutine 操作的並不是同一個 notifyList,會出現與預期不一致的效果,例如可能出現有些 goroutine 一直阻塞。

  2. 調用 Wait 操作前需加鎖,調用完之後釋放鎖 Wait 內部先釋放了鎖然後又加鎖。所以在調用 Wait 之前必須加鎖,調用完之後釋放鎖,否則會出現加鎖和釋放鎖不能配對問題。

  3. Wait 通常放在在 for 循環內部調用,例如採用如下模式,因爲 waiter goroutine 被喚醒不等於等待條件被滿足,所以喚醒之後需要進一步檢查等待條件。

  c.L.Lock()
   for !condition() {
       c.Wait()
   }
   ... make use of condition ...
   c.L.Unlock()

Understanding Condition Variable in Go[1]How to correctly use sync.Cond[2]

[1]

Understanding Condition Variable in Go: https://kaviraj.me/understanding-condition-variable-in-go/

[2]

How to correctly use sync.Cond: https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/5oIfb4oyKRudVYxgaUJwVQ