源碼剖析 sync-cond-條件變量的實現機制)

前言

哈嘍,大家好,我是asong,這是我併發編程系列的第三篇文章,這一篇我們一起來看看sync.Cond的使用與實現。之前寫過java的朋友對等待 / 通知 (wait/notify) 機制一定很熟悉,可以利用等待 / 通知機制實現阻塞或者喚醒,在Go語言使用Cond也可以達到同樣的效果,接下來我們一起來看看它的使用與實現。

sync.Cond的基本使用

Go標準庫提供了Cond原語,爲等待 / 通知場景下的併發問題提供支持。Cond他可以讓一組的Goroutine都在滿足特定條件 (這個等待條件有很多,可以是某個時間點或者某個變量或一組變量達到了某個閾值,還可以是某個對象的狀態滿足了特定的條件) 時被喚醒,Cond是和某個條件相關,這個條件需要一組goroutine協作共同完成,在條件還沒有滿足的時候,所有等待這個條件的goroutine都會被阻塞住,只有這一組goroutine通過協作達到了這個條件,等待的goroutine纔可以繼續進行下去。

先看這樣一個例子:

var (
 done = false
 topic = "Golang夢工廠"
)

func main() {
 cond := sync.NewCond(&sync.Mutex{})
 go Consumer(topic,cond)
 go Consumer(topic,cond)
 go Consumer(topic,cond)
 Push(topic,cond)
 time.Sleep(5 * time.Second)

}

func Consumer(topic string,cond *sync.Cond)  {
 cond.L.Lock()
 for !done{
  cond.Wait()
 }
 fmt.Println("topic is ",topic," starts Consumer")
 cond.L.Unlock()
}

func Push(topic string,cond *sync.Cond)  {
 fmt.Println(topic,"starts Push")
 cond.L.Lock()
 done = true
 cond.L.Unlock()
 fmt.Println("topic is ",topic," wakes all")
 cond.Broadcast()
}
// 運行結果
Golang夢工廠 starts Push
topic is  Golang夢工廠  wakes all
topic is  Golang夢工廠  starts Consumer
topic is  Golang夢工廠  starts Consumer
topic is  Golang夢工廠  starts Consumer

上述代碼我們運行了4Goroutine,其中三個Goroutine分別做了相同的事情,通過調用cond.Wait()等特定條件的滿足,1 個Goroutine會調用cond.Broadcast喚醒所用陷入等待的Goroutine。畫個圖看一下更清晰:

我們看上面這一段代碼,Cond使用起來並不簡單,使用不當就出現不可避免的問題,所以,有的開發者會認爲,Cond是唯一難以掌握的Go併發原語。爲了讓大家能更好的理解Cond,接下來我們一起看看Cond的實現原理。

Cond實現原理

Cond的實現還是比較簡單的,代碼量比較少,複雜的邏輯已經被Locker或者runtime的等待隊列實現了,所以我們來看這些源代碼也會輕鬆一些。首先我們來看一下它的結構體:

type Cond struct {
 noCopy noCopy

 // L is held while observing or changing the condition
 L Locker

 notify  notifyList
 checker copyChecker
}

主要有4個字段:

type notifyList struct {
 wait   uint32
 notify uint32
 lock   uintptr // key field of the mutex
 head   unsafe.Pointer
 tail   unsafe.Pointer
}

我們簡單分析一下notifyList的各個字段:

基本結構我們都知道了,下面我就來看一看Cond提供的三種方法是如何實現的~。

wait

我們先來看一下wait方法源碼部分:

func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

代碼量不多,執行步驟如下:

runtime_notifyListAdd的實現:

// See runtime/sema.go for documentation.
func notifyListAdd(l *notifyList) uint32 {
 // This may be called concurrently, for example, when called from
 // sync.Cond.Wait while holding a RWMutex in read mode.
 return atomic.Xadd(&l.wait, 1) - 1
}

代碼實現比較簡單,原子操作將等待計數器加 1,因爲wait代表的是下一個等待喚醒Goroutine的索引,所以需要減 1 操作。

runtime_notifyListWait的實現:

