被遺棄在角落裏的 sync-Con

Go 語言通過 go 關鍵字開啓 goroutine 讓開發者可以輕鬆地實現併發編程,而併發程序的有效運行,往往離不開 sync 包的保駕護航。目前,sync 包的賦能列表包括:sync.atomic 下的原子操作、sync.Map 併發安全 map、sync.Mutex 與 sync.RWMutex 提供的互斥鎖與讀寫鎖、sync.Pool 複用對象池、sync.Once 單例模式、 sync.Waitgroup 的多任務協作模式、sync.Cond 的監視器模式。當然,除了 sync 包,還有封裝層面更高的 channel 與 context。

要想寫出合格的 Go 程序,以上的這些併發原語是必須要掌握的。對於大多數 Gopher 而言,sync.Cond 應該是最爲陌生,本文將一探究竟。

sync.Cond 字面意義就是同步條件變量,它實現的是一種監視器(Monitor)模式。

In concurrent programming(also known as parallel programming), a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become false.

對於 Cond 而言,它實現一個條件變量,是 goroutine 間等待和通知的點。條件變量與共享的數據隔離,它可以同時阻塞多個 goroutine,直到另外的 goroutine 更改了條件變量,並通知喚醒阻塞着的一個或多個 goroutine。

初次接觸的讀者,可能會不太明白,那麼下面我們看一下 GopherCon 2018 上《Rethinking Classical Concurrency Patterns》 中的演示代碼例子。

 1type Item = int
 2
 3type Queue struct {
 4   items     []Item
 5   itemAdded sync.Cond
 6}
 7
 8func NewQueue() *Queue {
 9   q := new(Queue)
10   q.itemAdded.L = &sync.Mutex{} // 爲 Cond 綁定鎖
11   return q
12}
13
14func (q *Queue) Put(item Item) {
15   q.itemAdded.L.Lock()
16   defer q.itemAdded.L.Unlock()
17   q.items = append(q.items, item)
18   q.itemAdded.Signal()        // 當 Queue 中加入數據成功,調用 Singal 發送通知
19}
20
21func (q *Queue) GetMany(n int) []Item {
22   q.itemAdded.L.Lock()
23   defer q.itemAdded.L.Unlock()
24   for len(q.items) < n {     // 等待 Queue 中有 n 個數據
25      q.itemAdded.Wait()      // 阻塞等待 Singal 發送通知
26   }
27   items := q.items[:n:n]
28   q.items = q.items[n:]
29   return items
30}
31
32func main() {
33   q := NewQueue()
34
35   var wg sync.WaitGroup
36   for n := 10; n > 0; n-- {
37      wg.Add(1)
38      go func(n int) {
39         items := q.GetMany(n)
40         fmt.Printf("%2d: %2d\n", n, items)
41         wg.Done()
42      }(n)
43   }
44
45   for i := 0; i < 100; i++ {
46      q.Put(i)
47   }
48
49   wg.Wait()
50}

在這個例子中,Queue 是存儲數據 Item 的結構體,它通過 Cond 類型的 itemAdded 來控制數據的輸入與輸出。可以注意到,這裏通過 10 個 goroutine 來消費數據,但它們所需的數據量並不相等,我們可以稱之爲 batch,依次在 1-10 之間。之後,逐步添加 100 個數據至 Queue 中。最後,我們能夠看到 10 個 gotoutine 都能被喚醒,得到它想要的數據。

程序運行結果如下

 1 6: [ 7  8  9 10 11 12]
 2 5: [50 51 52 53 54]
 3 9: [14 15 16 17 18 19 20 21 22]
 4 1: [13]
 5 2: [33 34]
 6 4: [35 36 37 38]
 7 3: [39 40 41]
 8 7: [ 0  1  2  3  4  5  6]
 9 8: [42 43 44 45 46 47 48 49]
1010: [23 24 25 26 27 28 29 30 31 32]

當然,程序每次運行結果都不會相同,以上輸出只是某一種情況。

sync.Cond 實現

$GOPATH/src/sync/cond.go 中,Cond 的結構體定義如下

1type Cond struct {
2   noCopy noCopy
3   L Locker
4   notify  notifyList
5   checker copyChecker
6}

其中,noCopychecker 字段均是爲了避免 Cond 在使用過程中被複制,詳見小菜刀的 《no copy 機制》 一文。

L 是 Locker 接口,一般該字段的實際對象是 *RWmutex 或者 *Mutex

