sync-Cond 設計與實現

概述

sync.Cond 實現了一種條件變量同步原語,可以讓一個 goroutine 集合在滿足特定條件時被喚醒。

sync.Cond 典型的使用場景是 生產-消費者模式,多個 goroutine 等待某個事件發生, 單個 goroutine 通知某個事件已發生。比如電商中的用戶下單事件發生時,會通知到訂單、用戶、積分、優惠券、倉儲等服務,如果是單個生產者對單個消費者,直接使用 互斥鎖channel 就可以。

爲什麼多個消費者模式不使用互斥鎖或 channel 呢?

可以想象一個非常簡單的場景: 有一個 goroutine 在異步接收數據,剩下的多個 goroutine 必須等待該 goroutine 接收完才能讀取。在這種情況下,如果單純使用 互斥鎖channel,就只能有一個 goroutine 可以等待並讀取到數據,其他的 goroutine 沒辦法讀取。

當然我們可以通過折衷的方案來解決,例如 可以創建一個全局變量,用來標誌這個 goroutine 數據是否接收完成,剩下的 goroutine 反覆檢查該全局變量,直到滿足條件。或者 可以創建多個 channel,每個 goroutine 阻塞在一個 channel 上面,接收數據的 goroutine 在數據接收完畢後,逐個通知。但是不論哪種方式,實現複雜度都大大增加了。

sync.Cond 提供了簡潔優雅的方式來解決上述問題。

示例

通過一個小例子展示 sync.Cond 的使用方法。

package main

import (
 "fmt"
 "sync"
 "time"
)

// 條件變量
var done = false

// 數據讀取操作
func read(name string, c *sync.Cond) {
 c.L.Lock()
 for !done {
    // 等待生產者寫入通知
  c.Wait()
 }
 fmt.Println(name, "starts reading")
 c.L.Unlock()
}

// 數據寫入操作
func write(name string, c *sync.Cond) {
 fmt.Println(name, "starts writing")
 time.Sleep(100 * time.Millisecond)

 c.L.Lock()
  // 設置條件變量
 done = true 
 c.L.Unlock()

 fmt.Println(name, "wakes all")
  // 通知所有消費者
 c.Broadcast() 
}

func main() {
 // 創建對象時傳入一個互斥鎖
 cond := sync.NewCond(&sync.Mutex{})

 // 3 個消費者
 go read("reader-1", cond)
 go read("reader-2", cond)
 go read("reader-3", cond)

 // 1 個生產者
 write("writer-1", cond)

 time.Sleep(time.Second)
}
$ go run main.go

# 輸出如下
writer-1 starts writing
writer-1 wakes all
reader-2 starts reading
reader-1 starts reading
reader-3 starts reading

從輸出結果中可以看到,消費者剛開始時調用 Wait 方法阻塞,直到生產者 (write) 寫入完成後調用 Broadcast 方法通知所有消費者 (read),然後所有消費者依次輸出。

調用關係圖

內部實現

我們來探究一下 sync.Cond 的內部實現,文件路徑爲 $GOROOT/src/sync/cond.go,筆者的 Go 版本爲 go1.19 linux/amd64

Cond 對象

Cond 對象表示同步條件變量,可以讓 goroutins 等待或通知某個事件發生,Cond 對象一旦使用後,就不能再複製。

每一個 Cond 對象都持有一個對應的 Locker 接口 (通常是一個互斥鎖或讀寫鎖),當條件發生變化以及調用 Wait 方法時,必須持有對應的鎖。

在簡單的應用場景中,更好的選擇是使用 channel 完成同步操作 (Go 的標準庫設計理念是上層應用盡量使用 channel 作爲同步原語),可以將兩者的對應關係簡單概況如下:

Signal 和 Broadcast

type Cond struct {
 // 保證編譯期間不會發生複製
 noCopy noCopy
 // 當訪問或者修改條件時,必須持有 L
 L Locker
 // goroutine 鏈表
 notify  notifyList
 // 保證運行期間不會發生複製
 checker copyChecker
}

notifyList 對象

notifyList 對象表示一個 goroutine 鏈表數據結構。

// runtime/sema.go

type notifyList struct {
 // 等待的 goroutine 索引,可以在沒有獲取鎖的情況下原子性遞增
 wait uint32

 // 已經通知到的 goroutine 索引
 // 可以在沒有獲取鎖的情況下進行讀取操作,但是必須在獲得鎖的情況下進行寫入操作
 notify uint32

 lock mutex
 // 等待索引和已通知索引可以是環形隊列結構
 // 鏈表頭指針
 head *sudog
  // 鏈表尾指針
 tail *sudog
}

sync.Cond 對象結構

NewCond 方法

NewCond 方法創建一個 Cond 對象,參數爲一個 Locker 接口。

func NewCond(l Locker) *Cond {
 return &Cond{L: l}
}

Wait 方法

Wait 方法 (阻塞調用) 會解鎖 c.L 字段並且休眠當前 goroutine,等到當前 goroutine 被喚醒後,Wait 方法在返回之前再對 c.L 字段加鎖。

func (c *Cond) Wait() {
 // 複製檢測
 c.checker.check()
 // 等待索引 + 1
 t := runtime_notifyListAdd(&c.notify)
  // goroutine 休眠之前先解鎖 (否則其他 goroutine 獲取不到鎖,會造成死鎖問題)
 c.L.Unlock()
  // 等待喚醒,並傳遞等待索引
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

runtime_notifyListAdd 方法

runtime_notifyListAdd 方法通過鏈接器鏈接到 notifyListAdd 方法,notifyListAdd 方法將當前調用方 goroutine 添加到通知鏈表中以便接收通知。

// runtime/sema.go

//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
 // 等待索引計數 + 1
  return atomic.Xadd(&l.wait, 1) - 1
}

