深入理解 golang 的互斥鎖
在開始之前,我們需要知道鎖實現的幾種方式。
信號量 操作系統中有 P 和 V 操作。P 操作是將信號量值減去 1,V 操作是將信號量值增加 1. 因此信號量的操作模式爲:
-
初始化,給它一個非負整數值。
-
在程序試圖進入臨界區之前需要先運行 P 操作,然後會有兩種情況。
-
當信號量 S 減少到負值時,該過程將被阻止並且無法繼續。這個時候,進程會被阻塞。
-
當信號量 S 不爲負時,進程可以進入臨界區。
- 在程序離開臨界區時,需要執行 V 操作。當信號量 S 不是爲負時,之前被阻止的其他進程將允許進臨界區。
信號量和鎖 儘管信號量和鎖看起來相似,例如,當信號量爲 1 時,實現了互斥鎖,但實際上,它們具有不同的含義。
鎖用於保護臨界資源,例如讀和寫不能同時執行場景。
信號量是用於確保進程 (線程或 goroutine) 被調度。比如,三個進程共同計算 c = a+b
。首先,a+b
的計算和賦值操作不能同時執行。其次,必須確保首先執行 a+b
。c 在賦值之後執行,因此這個位置需要以信號量的形式執行。
此外,可以通過信號量實現鎖,然後 goroutine 可以根據規則阻塞和喚醒鎖;也可以通過自旋的方式實現鎖,goroutine 將持有 CPU,直到解鎖。
這兩種方式之間的區別是是否需要調度 goroutine,但是從本質上講,鎖是爲了確保不會錯誤地訪問臨界資源。
自旋鎖 CAS 理論是一種自旋鎖。
在同一時間只有一個線程獲得鎖,而沒有獲得鎖的線程通常有兩種處理方式:
-
一直循環等待,以確定資源是否釋放了鎖。這種鎖稱爲自旋鎖,它不會阻塞線程 (NON-BLOCKING)。
-
一直阻塞,等待重新調度,這種是互斥鎖。
自旋鎖的原理相對簡單。如果持有鎖的線程可以在短時間內釋放鎖定資源,那麼等待鎖的其他線程不需要在內核態和用戶態之間切換來回切換阻止狀態,他們只需要通過自旋的方式等一會,等待持有鎖的線程釋放鎖,然後獲取鎖,這種方式避免用戶進程在內核切換。
但如果長時間未釋放鎖,那麼自旋鎖的開銷會非常大,它會阻止其他線程的運行和調度。
線程持有鎖的時間越長,持有鎖的線程被 OS 調度中斷的風險就越大。
如果發生中斷,他線程將保持自旋狀態 (反覆嘗試獲取鎖),而持有鎖的線程不打算釋放鎖,這將導致無限延遲,直到持有鎖的線程完成並釋放鎖。
解決上述情況的一個好方法是爲自旋鎖定設置一個自旋時間,並在時間一到就釋放自旋鎖。
#悲觀鎖定和樂觀鎖定 悲觀鎖定是一種悲觀思維。它總是認爲最壞的情況可能會發生。它認爲這些數據很可能被其他人修改過。無論是讀還是寫,悲觀鎖都是在執行操作之前鎖定的。
樂觀鎖定的思想與悲觀鎖定的思想相反。它始終認爲資源和數據不會被別人修改,所以讀取不會被鎖定,但是樂觀鎖定會在寫操作時確定當前數據是否被修改過。
樂觀鎖定的實現方案主要包括 CAS 和版本號機制。樂觀鎖定適用於多讀場景這可以提高吞吐量。
CAS 是一個著名的基於比較和交換無鎖算法。
也就是在不使用鎖的情況下實現多個線程之間的變量同步,(即在不阻塞線程的情況下實現變量同步),所以又稱非阻塞同步。
CAS 涉及三種關係: 指向內存區域的指針 V、舊值 a 和要寫入的新值 B。
由 CAS 實現的樂觀鎖將帶來 ABA 問題。同時,整個樂觀鎖會在數據不一致的情況下觸發等待和重試機制,這對性能有很大的影響。
版本號機制通過版本的值實現版本控制。
有了以上的基礎知識,我們就可以開始分析 golang 是如何實現互斥鎖的了。
Golang 的 Mutex 實現一直在改進,到目前爲止,主要經歷了 4 個版本:
-
V1: 簡單實現的版本
-
V2: 新的 goroutine 參加鎖的競爭
-
V3: 新的 goroutines 更多參與競爭的機會
-
V4: 解決老 goroutine 飢餓的問題
每一次改進都是爲了提高系統的整體性能。這個升級是漸進的、持續的,因此有必要從 V1 版本開始慢慢地看 Mutex 的演變過程。
V1: 簡單實現的版本 在 V1 版本中,互斥的完整源代碼如下。commit[1]
核心代碼如下:
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// The structure of the mutex, containing two fields
type Mutex struct {
key int32 // Indication of whether the lock is held
sema int32 // Semaphore dedicated to block/wake up goroutine
}
// Guaranteed to successfully increment the value of delta on val
func xadd(val *int32, delta int32) (new int32) {
for {
v := *val
if cas(val, v, v+delta) {
return v + delta
}
}
panic("unreached")
}
// request lock
func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 { // Add 1 to the ID, if it is equal to 1, the lock is successfully acquired
return
}
semacquire(&m.sema) // Otherwise block waiting
}
func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 { // Subtract 1 from the flag, if equal to 0, there are no other waiters
return
}
semrelease(&m.sema) // Wake up other blocked goroutines
}
code here[2]
首先,互斥鎖的結構非常簡單,包括兩個字段,key
和 sema
。
key
表示有幾個 gorutines 正在使用或準備使用該鎖。如果 key==0
,表示當前互斥鎖已解鎖,否則,key>0
表示當前互斥鎖已鎖定。
sema
是實際導致 goroutine 阻塞和喚醒的信號量。
xadd
是一個基於 cas 的加減法函數。
Lock 和 Unlock 是鎖定當前互斥鎖的核心函數,但邏輯非常簡單。
Unlock
使用 xadd 方法對 key
進行 -1 操作。如果結果不是 0,則意味着 goroutine 當前正在等待,需要調用 semrelease
來喚醒 goroutine。
想象一下這樣一個場景: 被阻塞的 goroutine(命名爲 g1) 不能佔用 CPU,因此需要在 g1 被喚醒後執行上下文切換。如果此時出現一個新的 goroutine (g2),它就擁有 CPU 資源。
如果把鎖給 g2,那麼它可以立即執行並返回結果 (而不是等待 g1 的上下文切換),這樣整體效率就可以提高到更高的水平。
V2: 新的 goroutine 參加鎖競爭.
完整的源代碼地址 https://github.com/golang/go/blob/weekly.2011-07-07/src/pkg/sync/mutex.go
在 V2 版本中,核心特性是 goroutine 在被喚醒後不會立即執行任務,而是重複搶佔鎖的過程,以便新的 goroutine 有機會獲得鎖,這就是給後來者機會。
互斥鎖結構和常量的定義如下:
// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexWaiterShift = iota
)
雖然互斥鎖結構的定義基本不變,但從 V1 的 key
到 V2 的 state
,它們的內部結構卻有很大的不同。
第 0 位表示鎖定狀態 (L),即 0 表示解鎖,1 表示鎖定; 第 1 位表示喚醒狀態 (W),第 2 位到第 31 位表示阻塞等待的計數。
mutexLocked
的值是 0x1, mutexawake
的值是 0x2, mutexWaiterShift
的值是 0x2, mutexWaiterShift
表示任何表示阻塞等待數量的數組都需要左移兩位。
V2 版本的主要在於 Lock
方法的改進,代碼如下:
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
awoke := false
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime.Semacquire(&m.sema)
awoke = true
}
}
}
code here[3]
第 2 行和第 4 行中的代碼邏輯應用於解鎖狀態,goroutine 通過 CAS 將 L 位從 0 設置爲 1 獲得鎖。
此時不存在任何爭用,因此獲取鎖幾乎不需要花費任何成本。
然後,goroutine 進入一個循環,其中 CAS 方法可確保正確疊加新狀態。
-
嘗試獲得鎖。
-
嘗試將等待鎖的數量增加 1。
由於更改不是原子的,它可能會導致舊 state
的值變得無效。
同樣,即使當前狀態被鎖定,由於等待者的數量,舊狀態也將過時。
因此,您需要繼續通過循環獲得新的舊狀態。
在舊狀態未過時並覆蓋新狀態後,進入真正的鎖定步驟。
如果舊狀態未解鎖,則直接獲得鎖,否則,通過信號量機制阻塞當前 goroutine。
與 V1 不同的是,當前進程在被喚醒後,仍然會陷入 for 循環再次獲取鎖,這主要是給新的 goroutine 機會
-
如果舊的 goroutine 在上下文切換期間有一個新的 goroutine,鎖將被賦予新的 goroutine。
-
如果在完成上下文切換後,舊的 goroutine 仍然沒有新的 goroutine,那麼鎖將被交給舊的 goroutine。
Unlock 方法相對簡單。代碼如下:
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime.Semrelease(&m.sema)
return
}
old = m.state
}
}
code here[4]
有兩個主要的解鎖邏輯:
-
如果沒有等待鎖的 goroutine,或者當前系統處於解鎖狀態,並且有喚醒程序,則直接返回。
-
如果不滿足上述要求,則等待的數量將減少 1,隊列頭部的 goroutine 將通過信號量被喚醒。
V3:給新的 goroutines 更多機會 這個版本的優化,關鍵提交是 https://github.com/golang/go/commit/edcad8639a902741dc49f77d000ed62b0cc6956f
在 V2 的基礎上,如何進一步提高性能? 在大多數情況下,協程互斥鎖定期間的數據操作的耗時是非常低的,這可能比喚醒 + 切換上下文的耗時更低。
想象一個場景: GoroutineA 擁有 CPU 資源,GoroutineB 位於阻塞隊列的頭部。然後,當 GoroutineA 試圖獲取鎖時,它發現當前鎖已被佔用。根據 V2 策略,GoroutineA 立即被阻塞,假設鎖此時處於阻塞狀態。如果釋放,則 GoroutineB 將按計劃被喚醒,即整個運行時間包括 GoroutineB 的喚醒 + 上下文切換時間。
在 V3 版本的實現中,新的 Goroutine (GoroutineA) 被允許通過旋轉等待一段時間。如果在等待時間內釋放鎖,新的 Goroutine 立即獲取鎖資源,避免了舊 Goroutine 喚醒 + 上下文切換的耗時,提高了整體工作效率。
同樣,V3 的改進主要集中在 Lock
方法上,代碼如下:
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if raceenabled {
raceAcquire(unsafe.Pointer(m))
}
return
}
awoke := false
iter := 0
for {
old := m.state
new := old | mutexLocked
if old&mutexLocked != 0 {
if runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
continue
}
new = old + 1<<mutexWaiterShift
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
panic("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 {
break
}
runtime_Semacquire(&m.sema)
awoke = true
iter = 0
}
}
if raceenabled {
raceAcquire(unsafe.Pointer(m))
}
}
code here[5]
與 V2 的實現代碼相比,它主要集中在第 14-25 行,多了兩個自旋鎖的核心函數 runtime_canSpin
和 runtime_doSpin
。
第一個是 runtime_canSpin
函數。傳入參數是迭代器(代表當前旋轉的數量)。runtime_canSpin
函數的實現功能是確定是否可以輸入當前的旋轉等待狀態。
如前所述,旋轉鎖在等待時不會釋放 CPU 資源,也不會消耗上下文切換,但如果旋轉時間過長,則會導致無意義的 CPU 消耗,這將進一步影響性能。
因此,在使用自旋鎖時,必須嚴格控制自旋鎖的進入過程。
最後一個是 runtime_doSpin
函數,可以簡單地理解爲 CPU 空閒一段時間,這就是自旋過程。
整個過程非常清晰。所有持有 CPU 的 goroutine 在 runtime_canSpin
函數通過檢查後執行自旋操作。如果在旋轉操作完成後,它們仍然沒有持有鎖,則 Goroutine 將被阻塞。其他邏輯與 V2 相同。
V4:解決老 Goroutine 的飢餓問題。
源地址下: https://github.com/golang/go/blob/go1.15.5/src/sync/mutex.go
從 V2 到 V3 的改進是爲新的 goroutines 提供更多的機會,這導致了老的 goroutine 飢餓問題,即新的獲取鎖的機會不會讓給老的 goroutines,因此這個問題主要集中在 V4 中改進。
首先是互斥鎖結構的 State 字段有了新的變化,增加了飢餓指示器 (S)。
0 表示沒有發生飢餓,1 表示發生了飢餓。(圖片中的 S 標誌位)
在新的定義中,常數的定義也發生了一些變化:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving // separate out a starvation token from the state field
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
Lock 方法的邏輯如下所示:
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
code here[6]
我們主要關注 lockSlow 方法。在深入研究代碼之前,我們首先需要理解在什麼情況下當前鎖被認爲是飢餓的:
-
(場景 1) 舊的 Goroutine 被喚醒,但鎖被新的 Goroutine 佔據,對於舊的 Goroutine,我被喚醒了,但什麼也沒做,並立即再次被阻塞。
-
(場景 2)Goroutine 被阻塞的總時間超過閾值 (默認爲 1ms)。
所以核心是記錄當前 Goroutine 開始等待的時間: 對於第一次進入鎖的 Goroutine,開始等待時間爲 0。對於場景 1,判斷標準爲開始等待時間是否爲 0。如果它不是 0,就意味着它以前被阻塞過。通過 (第 45 行)。
對於場景 2,判斷標準爲當前時間與開始等待時間的差值是否超過閾值。如果是這樣,這意味着 Goroutine 已經等待太久,應該進入飢餓狀態 (第 50 行)。
進一步,當我們知道如何判斷飢餓狀態時,飢餓模式和非飢餓模式的區別是什麼
首先,如果當前鎖被耗盡,任何新的 goroutine 都不會旋轉 (第 15 行)。
其次,如果當前 Goroutine 處於飢餓狀態,那麼當它被阻塞時,它將被添加到等待隊列的頭部 (下一個喚醒操作肯定會喚醒當前處於飢餓狀態的 Goroutine,第 45 行和第 49 行)。
最後,允許飢餓的 goroutine 在被喚醒後立即持有鎖,而不必與其他 goroutine 重新競爭鎖 (比較 V2,第 52 行和 62 行)。
V4 對應的 Unlock 也根據飢餓狀態進行了調整。代碼如下:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}
code here[7]
與上鎖操作相比,解鎖操作更容易理解。
-END-
相關鏈接:
[1]https://github.com/golang/go/blob/d90e7cbac65c5792ce312ee82fbe03a5dfc98c6f/src/pkg/sync/mutex.go
[2] https://gist.github.com/tengergou/3851ef5ae21e4530272618ec096990ee#file-20220921-1-go
[3] https://gist.github.com/tengergou/77d6f4ab3acf0a951114fe045e1429a2#file-20220921-3-go
[4] https://gist.github.com/tengergou/53d9b51646b23369ae9e030631aca9b5#file-20220921-4-go
[5] https://gist.github.com/tengergou/13e1eff9a75821c5441cab49345e7c5d#file-20220921-5-go
[6] https://gist.github.com/tengergou/a10a675929d7ae203816c65cbf564672#file-20220921-6-go
[7]https://gist.github.com/tengergou/0ff186760513855ace82c56a68c6bc58#file-20220921-7-go
原文地址:
https://levelup.gitconnected.com/deep-understanding-of-golang-mutex-9964b02c56e9
原文作者:
Dwen
本文永久鏈接:
https:/github.com/gocn/translator/blob/master/2022/w47_Deep_Understanding_of_Golang_Mutex.md
譯者:小超人
校對:zxmfke
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/i1N9bmVSW1lGfOezvhcD7g