1type Locker interface {
2   Lock()
3   Unlock()
4}

notifyList 記錄的是一個基於票號的通知列表,這裏初次看註釋看不懂沒關係,和下文來回連貫着看。

1type notifyList struct {
2   wait   uint32         // 用於記錄下一個等待者 waiter 的票號
3   notify uint32         // 用於記錄下一個應該被通知的 waiter 的票號
4   lock   uintptr        // 內部鎖
5   head   unsafe.Pointer // 指向等待者 waiter 的隊列隊頭
6   tail   unsafe.Pointer // 指向等待者 waiter 的隊列隊尾
7}

其中,headtail 是指向 sudog 結構體的指針,sudog 是代表的處於等待列表的 goroutine,它本身就是雙向鏈表。值得一提的是,在 sudog 中有一個字段 ticket 就是用於給當前 goroutine 記錄票號使用的。

Cond 實現的核心模式爲票務系統(ticket system),每一個想要來買票的 goroutine (調用 Cond.Wait())我們稱之爲 waiter,票務系統會給每個 waiter 分配一個取票碼,等供票方有該取票碼的號時,就會喚醒 waiter。賣票的 goroutine 有兩種,第一種是調用 Cond.Signal() 的,它會按照票號喚醒一個買票的 waiter (如果有的話),第二種是調用 Cond.Broadcast() 的,它會通知喚醒所有的阻塞 waiter。爲了方便讀者能夠比較輕鬆地理解票務系統,下面我們給出圖解示例。

在 上文中,我們知道 Cond 字段中 notifyList 結構體是一個記錄票號的通知列表。這裏將 notifyList 比作排隊取票買電影票,當 G1 通過 Wait 來買票時,發現此時並沒有票可買,因此他只能阻塞等待有票之後的通知,此時他手上已經取得了專屬取票碼 0。同樣的,G2 和 G3 也同樣無票可買,它們分別取到了自己的取票碼 1 和 2。而 G4 是電影票提供商,它是賣票的,它通過兩次 Signal 先後帶來了兩張票,按照票號順序依次通知了 G1 和 G2 來取票,並把 notify 更新爲了最新的 1。G5 也是買票的,它發現此時已經無票可買了,拿了自己的取票碼 3 ,就阻塞等待了。G6 是個大票商,它通過 Broadcast 可以滿足所有正在等待的買票者都買到票,此時等待的是 G3 和 G5,因此他直接喚醒了 G3 和 G5,並將 notify 更新到和 wait 值相等。

理解了上述取票系統的運作原理後,我們下面來看 Cond 包下四個實際對外方法函數的實現。

1func NewCond(l Locker) *Cond {
2   return &Cond{L: l}
3}

用於初始化 Cond  對象,就是初始化控制鎖。

1func (c *Cond) Wait() {
2   c.checker.check()
3   t := runtime_notifyListAdd(&c.notify)
4   c.L.Unlock()
5   runtime_notifyListWait(&c.notify, t)
6   c.L.Lock()
7}

runtime_notifyListAdd  的實現在 runtime/sema.go 的 notifyListAdd ,它用於原子性地增加等待者的 waiter 票號,並返回當前 goroutine 應該取的票號值 t 。runtime_notifyListWait 的實現在 runtime/sema.go 的 notifyListWait,它會嘗試去比較此時 goroutine 的應取票號 tnotify 中記錄的當前應該被通知的票號。如果 t 小於當前票號,那麼直接能得到返回,否則將會則塞等待,通知取號。

同時,這裏需要注意的是,由於在進入 runtime_notifyListWait 時,當前 goroutine 通過 c.L.Unlock() 將鎖解了,這就意味着有可能會有多個 goroutine 來讓條件發生變化。那麼,當前 goroutine 是不能保證在 runtime_notifyListWait 返回後,條件就一定是真的,因此需要循環判斷條件。正確的 Wait 使用姿勢如下:

1//    c.L.Lock()
2//    for !condition() {
3//        c.Wait()
4//    }
5//    ... make use of condition ...
6//    c.L.Unlock()
1func (c *Cond) Signal() {
2   c.checker.check()
3   runtime_notifyListNotifyOne(&c.notify)
4}

runtime_notifyListNotifyOne 的詳細實現在 runtime/sema.go 的 notifyListNotifyOne,它的目的就是通知 waiter 取票。具體操作是:如果在上一次通知取票之後沒有新的 waiter 取票者,那麼該函數會直接返回。否則,它會將取票號 +1,並通知喚醒等待取票的 waiter。