// See runtime/sema.go for documentation.
func notifyListWait(l *notifyList, t uint32) {
 lockWithRank(&l.lock, lockRankNotifyList)

 // Return right away if this ticket has already been notified.
 if less(t, l.notify) {
  unlock(&l.lock)
  return
 }

 // Enqueue itself.
 s := acquireSudog()
 s.g = getg()
 s.ticket = t
 s.releasetime = 0
 t0 := int64(0)
 if blockprofilerate > 0 {
  t0 = cputicks()
  s.releasetime = -1
 }
 if l.tail == nil {
  l.head = s
 } else {
  l.tail.next = s
 }
 l.tail = s
 goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
 if t0 != 0 {
  blockevent(s.releasetime-t0, 2)
 }
 releaseSudog(s)
}

這裏主要執行步驟如下:

看完源碼我們來總結一下注意事項:

wait方法會把調用者放入Cond的等待隊列中並阻塞,直到被喚醒,調用wait方法必須要持有c.L鎖。

signalBroadcast

signalBroadcast都會喚醒等待隊列,不過signal是喚醒鏈表最前面的GoroutineBoradcast會喚醒隊列中全部的Goroutine。下面我們分別來看一下signalbroadcast的源碼:

func (c *Cond) Signal() {
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }
 lockWithRank(&l.lock, lockRankNotifyList)
 t := l.notify
 if t == atomic.Load(&l.wait) {
  unlock(&l.lock)
  return
 }

 atomic.Store(&l.notify, t+1)

 for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
  if s.ticket == t {
   n := s.next
   if p != nil {
    p.next = n
   } else {
    l.head = n
   }
   if n == nil {
    l.tail = p
   }
   unlock(&l.lock)
   s.next = nil
   readyWithTime(s, 4)
   return
  }
 }
 unlock(&l.lock)
}

上面我們看wait源代碼時,每次都會調用都會原子遞增wait,那麼這個wait就代表當前最大的wait值,對應喚醒的時候,也就會對應一個notify屬性,我們在notifyList鏈表中逐個檢查,找到ticket對應相等的notify屬性。這裏大家肯定會有疑惑,我們爲何不直接取鏈表頭部喚醒呢?

notifyList並不是一直有序的,wait方法中調用runtime_notifyListAddruntime_notifyListWait完全是兩個獨立的行爲,中間還有釋放鎖的行爲,而當多個 goroutine 同時進行時,中間會產生進行併發操作,這樣就會出現亂序,所以採用這種操作即使在 notifyList 亂序的情況下,也能取到最先Waitgoroutine

func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
 if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
  return
 }

 lockWithRank(&l.lock, lockRankNotifyList)
 s := l.head
 l.head = nil
 l.tail = nil

 atomic.Store(&l.notify, atomic.Load(&l.wait))
 unlock(&l.lock)
 for s != nil {
  next := s.next
  s.next = nil
  readyWithTime(s, 4)
  s = next
 }
}

全部喚醒實現要簡單一些,主要是通過調用readyWithTime方法喚醒鏈表中的goroutine,喚醒順序也是按照加入隊列的先後順序,先加入的會先被喚醒,而後加入的可能 Goroutine 需要等待調度器的調度。

最後我們總結一下使用這兩個方法要注意的問題:

注意事項

//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()

總結

其實Cond在實際項目中被使用的機會比較少,Go特有的channel就可以代替它,暫時只在Kubernetes項目中看到了應用,使用場景是每次往隊列中成功增加了元素後就需要調用 Broadcast 通知所有的等待者,使用Cond就很合適,相比channel減少了代碼複雜性。

好啦,這篇文章就到這裏啦,素質三連(分享、點贊、在看)都是筆者持續創作更多優質內容的動力!

創建了一個 Golang 學習交流羣,歡迎各位大佬們踊躍入羣,我們一起學習交流。入羣方式:加我 vx 拉你入羣,或者公衆號獲取入羣二維碼

結尾給大家發一個小福利吧,最近我在看 [微服務架構設計模式] 這一本書,講的很好,自己也收集了一本 PDF,有需要的小夥可以到自行下載。獲取方式:關注公衆號:[Golang 夢工廠],後臺回覆:[微服務],即可獲取。

我翻譯了一份 GIN 中文文檔,會定期進行維護,有需要的小夥伴後臺回覆 [gin] 即可下載。

翻譯了一份 Machinery 中文文檔,會定期進行維護,有需要的小夥伴們後臺回覆 [machinery] 即可獲取。

我是 asong,一名普普通通的程序猿,讓我們一起慢慢變強吧。歡迎各位的關注,我們下期見~~~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/szSxatDakPQMUA8Vm9u3qQ