Go Mutex 錯過後悔的重要知識點

Part1 Go Mutex 的基本用法

Mutex 我們一般只會用到它的兩個方法:

Mutex 的模型可以用下圖表示:

Go Mutex 模型. png

說明:

這幾點也是 Mutex 的基本原理。

Part2 Go Mutex 原子操作

Mutex結構體定義:

type Mutex struct {
   state int32 // 狀態字段
   sema  uint32 // 信號量
}

其中 state 字段記錄了四種不同的信息:

這四種不同信息在源碼中定義了不同的常量:

const (
   mutexLocked      = 1 << iota // 表示有 goroutine 擁有鎖
   mutexWoken                   // 喚醒(就是第 2 位)
   mutexStarving                // 飢餓(第 3 位)
   mutexWaiterShift = iota      // 表示第 4 位開始,表示等待者的數量

   starvationThresholdNs = 1e6  // 1ms 進入飢餓模式的等待時間閾值
)

而 sema 的含義比較簡單,就是一個用作不同 goroutine 同步的信號量。

go 的 Mutex 實現中,state 字段是一個 32 位的整數,不同的位記錄了四種不同信息,在這種情況下, 只需要通過原子操作就可以保證一次性實現對四種不同狀態信息的更改,而不需要更多額外的同步機制。

但是毋庸置疑,這種實現會大大降低代碼的可讀性,因爲通過一個整數來記錄不同的信息, 就意味着,需要通過各種位運算來實現對這個整數不同位的修改。

當然,這只是 Mutex 實現中最簡單的一種位運算了。下面以 state 記錄的四種不同信息爲維度來具體講解一下:

在上面做了這一系列的位運算之後,我們會得到一個新的 state 狀態,假設名爲 new,那麼我們就可以通過 CAS 操作來將 Mutex 的 state 字段更新:

atomic.CompareAndSwapInt32(&m.state, old, new)

通過上面這個原子操作,我們就可以一次性地更新 Mutex 的 state 字段,也就是一次性更新了四種狀態信息。

這種通過一個整數記錄不同狀態的寫法在 sync 包其他的一些地方也有用到,比如 WaitGroup 中的 state 字段。

最後,對於這種操作,我們需要注意的是,因爲我們在執行 CAS 前後是沒有其他什麼鎖或者其他的保護機制的, 這也就意味着上面的這個 CAS 操作是有可能會失敗的,那如果失敗了怎麼辦呢?

如果失敗了,也就意味着肯定有另外一個 goroutine 率先執行了 CAS 操作並且成功了,將 state 修改爲了一個新的值。這個時候,其實我們前面做的一系列位運算得到的結果實際上已經不對了,在這種情況下,我們需要獲取最新的 state,然後再次計算得到一個新的 state

所以我們會在源碼裏面看到 CAS 操作是寫在 for 循環裏面的。

Part3 state 的狀態及枚舉

GzvDHo

在看下面代碼之前,一定要記住這幾個狀態之間的 與運算 或運算,否則代碼裏的與運算或運算

state:   |32|31|...|3|2|1|
         __________/ | |
               |      | |
               |      | mutex的佔用狀態(1被佔用,0可用)
               |      |
               |      mutex的當前goroutine是否被喚醒
               |
               當前阻塞在mutex上的goroutine數

Part4 互斥鎖的作用

互斥鎖是保證同步的一種工具,主要體現在以下 2 個方面:

  1. 避免多個線程在同一時刻操作同一個數據塊 (sum)

  2. 可以協調多個線程,以避免它們在同一時刻執行同一個代碼塊 (sum++)

Part5 什麼時候用

  1. 需要保護一個數據或數據塊時

  2. 需要協調多個協程串行執行同一代碼塊,避免併發問題時

比如 經常遇到 A 給 B 轉賬 100 元的例子,這個時候就可以用互斥鎖來實現。

Part6 注意的坑

1. 不同 goroutine 可以 Unlock 同一個 Mutex,但是 Unlock 一個無鎖狀態的 Mutex 就會報錯。
2. 因爲 mutex 沒有記錄 goroutine_id,所以要避免在不同的協程中分別進行上鎖 / 解鎖操作,不然很容易造成死鎖。

建議:先 Lock 再 Unlock、兩者成對出現。

3. Mutex 不是可重入鎖

Mutex 不會記錄持有鎖的協程的信息,所以如果連續兩次 Lock 操作,就直接死鎖了。

如何實現可重入鎖?記錄上鎖的 goroutine 的唯一標識,在重入上鎖 / 解鎖的時候只需要增減計數。

type RecursiveMutex struct {
   sync.Mutex
   owner     int64 // 當前持有鎖的 goroutine id // 可以換成其他的唯一標識
   recursion int32 // 這個 goroutine 重入的次數
}

func (m *RecursiveMutex) Lock() {
   gid := goid.Get()  // 獲取唯一標識
   // 如果當前持有鎖的 goroutine 就是這次調用的 goroutine,說明是重入
   if atomic.LoadInt64(&m.owner) == gid {
      m.recursion++
      return
   }
   m.Mutex.Lock()

   // 獲得鎖的 goroutine 第一次調用,記錄下它的 goroutine id,調用次數加1
   atomic.StoreInt64(&m.owner, gid)
   m.recursion = 1
}

func (m *RecursiveMutex) Unlock() {
   gid := goid.Get()
   // 非持有鎖的 goroutine 嘗試釋放鎖,錯誤的使用
   if atomic.LoadInt64(&m.owner) != gid {
      panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
   }

   // 調用次數減1
   m.recursion--
   if m.recursion != 0 { // 如果這個 goroutine 還沒有完全釋放,則直接返回
      return
   }

   // 此 goroutine 最後一次調用,需要釋放鎖
   atomic.StoreInt64(&m.owner, -1)
   m.Mutex.Unlock()
}
4. 多高的 QPS 才能讓 Mutex 產生強烈的鎖競爭?

模擬一個 10ms 的接口,接口邏輯中使用全局共享的 Mutex,會發現在較低 QPS 的時候就開始產生激烈的鎖競爭(打印鎖等待時間和接口時間)。

解決方式:首先要儘量避免使用 Mutex。如果要使用 Mutex,儘量多聲明一些 Mutex,採用取模分片的方式去使用其中一個 Mutex 進行資源控制。避免一個 Mutex 對應過多的併發。

簡單總結:壓測或者流量高的時候發現系統不正常,打開 pprof 發現 goroutine 指標在飆升,並且大量 Goroutine 都阻塞在 Mutex 的 Lock 上,這種現象下基本就可以確定是鎖競爭。

5. Mutex 千萬不能被複制

因爲複製的時候會將原鎖的 state 值也進行復制。複製之後,一個新 Mutex 可能莫名處於持有鎖、喚醒或者飢餓狀態,甚至等阻塞等待數量遠遠大於 0。而原鎖 Unlock 的時候,卻不會影響複製鎖。

Part7 關於鎖的使用建議:

Part8 參考

Part9 結束語

本篇文章介紹說明了:

sync.Mutex 的基本用法

sync.Mutex 原子操作

sync.Mutex state 的

sync.Mutex 注意的坑

希望本篇文章對你有所幫助,謝謝。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/sVw5V-OGddoe9oKNsKj8kQ