需要注意的是,調用 Signal 方法時,並不需要持有 c.L 鎖。

1func (c *Cond) Broadcast() {
2   c.checker.check()
3   runtime_notifyListNotifyAll(&c.notify)
4}

runtime_notifyListNotifyAll 的詳細實現在 runtime/sema.go 的 notifyListNotifyAll,它會通知喚醒所有的 waiter,並將 notify 值置爲 和 wait 值相等。調用 Broadcast 方法時,也不需要持有 c.L 鎖。

討論

$GOPATH/src/sync/cond.go 下,我們可以發現其代碼量非常之少,但它呈現的只是核心邏輯,其實現細節位於 runtime/sema.go 之中,依賴的是 runtime 層的調度原語,對細節感興趣的讀者可以深入學習。

問題來了,爲什麼在日常開發中,我們很少會使用到 sync.Cond ?

前文中我們提到,使用 Cond.Wait 正確姿勢如下

1    c.L.Lock()
2    for !condition() {
3        c.Wait()
4    }
5    ... make use of condition ...
6    c.L.Unlock()

以文章開頭的例子而言,如果在每次調用 Put 方法時,使用 Broadcast 方法喚醒所有的 waiter,那麼很大概率上被喚醒的 waiter 醒來發現條件並不滿足,又會重新進入等待。儘管是調用 Signal 方法喚醒指定的 waiter,但是它也不能保證喚醒的 waiter 條件一定滿足。因此,在實際的使用中,我們需要儘量保證喚醒操作是有效地,爲了做到這點,代碼的複雜度難免會增加。

還是以文章開頭例子爲例,如果同時有多個 goroutine 執行 GetMany(3) 和 GetMany(3000),執行 GetMany(3) 與執行 GetMany(3000) 的 goroutine 被喚醒的概率是一樣的,但是由於 GetMany(3) 只需要 3 個數據就能滿足條件,那麼如果一直存在 GetMany(3) 的 goroutine,執行 GetMany(3000) 的 goroutine 將永遠拿不到數據,一直被無效喚醒。

條件變量的意義在於讓 goroutine 等待某種條件發生時進入睡眠狀態。但是這會讓 goroutine 在等待條件時,可能會錯過一些需要注意的其他事件。例如,調用 Cond.Wait 的函數中包含了 context 上下文,當 context 傳來取消信號時,它並不能像我們期望的一樣,獲取到取消信號並退出。Cond 的使用,讓我們不能同時選擇(select)條件和其他事件。

通過對 sync.Cond 幾個對外方法的分析,我們不難看到,它的使用場景是可以被 channel 所代替的,但是這也會增加代碼的複雜性。上文中的例子,可以使用 channel 改寫如下。

 1type Item = int
 2
 3type waiter struct {
 4    n int
 5    c chan []Item
 6}
 7
 8type state struct {
 9    items []Item
10    wait  []waiter
11}
12
13type Queue struct {
14    s chan state
15}
16
17func NewQueue() *Queue {
18    s := make(chan state, 1)
19    s <- state{}
20    return &Queue{s}
21}
22
23func (q *Queue) Put(item Item) {
24    s := <-q.s
25    s.items = append(s.items, item)
26    for len(s.wait) > 0 {
27        w := s.wait[0]
28        if len(s.items) < w.n {
29            break
30        }
31        w.c <- s.items[:w.n:w.n]
32        s.items = s.items[w.n:]
33        s.wait = s.wait[1:]
34    }
35    q.s <- s
36}
37
38func (q *Queue) GetMany(n int) []Item {
39    s := <-q.s
40    if len(s.wait) == 0 && len(s.items) >= n {
41        items := s.items[:n:n]
42        s.items = s.items[n:]
43        q.s <- s
44        return items
45    }
46
47    c := make(chan []Item)
48    s.wait = append(s.wait, waiter{n, c})
49    q.s <- s
50
51    return <-c
52}

最後,雖然在上文的討論中都是列出的 sync.Cond 潛在問題,但是如果開發者能夠在使用中考慮到以上的幾點問題,對於監視器模型的實現而言,在代碼的語義邏輯上,sync.Cond 的使用會比 channel 的模式更易理解和維護。記住一點,通俗易懂的代碼模型總是比深奧的炫技要接地氣。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-D68Ua4LYk6XVTn3yrZc1w