聽說 Mutex 源碼是出名的不好看,我不信,來試一下
如何實現 Mutex
MVP 方案
Mutex 需要兩個變量:key 表示鎖的使用情況,value 爲 0 表示鎖未被持有,1 表示鎖被持有 且 沒有等待者,n 表示鎖被持有,有 n-1 個等待者;sema 表示等待隊列的信號量,sema 是個先進先出的隊列,用來阻塞、喚醒協程。
type Mutex struct {
key int32
sema int32
}
對 Mutex 加鎖本質是用 CAS 操作設置 key+1,如果鎖未被持有,則協程持有鎖;如果鎖被持有,則協程排隊等待;
對 Mutex 解鎖本質是用 CAS 操作設置 key-1,如果 key 爲 0,代表沒有等待者,直接返回;如果 key 大於 0,代表有協程被阻塞,則按照 FIFO 的順序喚醒協程。
把加鎖、解鎖過程看成排隊上廁所,可參考下圖,加鎖:
解鎖:
這個 mvp 版本只是單純的能用,離好用還差很遠。比如如果 CPU 上有協程來搶鎖,但是隻能嚴格按照 FIFO 順序排隊的話,這樣協程併發量不高。因爲喚醒協程需要做上下文切換,而此時 CPU 上正運行着其他協程,理論上如果處於 CPU 時間片上的協程搶到鎖的話會有更好的性能。
解決調度公平性
於是 Go 團隊更改了調度順序,CPU 上的協程也有機會搶鎖。
這就好比排隊上廁所一樣,CPU 上的人離廁所門只有 1m 的距離,而被喚醒的人離廁所門可能有 10m 的距離,從全局最優的角度考慮,離門近的人進入廁所可以有更高的吞吐。
具體是把 Mutex 結構體中key
由兩種含義拓然成了三種含義,並改名爲 state:
type Mutex struct {
state int32
sema uint32
}
這次改進把 state 中的第一位作爲 MutexLocked 專屬位,表示這個 Mutex 是否已經被協程持有。
第二位作爲 MutexWoken 專屬位,表示 Mutex 是否處於喚醒狀態,喚醒操作是一個比較耗性能的操作,由於每次搶鎖只會有一個協程獲取鎖,理論上不需要喚醒太多協程,當前執行喚醒操作時只會喚醒一個協程,且如果已有喚醒的協程就不會再執行喚醒操作了。
剩餘位爲 MutexWaiterShift 的位置,用來表示等待者的數量。
這個版本在加鎖時,CPU 上正在執行操作的協程可以和被喚醒的協程同時搶鎖,而不用嚴格執行先進先出。解鎖時 Mutex 要去除 MutexLocked 標記,並根據是否有等待者或是否有已經被喚醒的協程,去決定是否去喚醒協程。
然而這個版本還有一些問題,協程在搶鎖時只會執行一次,如果 Mutex 沒有被釋放,這個新來的協程可能就被阻塞排隊去了。
加入 spin 機制
這就好比上廁所一樣,你排了好久隊,好不容易排到了第一位可以去搶廁所了,由於廁所被鎖了,不得不重新排隊,是不是有點蛋疼。
此時如果你在廁所門口轉幾圈,理論上如果廁所裏面的人方便的比較快,你轉圈的時候它正好出來了,那你就可以進去了,這就是所謂的 spin 機制。
於是 Go 團隊繼續優化 Mutex,這次爲 Mutex 加入了自旋機制,協程檢測鎖是否被釋放時,如果鎖沒釋放會旋轉 (底層是執行若干次 PAUSE 指令),以期望在這個過程中鎖被釋放。自旋不是無限制的,協程旋轉幾圈後,如果還沒搶到鎖,那它只能乖乖去隊列尾部排隊了。
那麼到這裏 Mutex 是否完美了呢,其實這裏還有一個比較大的問題:如果你是第一次來搶 Mutex,搶不到讓你去隊尾排隊,這個看起來合乎常理,但是如果你已經排了半天隊,好不容易排到隊頭可以搶廁所了,卻被 CPU 上的協程搶走了鎖,還讓你去隊尾排隊有點殘忍,有可能造成你次次搶不到廁所,然後反覆到隊尾排隊,最後活活憋死了。
解決飢餓問題
於是 Go 團隊接着去解決這個飢餓問題。他們爲 Mutex 又新增了一個狀態:mutexStarving
,這個狀態表示 Mutex 是否處於飢餓狀態。如果一個協程等待時長超過 1ms,那 Mutex 會進入飢餓狀態。在飢餓狀態下,鎖的執行權會由解鎖的協程直接交給隊列頭部的協程, 各個協程不再搶鎖,也不再自旋,而是嚴格按照 FIFO 的順序執行。這次改進中,還修復了被喚醒的協程需重新去隊尾排隊的問題:如果協程被喚醒後搶鎖失敗,會被放到隊列頭部等待喚醒。
這就是目前 Mutex 的實現現狀~
協程的狀態
前面介紹了 Mutex 的 3 種狀態和一種計數,協程內部也有幾個狀態,這幾個狀態會影響 Mutex 的狀態,讓我們分別來看一下:
iter
iter 用於記錄協程的自旋次數,在 go1.15 版本時,超過 4 次就不再自旋了。
awoke
awoke 表示協程是否喚醒。協程在自旋時,相當於 CPU 上已經有在等鎖的協程。爲避免 Mutex 解鎖時再喚醒其他協程,自旋時要嘗試把 Mutex 置爲喚醒狀態,Mutex 處於喚醒狀態後 要把本協程的 awoke 也置爲 true。
在搶鎖前如果協程處於喚醒狀態,那就需要把 Mutex 的 mutexWoken 狀態清空,以便於 Mutex 在解鎖時做喚醒操作。
starving
starving 表示協程的等待時間,如果等待時長超過 1ms,starving 置爲 true,後續操作會把 Mutex 也標記爲飢餓狀態。
Mutex 的狀態內部影響
前面介紹了 Mutex 的 3 種狀態和一種計數,也介紹了協程內部的幾個狀態,下面來看看搶鎖、解鎖時,這些狀態對一些操作的具體影響:
自旋操作
自旋操作是 協程發現鎖被佔用時等鎖釋放的方式。若鎖處於飢餓狀態,協程不再自旋而是直接去排隊;自旋操作能否執行和 協程的旋轉次數 iter 以及一些機器的 CPU 信息相關。
源碼中在執行自旋操作時要判斷 Mutex 是否處於飢餓或者加鎖狀態:如果鎖已經釋放,或者 Mutex 處於飢餓狀態,就沒必要執行自旋操作了。
加鎖操作
協程搶鎖操作是在 Mutex 非飢餓情況下進行的,如果 Mutex 處於飢餓狀態,協程會直接排隊。
等待者操作
增加等待者:若協程自旋操作完成後,Mutex 仍然處於加鎖或者飢餓狀態,那新來的協程只好乖乖去排隊了,此時等待者數量加 1。
減少等待者:解鎖時要喚醒協程,此時需要把 MutexWoken 標記爲 1 表示已有喚醒協程,並減少一個等待者。
若 Mutex 處於飢餓狀態時,調度協程時需要把 Mutex 標記爲加鎖,並減少一個等待者。
飢餓操作
什麼時候會飢餓:
Mutex 的 mutexStarving 標記是由協程的 starving 狀態計算的,如果一個協程被喚醒後發現距離第一次排隊時已經超過了 1ms, 下次 for 循環在嘗試加鎖時會把 Mutex 標記爲飢餓狀態。
什麼時候會解除飢餓:
Mutex 處於飢餓狀態後,如果發現等待隊列中只有一個協程,或者這個協程等待時長小於 1ms,就需要把 Mutex 轉換爲正常模式。
兩個先決函數
協程的阻塞操作是調用 runtime_SemacquireMutex 函數執行的,喚醒操作是調用 runtime_Semrelease 進行的。本篇文章暫時不講調度的具體細節。
Mutex 源碼解析
到這裏鋪墊的差不多了,可以正式看源碼了。
lock 和 unlock 方法爲了內聯操作把簡單狀態的邏輯單獨提出來了。大部頭的邏輯在 lockslow 和 unlockslow 裏面。
這裏有兩張圖表示爲啥提出來把大部頭邏輯提出來的好處:
lock 代碼:
func (m *Mutex) Lock() {
// Fast path
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// slow path
m.lockSlow()
}
lock 函數 fast path 解決的情況:
unlock 代碼:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Slow path
m.unlockSlow(new)
}
}
unlock 函數 fast path 解決的情況:
下面來看一下 lockslow 的源碼
我把這部分源碼分成三部分,整體執行如圖:
第一部分是自旋部分,第二部分是爲搶鎖的 CAS 打鋪墊,給 Mutex 的各種狀態賦值,第三部分是通過 cas 操作給 Mutex 賦值,並根據 CAS 是否成功做一些相關邏輯處理。
來看每個部分內部的一些細節:
第一部分細節
第二部分細節
第三部分細節
接着讓我們通過幾個例子感受一下流程:
Mutex:0001, 新來協程
Mutex:1001, 等待者被喚醒
Mutex:3001, 等待者被喚醒
下面來看一下 unlockslow 的源碼
unlock
unlock 的邏輯比較比較簡單,核心就是喚醒等待者,是否喚醒等待者是根據是否存在等待者決定的,根據 Mutex 是否處於飢餓狀態決定喚醒那個等待者。
Mutex:2101 飢餓時喚醒等待者
Mutex 源碼註釋
下面是我閱讀 go1.15 中src/sync/mutex.go
源碼時寫的一些註釋,對源碼感興趣的同學可以對照着看看,有啥問題歡迎找我交流:
type Mutex struct {
state int32
sema uint32
}
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
// 加鎖時,會有多個協程會同時來搶Mutex,CAS操作可以保證原子性,如果一個協程修改了Mutex狀態後還搶鎖失敗,
// 需要重新去搶鎖,所以這裏用大for循環把邏輯包起來。
for {
// 這個if循環主要判斷waiter是否可以自旋,自旋條件不滿足時,會執行下段代碼
// old&(mutexLocked|mutexStarving) == mutexLocked 這段代碼是看old是否處於飢餓狀態,飢餓狀態沒必要自旋
// 鎖釋放了,或者處於飢餓狀態,或者自旋次數夠了,就不再自旋了
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 設置 mutexWoken 標記,這裏貌似和下面Unlock的代碼有聯動
// 這裏是爲了告知持有鎖的goroutine在釋放鎖時不需要喚醒其他goroutine了,已經有goroutine在等待,以免喚醒太多的等待協程
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 下面四個if判斷分別用於對3種狀態和一個數量進行賦值,都是爲了給new賦值
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 新狀態只有在非飢餓的條件下才可以加鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果old已經處於加鎖或者飢餓狀態,則等待者按照FIFO的順序排隊
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果符合飢餓條件 且鎖還沒有被釋放,則將其設置爲飢餓狀態
// 如果鎖已經釋放了,那就去搶一次鎖。如果進入飢餓模式,那就乖乖去排隊了
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 這裏用來消除 awoke 標記
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// cas成功
// 這裏用於清除Woken標記,因爲後面 goroutine 只會阻塞或者搶鎖成功
// 釋放喚醒標識,當前goroutine都不再是喚醒狀態了,以便其他goroutine進來
// cas失敗
// 丟人,所以需要釋放之前搶到的 mutexWoken 標識
new &^= mutexWoken
}
// 下面這裏嘗試給通過CAS操作把old變成new
// 這裏無非就是加鎖成功,或者去排隊,排隊的話需要看排隊到頭部還是尾部
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 這裏還需要看一下
// 如果原來的old未加鎖,且Mutex不處於飢餓狀態,那goroutine獲取到鎖之後就可以直接退出了
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// queueLifo 爲true,表示其是被喚醒的,排隊時排到頭部
// queueLifo 爲false,表示是第一次排隊,排隊時排到隊列尾部
queueLifo := waitStartTime != 0
// 第一次排隊要記錄排隊時間
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 這裏會阻塞
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 後續代碼是隊列裏的goroutine被runtime_Semrelease喚醒後,從讓出的地方繼續執行
// 如果這個協程處於飢餓狀態 或 等待時間大於1ms,則設置其爲飢餓狀態
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果Mutex處於飢餓狀態
if old&mutexStarving != 0 {
// 這裏是一些異常狀態
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 把Mutex設置爲加鎖,把等待隊列的waiter-1,
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 根據協程的狀態看看是否需要切換至正常模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// goroutine處於正常模式,標記喚醒標識,然後重新自旋去搶鎖
// 執行到這裏,這個goroutine是被喚醒的,需要把awoke標記 標識爲true
awoke = true
iter = 0
} else {
// mutex被其他goroutine用了,繼續回去自旋吧
// 賦值失敗時還原狀態
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 說明state其他位不爲0, 那就直接進入slow path吧
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// mutex處於正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果等待隊列爲空 或者 已經有其他goroutine被喚醒 or 獲得了鎖 or 鎖處於飢餓模式
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
// 不需要喚醒goroutine,直接返回即可
return
}
// 搶佔Woken標示位,獲取喚醒一個goroutine的權利
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 調用這裏喚醒一個goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 搶佔不成功就歸位,然後循環去吧
old = m.state
}
} else {
// 飢餓模式下,後來的goroutine不會爭搶鎖,
// 直接喚醒第一個等待者
runtime_Semrelease(&m.sema, true, 1)
}
}
一個問題
如果讀者朋友覺得看懂了源碼,可以留言回答一下這個問題:
對於 Mutex 直接 Unlock 操作爲什麼會 panic
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/kTlpaV22vaprJSEZy8KPsw