條件變量 Cond 實現
Cond 是什麼
下面是 wikipedia 對條件變量的定義,大體是說條件變量總的來說是等待特定條件的線程的容器。
A condition variable is basically a container of threads that are waiting for a certain condition.
Cond 是 Go 標準庫 sync 包提供的條件變量原語,目的是爲等待通知場景下的併發問題提供解決方法。Cond 通常應用於等待某個條件的一個或一組 goroutine, 當等待條件變爲 true 時,其中一個或一組所有的 goroutine 都被喚醒執行。通俗來說,Cond 和某個條件相關,這個條件可以是一個表達式、一個 bool 變量或是一個函數調用,只要它們的結果是 bool 類型的值就行。一個或一組 goroutine 需要這個條件才能協同完成,在條件還沒有滿足的時候,所有等待該條件的 goroutine 都會被阻塞,當條件滿足的時候,等待的 goroutine 才能夠繼續運行。舉個例子,在奧運會 100 米短跑比賽中,將每個運動員看作一個個 goroutine,只有在發令槍響之後,運動員才能開始跑,這裏的發令槍響就是條件變量,只有槍響之後,也就是條件滿足之後,goroutine 才能運行,在槍響之前,運動員處於等待狀態。
Cond 使用場景
我們先通過一個例子來了解 Cond 解決的是什麼問題,該例子來至於文末的引用 1。下面的程序啓動了一個 goroutine,當 rec.data 有內容的時候,打印內容退出,沒有內容的時候進行空轉。main goroutine 休息 2 秒鐘後更新 rec 的值。編譯運行下面的程序,可以看到 CPU 使用率高達接近 100%。
type Record struct {
sync.Mutex
data string
}
func main() {
var wg sync.WaitGroup
rec := &Record{}
wg.Add(1)
go func(rec *Record) {
defer wg.Done()
for {
rec.Lock()
if rec.data != "" {
fmt.Println("Data: ", rec.data)
rec.Unlock()
return
}
rec.Unlock()
}
}(rec)
time.Sleep(2 * time.Second)
rec.Lock()
rec.data = "gopher"
rec.Unlock()
wg.Wait()
}
type Record struct {
sync.Mutex
data string
cond *sync.Cond
}
func NewRecord() *Record {
r := Record{}
r.cond = sync.NewCond(&r)
return &r
}
func main() {
var wg sync.WaitGroup
rec := NewRecord()
wg.Add(1)
go func(rec *Record) {
defer wg.Done()
rec.Lock()
rec.cond.Wait()
rec.Unlock()
fmt.Println("Data: ", rec.data)
return
}(rec)
time.Sleep(2 * time.Second)
rec.Lock()
rec.data = "gopher"
rec.Unlock()
rec.cond.Signal()
wg.Wait()
}
上面的程序使用了 Cond 的 3 個接口,分別是構造函數 NewCond、等待函數 Wait, 通知函數 Signal. 啓動的 goroutine 會阻塞等待在 rec.cond.Wait() 這裏,直到有人發信號給他,它纔會繼續運行。main goroutine 休眠 2 秒後,更新 rec.data 的值,然後調用 rec.cond.Signal 發送信號,收到信號後,啓動的 goroutine 繼續運行,最後打印 data 的內容並退出。通過信號機制,goroutine 在條件不滿足時休眠,滿足時被喚醒繼續執行,非常完美。
Cond 實現原理
下面分析的源碼是 Go1.14 版本,Cond 實現在 sync 包下的 cond.go 文件中,代碼加註釋不到 100 行,非常簡單,關鍵的邏輯調用了運行時中的信號量代碼,本文只分析與 Cond 相關的代碼,詳細信號量代碼源碼分析準備專門寫一篇文章。
結構體定義
Cond 結構定義如下,核心字段是 L 和 notify。noCopy 和 checker 是輔助字段,用於檢查 Cond 對象是否被複制使用了,因爲 Cond 同 Mutex 一樣,也是不能被複制的。L 是一個接口,定義的有兩個方法 Lock 和 Unlock, 一般將 Mutex 或 RWMutex 對象賦值給 L,因爲它們都實現了 Locker 的方法。notify 是一個等待隊列,調用用 Wait 方法後,goroutine 會掛起等待在 notify 上。
等待隊列類型爲 notifyList,它裏面的 5 個字段可以分爲 3 部分理解,lock 是加鎖用的。wait 和 notify 都是一個計數器,它們的初始值都爲 0,每次調用 Wait 操作,wait 的值都會增加 1.wait 的值可以理解爲調用 Wait 操作程序所在的 goroutine 的編號,notify 值表示小於它的阻塞的 goroutine 已經喚醒處理過,調用 Signal 或者 Broadcast 時喚醒阻塞在 [notify,wait) 範圍編號上的 goroutine。head 和 tail 是一個單鏈表的頭尾指針節點。
通俗理解,notifyList 爲一個隊列,它裏面存儲是 goroutine。wait 和 notify 分別表示生產者和消費者的位置。這個隊列是一個單鏈表,裏面的 goroutine 按照 wait 值從小到大排列。
type Cond struct {
noCopy noCopy
// 當觀察或修改等待條件的時候需要加鎖
L Locker
// 等待隊列
notify notifyList
checker copyChecker
}
type Locker interface {
Lock()
Unlock()
}
type notifyList struct {
// wait是一個計數器,它的值爲調用Wait的goroutine的編號
wait uint32
// notify也是一個計數器,它的值表示調用Signal或者Broadcast時喚醒阻塞在[notify,wait)
// 範圍編號上的goroutine
notify uint32
lock uintptr
// head是一個指針,指向sudog單鏈表的頭節點,阻塞g的隊列中第一個g的位置
head unsafe.Pointer
// tail也是一個指針,指向sudog單鏈表的尾節點,阻塞g的隊列中最後一個g的位置
tail unsafe.Pointer
}
Wait 方法
Wait 方法的核心功能就是將當前的 goroutine 掛起,等待 Signal 或者 Broadcast 喚醒。需要注意,Wait 方法中會先進行釋放鎖操作,後面又會執行加鎖操作。這意味用戶程序在調用 Wait 操作之前必須加鎖,Wait 操作完成之後需要釋放鎖,否則會存在釋放未加鎖的鎖,引發 panic 等問題。
現在來分析從用戶調用 Wait 操作之前加鎖到這裏 c.L.Unlock 過程中,鎖在保護哪些內容。下面 t 肯定是受保護了,確保了每個 ticket 與關聯的 goroutine 的唯一性。還有就是在這個過程中如果有併發操作的對象也是受保護的。
Wait 方法中調用的 runtime_notifyListAdd 和 runtime_notifyListWait 函數是在 runtime 包中的 sema.go 文件實現的。執行 runtime_notifyListWait 操作將當前的 goroutine 掛起阻塞等待在 notify 隊列上,收到喚醒信號之後會恢復運行。最後執行了 c.L.Lock(),與我們在用戶程序調用 Wait 之後的釋放鎖操作配對。
func (c *Cond) Wait() {
// 檢查Cond對象是否被複制過
c.checker.check()
// wait自增1
t := runtime_notifyListAdd(&c.notify)
// 釋放鎖,所以在調用Wait前,必須進行加鎖
c.L.Unlock()
// 將當前的goroutine掛起阻塞等待在notify隊列上,收到喚醒信號之後
// 恢復運行
runtime_notifyListWait(&c.notify, t)
// 加鎖,在執行Wait之後的代碼前,Cond又被加鎖了
c.L.Lock()
}
notifyListAdd 原子性的將等待隊列中的 wait 值加 1
func notifyListAdd(l *notifyList) uint32 {
// 將wait的值原子性操作自增1,wait的初始值爲0
return atomic.Xadd(&l.wait, 1) - 1
}
notifyListWait 會創建一個 sudog 對象 s,並設置 s 的 ticket 值,將它和當前的 goroutine 關聯起來。然後加入到隊尾。最後調用 gopark 將當前的 goroutine 掛起。
func notifyListWait(l *notifyList, t uint32) {
lock(&l.lock)
// 小於notify的值的對應編號阻塞的goroutine之前已經喚醒過了,直接返回
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 獲取一個sudog對象s
s := acquireSudog()
// 設置s中的g爲當前的goroutine
s.g = getg()
// 設置ticket值爲傳入的t,可以理解爲ticket與當前阻塞的goroutine(s.g)對應
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
// 將新創建的sudog對象s加入到隊列的尾部,這個過程是在lock加鎖的條件下進行的
// 不用擔心併發將s加入到l.tail衝突問題
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 調用gopark阻塞當前的goroutine運行
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
Signal 方法
Signal 方法將喚醒等待隊列中隊頭的 goroutine,真正的實現在 notifyListNotifyOne 函數,此函數實現也在 runtime 包中的 sema.go 文件。
func (c *Cond) Signal() {
c.checker.check()
// 喚醒notifyList隊列中隊頭的goroutine
runtime_notifyListNotifyOne(&c.notify)
}
notifyListNotifyOne 函數找到隊頭中 ticket 爲 l.notify 的對象,並將該對象關聯的 goroutine 喚醒恢復運行。
func notifyListNotifyOne(l *notifyList) {
// 如果wait和notify值相等,說明沒有阻塞等待的goroutine,也就沒有要喚醒的g了,這裏直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加鎖執行下面操作
lock(&l.lock)
// 加鎖後再次檢查wait的值跟notify是否相等,如果相等同上直接釋放鎖返回
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// notify加1,相當於消費者消費一個數據(g),下面會將隊列頭的goroutine喚醒
atomic.Store(&l.notify, t+1)
// 執行循環操作,從隊列中找出ticket等於notify(l.notify-1,因爲此時l.notify已加1)的sudog對象
// 從sudog對象中獲取到綁定的g,然後執行readyWithTime,readyWithTime會調用goread將g喚醒
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
// 釋放鎖
unlock(&l.lock)
}
Broadcast 方法
Broadcast 方法會喚醒隊列中所有的 goroutine.
func (c *Cond) Broadcast() {
c.checker.check()
// 喚醒notifyList隊列中所有的goroutine
runtime_notifyListNotifyAll(&c.notify)
}
notifyListNotifyAll 函數也在 sema.go 文件,將等待隊列中所有的 goroutine 執行 goready 進行喚醒。在實現的時候,通過拷貝的方法將當前鏈表拷貝到臨時變量 s 中,達到了快速釋放鎖。這裏鎖的粒度比 Signal 還要小,處理的非常優雅。
func notifyListNotifyAll(l *notifyList) {
// 如果wait和notify值相等,說明沒有阻塞等待的goroutine,也就沒有要喚醒的g了,這裏直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 加鎖,將當前鏈表拷貝到臨時變量s中,然後將原鏈表釋放
// 之後就可以解鎖了。通過是拷貝方式達到快速解鎖,這裏比
// 鎖的粒度比Signal還要小。
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// 原子將wait的值賦值給notify,表示[notify,wait)範圍內阻塞的goroutine都將被喚醒了
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// 遍歷鏈表中每一個sudog對象,將綁定在sudog對象上的goroutine喚醒
for s != nil {
next := s.next
s.next = nil
// readyWithTime會調用goready將goroutine喚醒
readyWithTime(s, 4)
s = next
}
}
Cond 使用注意事項
我們先來看 stackoverflow 網站討論 Cond 錯誤使用的例子。分析錯誤的原因,總結經驗。
下面的程序會出現:fatal error: all goroutines are asleep - deadlock! 爲什麼會這樣呢?程序中有 main goroutine 和一個 main 函數中啓動的 goroutine. 執行 go func 之後會啓動的 goroutine 並不一定執行而是先放入可以運行隊列中,本例中會放入 p.runnext 中。然後 main goroutine 繼續執行,執行完 m.Lock 後,睡眠了,這時會切換執行 go func 程序,睡眠一秒。然後會執行 c.Broadcast,因爲 main goroutine 睡眠的是 2 秒時間還沒到。好傢伙,執行 Broadcast 時 wati 和 notify 是相等的,直接退出了,次 goroutine 也就退出了。2 秒後 main goroutine 執行 c.Wait,將 main goroutine 掛起,此時沒有可運行的 goroutine 了,所以打印上面的 deadlock.
package main
import (
"sync"
"time"
)
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
time.Sleep(1 * time.Second)
c.Broadcast()
}()
m.Lock()
time.Sleep(2 * time.Second)
c.Wait()
}
-
sync.Cond 不能複製使用 sync.Cond 結構中有一個 notifyList 隊列,如果複製 Cond,相當於複製了 notifyList 值,在併發場景下不同 goroutine 操作的並不是同一個 notifyList,會出現與預期不一致的效果,例如可能出現有些 goroutine 一直阻塞。
-
調用 Wait 操作前需加鎖,調用完之後釋放鎖 Wait 內部先釋放了鎖然後又加鎖。所以在調用 Wait 之前必須加鎖,調用完之後釋放鎖,否則會出現加鎖和釋放鎖不能配對問題。
-
Wait 通常放在在 for 循環內部調用,例如採用如下模式,因爲 waiter goroutine 被喚醒不等於等待條件被滿足,所以喚醒之後需要進一步檢查等待條件。
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
Understanding Condition Variable in Go[1]How to correctly use sync.Cond[2]
[1]
Understanding Condition Variable in Go: https://kaviraj.me/understanding-condition-variable-in-go/
[2]
How to correctly use sync.Cond: https://stackoverflow.com/questions/36857167/how-to-correctly-use-sync-cond
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/5oIfb4oyKRudVYxgaUJwVQ