Go 同步原語 — sync-Con

概述

Go 語言標準庫中還包含條件變量 sync.Cond,它可以讓一組 Goroutine 都在滿足特定條件時被喚醒。每一個sync.Cond結構體在初始化時都需要傳入一個互斥鎖,我們可以通過下面的例子瞭解它的使用方法:

var status int64

func main(){
    c := sync.NewCond(&sync.mutex{})
    for i := 0; i < 10; i++ {
        go listen(c)
    }
    time.Sleep(1 * time.Second)
    go broadcast(c)
    
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    atomic.StoreInt64(&status, 1)
    c.Broadcast()
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    for atomic.LoadInt64(&status) != 1 {
        c.Wait()
    }
    fmt.Println("listen")
    c.L.Unlock()
}

運行結果:

listen
...
listen

上述代碼同時運行了 11Goroutine,它們分別做了不同事情:

調用sync.Cond.Broadcast方法後,上述代碼會打印出 10"listen" 並結束調用。

Cond 條件廣播

結構體

sync.Cond的結構體中包含以下 4 個字段:

type Cond struct {
    noCopy   noCopy
    L        Locker
    notify   notifyList
    checker  copyChecker
}
type notifyList struct {
    wait   uint32
    notify uint32
    lock   mutex
    head   *sudog
    tail   *sudog
}

sync.notifyList結構體中,headtail分別指向鏈表的頭和尾,waitnotify分別表示當前正在等待的和已經通知的Goroutine的索引。

接口

sync.Cond對外暴露的sync.Cond.Wait方法會令當前Goroutine陷入休眠狀態,它的執行過程分成以下兩個步驟:

func (c *Cond) Wait () {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)  // runtime.notifyListAdd 的鏈接名
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)   //runtime.notifyListWait 的鏈接名
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

runtime.notifyListWait 會獲取當前Goroutine並將它追加到Goroutine通知鏈表的末端:

func notifyListWait(l *notifyList, t uint32) {
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

除了將當前Goroutine追加到鏈表末端外,我們還會調用runtime.goparkunlock令當前Goroutine陷入休眠。該函數也是在 Go 語言切換Goroutine時常用的方法,它會直接讓出當前處理器的使用權並等待調度器喚醒。

Cond 條件通知列表

sync.Cond.Signalsync.Cond.Broadcast方法就是用來喚醒陷入休眠的Goroutine的,它們的實現有一些細微差別:

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

runtime.notifyListNotifyOne只會從sync.notifyList鏈表中找到滿足sudog.ticket == l.notify條件的Goroutine,並通過runtime.readyWithTime將其喚醒:

func notifyListNotifyOne(l *notifyList) {
    t := l.notify
    atomic.Store(&l.notify, t + 1)
    
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.tiket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
}

runtime.notifyListNotifyAll會依次通過runtime.readyWithTime喚醒鏈表中的Goroutine

func notifyListNotifyAll(l *notifyList) {
    s := l.head
    l.head = nil
    l.tail = nil
    
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Goroutine的喚醒順序也是按照加入隊列的先後順序,先加入的會先被喚醒,而後加入的Goroutine可能需要等待調度器的調度。

一般情況下,我們會先調用sync.Cond.Wait陷入休眠等待滿足期望條件,當滿足期望條件時,就可以選用sync.Cond.Signal或者sync.Cond.Broadcast喚醒一個或者全部Goroutine

小結

sync.Cond不是常用的同步機制,但是在條件長時間無法滿足時,與使用for {}進行忙碌等待相比,sync.Cond能夠讓出處理器的使用權,提高CPU的利用率。

使用時需要注意以下問題:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/CV9CNMDHZzYAga898BhrdA