Go 互斥鎖源碼解析
我們在進行併發編程的時候,往往離不開多個併發操作修改同一份資源,當然,離不開鎖這個服務。
鎖是在執行併發操作時用於強行限制資源訪問的同步機制,即用於在併發控制中保證對互斥要求的滿足。
Go 語言在 Mutex 包中提供了互斥鎖,sync.Mutex
Go 語言的 sync.Mutex 由兩個字段組成
type Mutex struct {
state int32
sema int32
}
其中 Mutex.state 表示了當前互斥鎖處於的狀態
waiterNum 表示目前互斥鎖等待隊列中有多少 goroutine 在等待
straving 表示目前互斥鎖是否處於飢餓狀態
woken 表示目前互斥鎖是否爲喚醒狀態
locked 表示目前互斥鎖資源是否被 goroutine 持有
而 Mutex.sema 主要用於等待隊列
互斥鎖的狀態
互斥鎖通常保持兩種狀態 正常模式 與 飢餓模式
引入飢餓模式的原因是,爲了保持互斥鎖的公平性。
在正常模式下,鎖資源一般會交給剛被喚醒的 goroutine,而爲了怕部分 goroutine 被 “餓死”,所以引入了飢餓模式,在飢餓模式下,goroutine 在釋放鎖資源的時候會將鎖資源交給等待隊列中的下一個 goroutine。
加鎖邏輯
當我們在執行 Mutex.Lock(),會執行以下邏輯
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
首先使用 CAS 方法判斷是否可以直接獲取鎖資源
CAS 指的是 CompareAndSwap,也就是如果可以獲得鎖資源,則修改 Mutex.state 中的 locked 位,併成功獲取,如果獲取不到,則執行 lowSlow() 方法
比較並交換 (compare and swap, CAS),是原子操作的一種,可用於在多線程編程中實現不被打斷的數據交換操作,從而避免多線程同時改寫某一數據時由於執行順序不確定性以及中斷的不可預知性產生的數據不一致問題。該操作通過將內存中的值與指定數據進行比較,當數值一樣時將內存中的數據替換爲新的值。
lockSlow 方法
func (m *MyLock)lockSlow() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked){
// 通過CAS方式,如果鎖還沒被獲取,則直接加上鎖即可
return
}
var waitStartTime int3
starving := false
awoke := false
iter := 0
old := m.state
for {
// 嘗試獲取鎖資源,暫時先註釋掉
}
}
首先 lowSlow 會嘗試使用 CAS 獲取鎖資源,如果獲取不到,初始化當前 goroutine 需要的變量,執行 for 循環嘗試獲取鎖資源
func (m *MyLock)Lock() {
/*
通過CAS獲取鎖資源,獲取不到則初始化當前goroutine所需要的變量
*/
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && sync_runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old >> mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
/*
處理自旋邏輯結束後的邏輯
*/
}
}
for 循環的第一段代碼主要是,嘗試通過自旋方式獲取鎖資源,自旋可以避免 goroutine 切換,但是消耗的資源更多,當 goroutine 進行自旋的時候,實際上是調用 sync_runtime_doSpin 方法,該方法會在 CPU 上執行若干次 PAUSE 指令,也就是什麼都不做,sleep 一小會兒,但是會佔用 CPU 資源。
所以 goroutine 進入自旋的條件非常苛刻,通常需要滿足以下三個條件
-
互斥鎖只有在普通模式才能進行自旋
-
sync_runtime_canSpin(iter) 返回 true
自旋次數(iter)小於 4
ncpu > 1 也就是 CPU 核數大於 1
當前機器上有一個運行的 P 隊列,且 GOMAXPROS(可以用的處理器)大於 1
更新持有鎖狀態副本的值:
func (m *MyLock)Lock() {
/*
通過CAS獲取鎖資源,獲取不到則初始化當前goroutine所需要的變量
*/
for {
/*
處理自旋邏輯
*/
new := old
if old & mutexStarving == 0 {
// 如果不是飢餓狀態,則嘗試更新鎖的狀態到new上
new = new | mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果鎖處於飢餓狀態或被其他goroutine持有
// 等待隊列+1
new += 1 << mutexWaiterShift
}
if starving && old&mutexLocked != 0 {
// 如果當前鎖爲飢餓狀態,並且鎖資源被其他goroutine持有
// 更新當前鎖狀態副本的飢餓狀態的值
new = new | mutexStarving
}
if awoke {
if new & mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
// 如果本goroutine爲喚醒狀態,清除掉副本內的環星表金
// 因爲在這種情況下,本goroutine要麼獲得了鎖,要麼進入休眠
new &^= mutexWoken
}
/*
執行接下來的獲取鎖資源
*/
}
}
當我們更新了互斥鎖狀態後,執行以下邏輯
func (m *MyLock)Lock() {
/*
通過CAS獲取鎖資源,獲取不到則初始化當前goroutine所需要的變量
*/
for {
/*
處理自旋邏輯
*/
/*
更新互斥鎖狀態
*/
// 通過CAS設置new的值
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 當 鎖 沒有處在飢餓以及狀態下,可以視作成功獲取了鎖
if old&(mutexStarving|mutexLocked) == 0 {
break
}
// 判斷是否處於等待狀態
queueLifo := waitStartTime != 0
// 獲取等待開始的時間
if waitStartTime == 0 {
// runtime_nanotime 實際上是一個系統調用,獲取當前時間
waitStartTime = runtime_nanotime()
}
/*
如果我們沒有通過 CAS 獲得鎖,
會調用 sync.runtime_SemacquireMutex
使用信號量保證資源不會被兩個 Goroutine 獲取。
sync.runtime_SemacquireMutex
會在方法中不斷調用嘗試獲取鎖並休眠當前 Goroutine 等待信號量的釋放,
一旦當前 Goroutine 可以獲取信號量,它就會立刻返回,
*/
runtime_SemaacquireMutex(&m.sema, queueLifo)
// 當等待時間超過1ms時候,變爲飢餓狀態
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 如果爲飢餓狀態的話
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
// delta的實際值爲 等待的位數
delta := int32(mutexLocked - 1 << mutexWaiterShift)
if !starving || old >> mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 對m.state中的等待計進行ADD的原子操作
atomic.AddInt32(&m.state, delta)
break
}
// 喚醒次數爲true, 自旋次數重置
awoke = true
iter = 0
} else {
// 如果CAS不成功,則獲取新的state狀態
old = m.state
}
break
}
}
}
我們首先嚐試嘗試使用 CAS 設置目前 new 的值。
如果沒有成功設置則代表有新的 goroutine 更新了當前的鎖資源,我們需要更新當前鎖狀態,重新進行 for 循環嘗試獲取鎖。
如果當前鎖不處於飢餓狀態以及沒有被別的 goroutine 獲取,則視爲拿到鎖資源
判斷等待實際以及更新等待時間,調用 runtime_SemaacquireMutex 使用信號量使資源不會被兩個 goroutine 同時獲取,而當有別的 goroutine 釋放了鎖資源,則第一時間會將信號量返回給該 goroutine,立即獲得鎖資源。
當等待時間超過 1ms 得時候,更新飢餓狀態。
如果鎖處於飢餓狀態,且當前 goroutine 不屬於飢餓狀態,或鎖位未處於飢餓狀態,則退出飢餓模式
如果鎖不處於飢餓狀態,喚醒該 goroutine 然後將自旋次數重置。
整個加鎖過程結束。
解鎖邏輯
解鎖邏輯相較於加鎖邏輯就沒有那麼複雜了。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
解鎖邏輯實際上就是判斷
如果鎖不處於飢餓狀態,且不屬於喚醒狀態,則直接釋放鎖資源
否則執行 unlockSlow 函數
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
runtime_Semrelease(&m.sema, true, 1)
}
}
實際上相較於加鎖邏輯的 lockslow, 該 unlockSlow 也很簡單
-
如果直接解鎖一個沒有被鎖定的鎖,拋出一場
-
判斷鎖是否爲飢餓狀態
如果鎖不爲飢餓狀態,且鎖不爲(鎖住、喚醒、飢餓)狀態的任一,直接解鎖
如果爲上面三種情況的一種,需要喚醒在等待隊列中的 goroutine
如果鎖處於飢餓狀態,直接喚醒等待隊列中的 goroutine.
總結
其實整個加鎖邏輯總結下來只要明白兩點
一、 當前鎖處於什麼狀態
二、 當前進行到哪一步了
即可很輕鬆的理解並掌握整個互斥鎖的邏輯,解鎖更爲簡單
轉自:
zhuanlan.zhihu.com/p/152214855
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/mV1jG7NdKCiG7H-aBke8PQ