runtime_notifyListWait 方法

runtime_notifyListWait 方法通過鏈接器鏈接到 notifyListWait 方法,如果在調用 notifyListAdd 方法之後已經發送過通知,notifyListWait 就會立即返回,否則就陷入阻塞。

// runtime/sema.go

//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
 // 如果之前已經發送過通知,直接返回即可
 if less(t, l.notify) {
  unlock(&l.lock)
  return
 }

 // 獲取當前 goroutine 並追加到鏈表尾部
 s := acquireSudog()
 s.g = getg()
 // 獲取等待索引
 s.ticket = t

 // 將 goroutine 加入到鏈表中
 if l.tail == nil {
  l.head = s
 } else {
  l.tail.next = s
 }
 l.tail = s

 // 休眠當前 goroutine
 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
 // 歸還 goroutine
  releaseSudog(s)
}

Signal 方法

Signal 方法喚醒鏈表頭部等待的 goroutine

func (c *Cond) Signal() {
  // 複製檢測
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}

runtime_notifyListNotifyOne 方法

runtime_notifyListNotifyOne 方法通過鏈接器鏈接到 notifyListNotifyOne 方法,喚醒鏈表頭部的 goroutine

// runtime/sema.go

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
 // 如果已通知索引和等待索引相同
 // 說明沒有等待的 goroutine, 直接返回
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }

 // 獲取已通知索引
  t := l.notify
 // 如果已通知索引和等待索引相同
 // 說明沒有等待的 goroutine, 直接返回
 // 又是一個經典的雙重檢測
 if t == atomic.Load(&l.wait) {
  unlock(&l.lock)
  return
 }

 // 已通知索引 + 1
  atomic.Store(&l.notify, t+1)

 // 根據已通知索引,找到對應的 goroutine 喚醒
 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  if s.ticket == t {
   // 喚醒滿足條件的 goroutine
   readyWithTime(s, 4)
   return
  }
 }
}

爲什麼不直接喚醒鏈表的頭部元素呢

這就是 已通知索引 notify 字段存在的意義,因爲獲取等待索引和加入到鏈表兩個步驟不是原子操作,這意味着在併發場景下,會出現順序不一致的情況。

例如,goroutine 對應的等待索引爲 2, 但是因爲併發問題,加入到鏈表的時候,排到了第 3 個位置,如圖所示:

亂序問題示例

不過不需要擔心,我們可以根據 已通知索引 notify 字段,保證在發送單個通知時保證順序的一致性,避免亂序可能帶來的 先到的 goroutine 反而等待時間長 這類問題。

當前 notify 字段對應的 goroutine 通知後,會變更指針到下一個 goroutine,如圖所示:

亂序問題解決

從算法時間複雜度來分析,直接喚醒鏈表頭部元素是 O(1), 通過 notify 字段喚醒是 O(N), 但是官方的註釋中寫道:

This scan looks linear but essentially always stops quickly.

所以即便出現亂序,notify 索引字段對應的 goroutine 也不會太靠後,所以不會產生太多的性能問題。

Broadcast 方法

Broadcast 方法喚醒所有等待的 goroutine

func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}

runtime_notifyListNotifyAll 方法

runtime_notifyListNotifyAll 方法通過鏈接器鏈接到 notifyListNotifyAll 方法,喚醒所有等待的 goroutine

// runtime/sema.go

//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
  // 沒有等待的 goroutine, 直接返回
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }

 // 既然是全部喚醒,也就不用擔心上面提到的亂序問題了
 // 直接遍歷 goroutine 鏈表,逐個喚醒 goroutine
 for s != nil {
  next := s.next
  s.next = nil
  readyWithTime(s, 4)
  s = next
 }
}

check 方法

Cond.copyChecker 字段持有指向自身的指針,用來檢測是否被複制,當指針值和實際地址值不一致時,說明發生了複製。

type copyChecker uintptr

func (c *copyChecker) check() {
 if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
  !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
  // 臨界區域重複檢測,避免原子對比後的瞬間,值被複制
  uintptr(*c) != uintptr(unsafe.Pointer(c)) {
  panic("sync.Cond is copied")
 }
}

check 方法的實現很有意思,裏面有 3 個判斷條件:

爲什麼 CAS 操作 之後又重複比較了一次呢?主要是針對臨界區的檢測,因爲可能會出現一種極端情況: CAS 操作 之後,copyChecker 瞬間被複制了

noCopy 對象

noCopy 對象可以添加到具體的結構體中,實現 "首次使用之後,無法被複制" 的功能 (由編譯器實現)。

noCopy.Lock 方法是一個空操作,由 go vet 工具鏈中的 -copylocks checker 參數指令使用。

type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

小結

sync.Cond 不是一個常用的同步機制,在條件變量長時間無法滿足時,sync.Cond 能夠讓出處理器的使用權,和單純使用 for {} 進行無限等待相比, 可以提高 CPU 的利用率。但是使用時我們也需要注意以下問題:

Reference

  1. Go sync.Cond[1]

  2. Go 設計與實現 [2]

  3. Golang sync.Cond 條件變量源碼分析 [3]

參考資料

[1]

Go sync.Cond: https://geektutu.com/post/hpg-sync-cond.html

[2]

Go 設計與實現: https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#cond

[3]

Golang sync.Cond 條件變量源碼分析: https://www.cyhone.com/articles/golang-sync-cond/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VAx257qmOWDovmx-eTlyww