sync-Cond 設計與實現
概述
sync.Cond
實現了一種條件變量同步原語,可以讓一個 goroutine
集合在滿足特定條件時被喚醒。
sync.Cond
典型的使用場景是 生產-消費者模式
,多個 goroutine
等待某個事件發生, 單個 goroutine
通知某個事件已發生。比如電商中的用戶下單事件發生時,會通知到訂單、用戶、積分、優惠券、倉儲等服務,如果是單個生產者對單個消費者,直接使用 互斥鎖
或 channel
就可以。
爲什麼多個消費者模式不使用互斥鎖或 channel 呢?
可以想象一個非常簡單的場景: 有一個 goroutine
在異步接收數據,剩下的多個 goroutine
必須等待該 goroutine
接收完才能讀取。在這種情況下,如果單純使用 互斥鎖
或 channel
,就只能有一個 goroutine
可以等待並讀取到數據,其他的 goroutine
沒辦法讀取。
當然我們可以通過折衷的方案來解決,例如 可以創建一個全局變量,用來標誌這個 goroutine
數據是否接收完成,剩下的 goroutine
反覆檢查該全局變量,直到滿足條件。或者 可以創建多個 channel
,每個 goroutine
阻塞在一個 channel
上面,接收數據的 goroutine
在數據接收完畢後,逐個通知。但是不論哪種方式,實現複雜度都大大增加了。
sync.Cond
提供了簡潔優雅的方式來解決上述問題。
示例
通過一個小例子展示 sync.Cond
的使用方法。
package main
import (
"fmt"
"sync"
"time"
)
// 條件變量
var done = false
// 數據讀取操作
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
// 等待生產者寫入通知
c.Wait()
}
fmt.Println(name, "starts reading")
c.L.Unlock()
}
// 數據寫入操作
func write(name string, c *sync.Cond) {
fmt.Println(name, "starts writing")
time.Sleep(100 * time.Millisecond)
c.L.Lock()
// 設置條件變量
done = true
c.L.Unlock()
fmt.Println(name, "wakes all")
// 通知所有消費者
c.Broadcast()
}
func main() {
// 創建對象時傳入一個互斥鎖
cond := sync.NewCond(&sync.Mutex{})
// 3 個消費者
go read("reader-1", cond)
go read("reader-2", cond)
go read("reader-3", cond)
// 1 個生產者
write("writer-1", cond)
time.Sleep(time.Second)
}
$ go run main.go
# 輸出如下
writer-1 starts writing
writer-1 wakes all
reader-2 starts reading
reader-1 starts reading
reader-3 starts reading
從輸出結果中可以看到,消費者剛開始時調用 Wait
方法阻塞,直到生產者 (write) 寫入完成後調用 Broadcast
方法通知所有消費者 (read),然後所有消費者依次輸出。
調用關係圖
內部實現
我們來探究一下 sync.Cond
的內部實現,文件路徑爲 $GOROOT/src/sync/cond.go
,筆者的 Go 版本爲 go1.19 linux/amd64
。
Cond 對象
Cond
對象表示同步條件變量,可以讓 goroutins
等待或通知某個事件發生,Cond 對象一旦使用後,就不能再複製。
每一個 Cond
對象都持有一個對應的 Locker
接口 (通常是一個互斥鎖或讀寫鎖),當條件發生變化以及調用 Wait
方法時,必須持有對應的鎖。
在簡單的應用場景中,更好的選擇是使用 channel
完成同步操作 (Go 的標準庫設計理念是上層應用盡量使用 channel
作爲同步原語),可以將兩者的對應關係簡單概況如下:
-
關閉 channel 對應 Cond.Broadcast 方法
-
向 channel 發送數據對應 Cond.Signal 方法
Signal 和 Broadcast
type Cond struct {
// 保證編譯期間不會發生複製
noCopy noCopy
// 當訪問或者修改條件時,必須持有 L
L Locker
// goroutine 鏈表
notify notifyList
// 保證運行期間不會發生複製
checker copyChecker
}
notifyList 對象
notifyList
對象表示一個 goroutine
鏈表數據結構。
// runtime/sema.go
type notifyList struct {
// 等待的 goroutine 索引,可以在沒有獲取鎖的情況下原子性遞增
wait uint32
// 已經通知到的 goroutine 索引
// 可以在沒有獲取鎖的情況下進行讀取操作,但是必須在獲得鎖的情況下進行寫入操作
notify uint32
lock mutex
// 等待索引和已通知索引可以是環形隊列結構
// 鏈表頭指針
head *sudog
// 鏈表尾指針
tail *sudog
}
sync.Cond 對象結構
NewCond 方法
NewCond
方法創建一個 Cond
對象,參數爲一個 Locker
接口。
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
Wait 方法
Wait
方法 (阻塞調用) 會解鎖 c.L
字段並且休眠當前 goroutine
,等到當前 goroutine
被喚醒後,Wait
方法在返回之前再對 c.L
字段加鎖。
func (c *Cond) Wait() {
// 複製檢測
c.checker.check()
// 等待索引 + 1
t := runtime_notifyListAdd(&c.notify)
// goroutine 休眠之前先解鎖 (否則其他 goroutine 獲取不到鎖,會造成死鎖問題)
c.L.Unlock()
// 等待喚醒,並傳遞等待索引
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
runtime_notifyListAdd 方法
runtime_notifyListAdd
方法通過鏈接器鏈接到 notifyListAdd
方法,notifyListAdd
方法將當前調用方 goroutine
添加到通知鏈表中以便接收通知。
// runtime/sema.go
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
// 等待索引計數 + 1
return atomic.Xadd(&l.wait, 1) - 1
}
runtime_notifyListWait 方法
runtime_notifyListWait
方法通過鏈接器鏈接到 notifyListWait
方法,如果在調用 notifyListAdd
方法之後已經發送過通知,notifyListWait
就會立即返回,否則就陷入阻塞。
// runtime/sema.go
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
// 如果之前已經發送過通知,直接返回即可
if less(t, l.notify) {
unlock(&l.lock)
return
}
// 獲取當前 goroutine 並追加到鏈表尾部
s := acquireSudog()
s.g = getg()
// 獲取等待索引
s.ticket = t
// 將 goroutine 加入到鏈表中
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
// 休眠當前 goroutine
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
// 歸還 goroutine
releaseSudog(s)
}
Signal 方法
Signal
方法喚醒鏈表頭部等待的 goroutine
。
func (c *Cond) Signal() {
// 複製檢測
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
runtime_notifyListNotifyOne 方法
runtime_notifyListNotifyOne
方法通過鏈接器鏈接到 notifyListNotifyOne
方法,喚醒鏈表頭部的 goroutine
。
// runtime/sema.go
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
// 如果已通知索引和等待索引相同
// 說明沒有等待的 goroutine, 直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 獲取已通知索引
t := l.notify
// 如果已通知索引和等待索引相同
// 說明沒有等待的 goroutine, 直接返回
// 又是一個經典的雙重檢測
if t == atomic.Load(&l.wait) {
unlock(&l.lock)
return
}
// 已通知索引 + 1
atomic.Store(&l.notify, t+1)
// 根據已通知索引,找到對應的 goroutine 喚醒
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
// 喚醒滿足條件的 goroutine
readyWithTime(s, 4)
return
}
}
}
爲什麼不直接喚醒鏈表的頭部元素呢?
這就是 已通知索引 notify
字段存在的意義,因爲獲取等待索引和加入到鏈表兩個步驟不是原子操作,這意味着在併發場景下,會出現順序不一致的情況。
例如,goroutine
對應的等待索引爲 2, 但是因爲併發問題,加入到鏈表的時候,排到了第 3 個位置,如圖所示:
亂序問題示例
不過不需要擔心,我們可以根據 已通知索引 notify
字段,保證在發送單個通知時保證順序的一致性,避免亂序可能帶來的 先到的 goroutine 反而等待時間長
這類問題。
當前 notify
字段對應的 goroutine
通知後,會變更指針到下一個 goroutine
,如圖所示:
亂序問題解決
從算法時間複雜度來分析,直接喚醒鏈表頭部元素是 O(1)
, 通過 notify
字段喚醒是 O(N)
, 但是官方的註釋中寫道:
This scan looks linear but essentially always stops quickly.
所以即便出現亂序,notify
索引字段對應的 goroutine
也不會太靠後,所以不會產生太多的性能問題。
Broadcast 方法
Broadcast
方法喚醒所有等待的 goroutine
。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime_notifyListNotifyAll 方法
runtime_notifyListNotifyAll
方法通過鏈接器鏈接到 notifyListNotifyAll
方法,喚醒所有等待的 goroutine
。
// runtime/sema.go
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
// 沒有等待的 goroutine, 直接返回
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
return
}
// 既然是全部喚醒,也就不用擔心上面提到的亂序問題了
// 直接遍歷 goroutine 鏈表,逐個喚醒 goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
check 方法
Cond.copyChecker
字段持有指向自身的指針,用來檢測是否被複制,當指針值和實際地址值不一致時,說明發生了複製。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
// 臨界區域重複檢測,避免原子對比後的瞬間,值被複制
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
check
方法的實現很有意思,裏面有 3 個判斷條件:
-
uintptr(*c) != uintptr(unsafe.Pointer(c))
比較 copyChecker 的指針值 (默認是 0) -
atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c)))
CAS 操作 copyChecker 的指針值 -
uintptr(*c) != uintptr(unsafe.Pointer(c))
比較 copyChecker 的指針值
爲什麼 CAS 操作
之後又重複比較了一次呢?主要是針對臨界區的檢測,因爲可能會出現一種極端情況: CAS 操作
之後,copyChecker 瞬間被複制了。
noCopy 對象
noCopy
對象可以添加到具體的結構體中,實現 "首次使用之後,無法被複制" 的功能 (由編譯器實現)。
noCopy.Lock
方法是一個空操作,由 go vet
工具鏈中的 -copylocks checker 參數指令使用。
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
小結
sync.Cond
不是一個常用的同步機制,在條件變量長時間無法滿足時,sync.Cond
能夠讓出處理器的使用權,和單純使用 for {}
進行無限等待相比, 可以提高 CPU 的利用率。但是使用時我們也需要注意以下問題:
-
Wait
方法在調用之前一定要完成加鎖操作,否則程序會panic
(因爲方法內部會釋放互斥鎖) -
Signal
方法會喚醒鏈表 (隊列) 最前面、等待最久的goroutine
(通過等待索引字段保證順序) -
Broadcast
方法會按照鏈表的順序 (並不是先進先出,因爲可能存在亂序問題) 喚醒所有等待的goroutine
Reference
-
Go sync.Cond[1]
-
Go 設計與實現 [2]
-
Golang sync.Cond 條件變量源碼分析 [3]
參考資料
[1]
Go sync.Cond: https://geektutu.com/post/hpg-sync-cond.html
[2]
Go 設計與實現: https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#cond
[3]
Golang sync.Cond 條件變量源碼分析: https://www.cyhone.com/articles/golang-sync-cond/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/VAx257qmOWDovmx-eTlyww