源碼剖析 sync-cond-條件變量的實現機制)
前言
哈嘍,大家好,我是asong
,這是我併發編程系列的第三篇文章,這一篇我們一起來看看sync.Cond
的使用與實現。之前寫過java
的朋友對等待 / 通知 (wait/notify) 機制一定很熟悉,可以利用等待 / 通知機制實現阻塞或者喚醒,在Go
語言使用Cond
也可以達到同樣的效果,接下來我們一起來看看它的使用與實現。
sync.Cond
的基本使用
Go
標準庫提供了Cond
原語,爲等待 / 通知場景下的併發問題提供支持。Cond
他可以讓一組的Goroutine
都在滿足特定條件 (這個等待條件有很多,可以是某個時間點或者某個變量或一組變量達到了某個閾值,還可以是某個對象的狀態滿足了特定的條件) 時被喚醒,Cond
是和某個條件相關,這個條件需要一組goroutine
協作共同完成,在條件還沒有滿足的時候,所有等待這個條件的goroutine
都會被阻塞住,只有這一組goroutine
通過協作達到了這個條件,等待的goroutine
纔可以繼續進行下去。
先看這樣一個例子:
var (
done = false
topic = "Golang夢工廠"
)
func main() {
cond := sync.NewCond(&sync.Mutex{})
go Consumer(topic,cond)
go Consumer(topic,cond)
go Consumer(topic,cond)
Push(topic,cond)
time.Sleep(5 * time.Second)
}
func Consumer(topic string,cond *sync.Cond) {
cond.L.Lock()
for !done{
cond.Wait()
}
fmt.Println("topic is ",topic," starts Consumer")
cond.L.Unlock()
}
func Push(topic string,cond *sync.Cond) {
fmt.Println(topic,"starts Push")
cond.L.Lock()
done = true
cond.L.Unlock()
fmt.Println("topic is ",topic," wakes all")
cond.Broadcast()
}
// 運行結果
Golang夢工廠 starts Push
topic is Golang夢工廠 wakes all
topic is Golang夢工廠 starts Consumer
topic is Golang夢工廠 starts Consumer
topic is Golang夢工廠 starts Consumer
上述代碼我們運行了4
個Goroutine
,其中三個Goroutine
分別做了相同的事情,通過調用cond.Wait()
等特定條件的滿足,1 個Goroutine
會調用cond.Broadcast
喚醒所用陷入等待的Goroutine
。畫個圖看一下更清晰:
我們看上面這一段代碼,Cond
使用起來並不簡單,使用不當就出現不可避免的問題,所以,有的開發者會認爲,Cond
是唯一難以掌握的Go
併發原語。爲了讓大家能更好的理解Cond
,接下來我們一起看看Cond
的實現原理。
Cond
實現原理
Cond
的實現還是比較簡單的,代碼量比較少,複雜的邏輯已經被Locker
或者runtime
的等待隊列實現了,所以我們來看這些源代碼也會輕鬆一些。首先我們來看一下它的結構體:
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
主要有4
個字段:
-
nocopy
:之前在講waitGroup
時介紹過,保證結構體不會在編譯器期間拷貝,原因就不在這裏說了,想了解的看這篇文章源碼剖析 sync.WaitGroup(文末思考題你能解釋一下嗎?) -
checker
:用於禁止運行期間發生拷貝,雙重檢查 (Double check
) -
L
:可以傳入一個讀寫鎖或互斥鎖,當修改條件或者調用wait
方法時需要加鎖 -
notify
:通知鏈表,調用wait()
方法的Goroutine
會放到這個鏈表中,喚醒從這裏取。我們可以看一下notifyList
的結構:
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
我們簡單分析一下notifyList
的各個字段:
-
wait
:下一個等待喚醒Goroutine
的索引,他是在鎖外自動遞增的. -
notify
:下一個要通知的Goroutine
的索引,他可以在鎖外讀取,但是隻能在鎖持有的情況下寫入. -
head
:指向鏈表的頭部 -
tail
:指向鏈表的尾部
基本結構我們都知道了,下面我就來看一看Cond
提供的三種方法是如何實現的~。
wait
我們先來看一下wait
方法源碼部分:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
代碼量不多,執行步驟如下:
-
執行運行期間拷貝檢查,如果發生了拷貝,則直接
panic
程序 -
調用
runtime_notifyListAdd
將等待計數器加一併解鎖; -
調用
runtime_notifyListWait
等待其他Goroutine
的喚醒並加鎖
runtime_notifyListAdd
的實現:
// See runtime/sema.go for documentation.
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
代碼實現比較簡單,原子操作將等待計數器加 1,因爲wait
代表的是下一個等待喚醒Goroutine
的索引,所以需要減 1 操作。
runtime_notifyListWait
的實現:
// See runtime/sema.go for documentation.
func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}
// Enqueue itself.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
這裏主要執行步驟如下:
-
檢查當前
wait
與notify
索引位置是否匹配,如果已經被通知了,便立即返回. -
獲取當前
Goroutine
,並將當前Goroutine
追加到鏈表末端. -
調用
goparkunlock
方法讓當前Goroutine
進入等待狀態,也就是進入睡眠,等待喚醒 -
被喚醒後,調用
releaseSudog
釋放當前等待列表中的Goroutine
看完源碼我們來總結一下注意事項:
wait
方法會把調用者放入Cond
的等待隊列中並阻塞,直到被喚醒,調用wait
方法必須要持有c.L
鎖。
signal
和Broadcast
signal
和Broadcast
都會喚醒等待隊列,不過signal
是喚醒鏈表最前面的Goroutine
,Boradcast
會喚醒隊列中全部的Goroutine
。下面我們分別來看一下signal
和broadcast
的源碼:
signal
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
t := l.notify
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
上面我們看wait
源代碼時,每次都會調用都會原子遞增wait
,那麼這個wait
就代表當前最大的wait
值,對應喚醒的時候,也就會對應一個notify
屬性,我們在notifyList
鏈表中逐個檢查,找到ticket
對應相等的notify
屬性。這裏大家肯定會有疑惑,我們爲何不直接取鏈表頭部喚醒呢?
notifyList
並不是一直有序的,wait
方法中調用runtime_notifyListAdd
和runtime_notifyListWait
完全是兩個獨立的行爲,中間還有釋放鎖的行爲,而當多個 goroutine
同時進行時,中間會產生進行併發操作,這樣就會出現亂序,所以採用這種操作即使在 notifyList
亂序的情況下,也能取到最先Wait
的 goroutine
。
broadcast
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
全部喚醒實現要簡單一些,主要是通過調用readyWithTime
方法喚醒鏈表中的goroutine
,喚醒順序也是按照加入隊列的先後順序,先加入的會先被喚醒,而後加入的可能 Goroutine
需要等待調度器的調度。
最後我們總結一下使用這兩個方法要注意的問題:
-
Signal
:允許調用者喚醒一個等待此Cond
的Goroutine
,如果此時沒有等待的goroutine
,顯然無需通知waiter
;如果Cond
等待隊列中有一個或者多個等待的goroutine
,則需要從等待隊列中移除第一個goroutine
並把它喚醒。調用Signal
方法時,不強求你一定要持有c.L
的鎖。 -
broadcast
:允許調用者喚醒所有等待此Cond
的goroutine
。如果此時沒有等待的goroutine
,顯然無需通知 waiter;如果Cond
等待隊列中有一個或者多個等待的goroutine
,則清空所有等待的goroutine
,並全部喚醒,不強求你一定要持有c.L
的鎖。
注意事項
-
調用
wait
方法的時候一定要加鎖,否則會導致程序發生panic
. -
wait
調用時需要檢查等待條件是否滿足,也就說goroutine
被喚醒了不等於等待條件被滿足,等待者被喚醒,只是得到了一次檢查的機會而已,推薦寫法如下:
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
Signal
和Boardcast
兩個喚醒操作不需要加鎖
總結
其實Cond
在實際項目中被使用的機會比較少,Go
特有的channel
就可以代替它,暫時只在Kubernetes
項目中看到了應用,使用場景是每次往隊列中成功增加了元素後就需要調用 Broadcast
通知所有的等待者,使用Cond
就很合適,相比channel
減少了代碼複雜性。
好啦,這篇文章就到這裏啦,素質三連(分享、點贊、在看)都是筆者持續創作更多優質內容的動力!
創建了一個 Golang 學習交流羣,歡迎各位大佬們踊躍入羣,我們一起學習交流。入羣方式:加我 vx 拉你入羣,或者公衆號獲取入羣二維碼
結尾給大家發一個小福利吧,最近我在看 [微服務架構設計模式] 這一本書,講的很好,自己也收集了一本 PDF,有需要的小夥可以到自行下載。獲取方式:關注公衆號:[Golang 夢工廠],後臺回覆:[微服務],即可獲取。
我翻譯了一份 GIN 中文文檔,會定期進行維護,有需要的小夥伴後臺回覆 [gin] 即可下載。
翻譯了一份 Machinery 中文文檔,會定期進行維護,有需要的小夥伴們後臺回覆 [machinery] 即可獲取。
我是 asong,一名普普通通的程序猿,讓我們一起慢慢變強吧。歡迎各位的關注,我們下期見~~~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/szSxatDakPQMUA8Vm9u3qQ