Go 併發編程之 Mutex

我們比較常見的大型項目的設計中都會出現併發訪問問題,併發就是爲了解決數據的準確性,保證同一個臨界區的數據只能被一個線程進行操作,日常中使用到的併發場景也是很多的:

上面都是併發帶來的數據準確性的問題,決絕方案就是使用互斥鎖,也就是今天併發編程中的所要描述的 Mutex 併發原語。

實現機制

互斥鎖 Mutex 就是爲了避免併發競爭建立的併發控制機制,其中有個 “臨界區” 的概念。

在併發編程過程中,如果程序中一部分資源或者變量會被併發訪問或者修改,爲了避免併發訪問導致數據的不準確,這部分程序需要率先被保護起來,之後操作,操作結束後去除保護,這部分被保護的程序就叫做臨界區

使用互斥鎖,限定臨界區只能同時由一個線程持有,若是臨界區此時被一個線程持有,那麼其他線程想進入到這個臨界區的時候,就會失敗或者等待釋放鎖,持有此臨界區的線程退出,其他線程纔有機會獲得這個臨界區。

go mutex 臨界區示意圖

Mutex 是 Go 語言中使用最廣泛的同步原語,也稱爲併發原語,解決的是併發讀寫共享資源,避免出現數據競爭 data race 問題

基本使用

互斥鎖 Mutex 提供了兩個方法 Lock 和 Unlock:進入到臨界區使用 Lock 方法加鎖,退出臨界區使用 Unlock 方法釋放鎖 🔒。

type Locker interface {
    Lock()
    Unlock()
}

func(m *Mutex)Lock()
func(m *Mutex)Unlock()

當一個 goroutine 調用 Lock 方法獲取到鎖後,其他 goroutine 會阻塞在 Lock 的調用上,直到當前獲取到鎖的 goroutine 釋放鎖。

接下來是一個計數器的例子,是由 100 個 goroutine 對計數器進行累加操作,最後輸出結果:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    countNum := 0

    // 確認輔助變量是否都執行完成
    var wg sync.WaitGroup

    // wg 添加數目要和 創建的協程數量保持一致
    wg.Add(100)
    for i := 0; i < 100; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                mu.Lock()
                countNum++
                mu.Unlock()
            }
        }()
    }
    wg.Wait()
    fmt.Printf("countNum: %d", countNum)
}

實際使用

很多時候 Mutex 並不是單獨使用的,而是嵌套在 Struct 中使用,作爲結構體的一部分,如果嵌入的 struct 有多個字段,我們一般會把 Mutex 放在要控制的字段上面,然後使用空格把字段分隔開來。

甚至可以把獲取鎖、釋放鎖、計數加一的邏輯封裝成一個方法。

package main
import (
    "fmt"
    "sync"
)

// 線程安全的計數器
type Counter struct {
    CounterType int
    Name        string

    mu    sync.Mutex
    count uint64
}

// 加一方法
func (c *Counter) Incr() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

// 取數值方法 線程也需要受保護
func (c *Counter) Count() uint64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    // 定義一個計數器
    var counter Counter

    var wg sync.WaitGroup
    wg.Add(100)

    for i := 0; i < 100; i++ {
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                counter.Incr()
            }
        }()
    }
    wg.Wait()

    fmt.Printf("%dn", counter.Count())
}

思考問題

Q:你已經知道,如果 Mutex 已經被一個 goroutine 獲取了鎖,其它等待中的 goroutine 們只能一直等待。那麼,等這個鎖釋放後,等待中的 goroutine 中哪一個會優先獲取 Mutex 呢?

A:FIFO,先來先服務的策略,Go 的 goroutine 調度中,會維護一個保障 goroutine 運行的隊列,當獲取到鎖的 goroutine 執行完臨界區的操作的時候,就會釋放鎖,在隊列中排在第一位置的 goroutine 會拿到鎖進行臨界區的操作。

實現原理

Mutex 的架構演進目前分爲四個階段:

Mutex 演化過程

初版 Mutex

// 互斥鎖的結構,包含兩個字段
type Mutex struct {
    key  int32 // 鎖是否被持有的標識
    sema int32 // 信號量專用,用以阻塞/喚醒goroutine
}

Unlock 方法可以被任意的 goroutine 調用釋放鎖,即使是沒持有這個互斥鎖的 goroutine,也可以進行這個操作。這是因爲,Mutex 本身並沒有包含持有這把鎖的 goroutine 的信息,所以,Unlock 也不會對此進行檢查。Mutex 的這個設計一直保持至今。

在使用 Mutex 的時候,需要嚴格遵循 “誰申請,誰釋放” 原則。

解決飢餓

由於使用了給新人機會,會出現每次都會被新來的 goroutine 獲取到鎖,導致等待的 goroutine 一直獲取不到鎖,造成飢餓問題。

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving // 從state字段中分出一個飢餓標記
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
    // Fast path: 幸運之路,一下就獲取到了鎖
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Slow path:緩慢之路,嘗試自旋競爭或飢餓狀態下飢餓goroutine競爭
    m.lockSlow()
}

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // 此goroutine的飢餓標記
    awoke := false // 喚醒標記
    iter := 0 // 自旋次數
    old := m.state // 當前的鎖的狀態
    for {
        // 鎖是非飢餓狀態,鎖還沒被釋放,嘗試自旋
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state // 再次獲取鎖的狀態,之後會檢查是否鎖被釋放了
            continue
        }
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked // 非飢餓狀態,加鎖
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift // waiter數量加1
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving // 設置飢餓狀態
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken // 新狀態清除喚醒標記
        }
        // 成功設置新狀態
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 原來鎖的狀態已釋放,並且不是飢餓狀態,正常請求到了鎖,返回
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // 處理飢餓狀態

            // 如果以前就在隊列裏面,加入到隊列頭
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 阻塞等待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 喚醒之後檢查鎖是否應該處於飢餓狀態
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果鎖已經處於飢餓狀態,直接搶到鎖,返回
            if old&mutexStarving != 0 {
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 有點繞,加鎖並且將waiter數減1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving // 最後一個waiter或者已經不飢餓了,清除飢餓標記
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 1)
    }
}

思考問題

Q:目前 Mutex 的 state 字段有幾個意義,這幾個意義分別是由哪些字段表示的?

A:state 字段一共有四個子字段,前三個 bit 是 mutexLocked(鎖標記)、mutexWoken(喚醒標記)、mutexStarving(飢餓標記),剩餘 bit 標示 mutexWaiter(等待數量)。

Q:等待一個 Mutex 的 goroutine 數最大是多少?是否能滿足現實的需求?

目前的設計來看取決於 state 的類型,目前是 int32,由於 3 個字節代表了狀態,還有:2^(32 – 3) – 1 等於 536870911,一個 goroutine 初始化的爲 2kb,約等於 1024 GB 即 1TB,目前內存體量那麼大的服務還是少有的,可以滿足現在的使用。

常見錯誤的四種場景

Lock/Unlock 不是成對出現、Copy 已使用的 Mutex、重入和死鎖。

轉自:

debuginn.cn/6670.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/6jXKkW5anz_I21KSs-cp8g