sync-Mutex 設計與實現
概述
sync.Mutex
實現了互斥鎖同步原語。
內部實現
我們來探究一下 sync.Mutex
的內部實現,文件路徑爲 $GOROOT/src/sync/mutex.go
,筆者的 Go 版本爲 go1.19 linux/amd64
。
狀態標識變量
const (
mutexLocked = 1 << iota // 1, 互斥鎖已鎖定
mutexWoken // 2, 互斥鎖被喚醒
mutexStarving // 4, 互斥鎖飢餓模式標識
mutexWaiterShift = iota // 3, 互斥鎖上等待的 goroutine 數量值的計算偏移量
starvationThresholdNs = 1e6 // 進入飢餓模式的等待閾值,1 ms
)
兩種工作模式
互斥鎖可以在兩種模式下操作:正常模式和飢餓模式,正常模式的性能要好很多,因爲 goroutine
可以連續多次獲得互斥鎖,即使存在阻塞的 goroutine, 與此同時,飢餓模式是防止尾部的 goroutine
被餓死的重要機制 (高尾延時)。
正常模式
正常模式下,鎖的等待者採用 FIFO
先進先出隊列,但是一個剛被喚醒的 goroutine
並不會直接獲得鎖,而是要與新到達的 goroutine
競爭鎖的使用, 然而 新到達的 goroutine
有一個優勢:它們已經在 CPU 上運行,並且數量可能有很多,結果就是剛被喚醒的 goroutine
大概率會競爭失敗, 所以在這種情況下,失敗的 goroutine
被排在等待隊列最前面,如果等待的 goroutine
超過 1ms 無法獲得鎖,互斥鎖會切換到飢餓模式。
飢餓模式
飢餓模式下,互斥鎖的所有權直接從解鎖的 goroutine
轉移到隊列最前面的 goroutine
(優先級發生變化), 新到達的 goroutines
不會嘗試獲得鎖 (不參與競爭),也不會嘗試自旋等待,它們會自動排在隊列尾部,這樣可以避免已經等待了太久的 goroutine
被餓死。
如果一個 goroutine
獲得了鎖,並且滿足以下兩個條件任意一個時,互斥鎖會切換到正常模式:
-
它是隊列的最後一個 goroutine
-
它等待時間不超過 1 ms
Mutex 對象
Mutex
對象表示互斥鎖,零值狀態下表示未加鎖,Mutex 對象一旦使用後,就不能再複製 (因爲拷貝時無法複製 state, sema 字段的數據,會引起問題)。
state
字段表示鎖的狀態,低 3 位表示 3 種狀態:
sema
字段表示用於控制鎖狀態的信號量。
// 根據 Go 內存模型的約束
// 第 n 次調用 Unlock 方法在第 m 次調用 Lock 方法之前同步,其中 n < m
// 調用 TryLock 方法成功,就相當於調用了 Lock 方法
// 調用 TryLock 方法失敗,不會建立任何同步原語約束關係
type Mutex struct {
state int32
sema uint32
}
Locker 接口
type Locker interface {
Lock()
Unlock()
}
TryLock 方法
TryLock
方法嘗試獲取鎖並且返回是否成功 (調用非阻塞)。
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// 這裏可能有 1 個 goroutine 在等待鎖
// 但是可以嘗試在 goroutine 喚醒之前獲得鎖
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
...
return true
}
在標準庫中的註釋中有這樣一段話:
注意,TryLock 真實的使用場景很少,使用 TryLock 通常說明在互斥鎖的使用場景中,存在更深層次的問題。
官方的言外之意貌似並不鼓勵使用該方法?TryLock
方法添加是社區的強烈意願,官方一開始似乎並不感冒。筆者搜索了一下,發現目前標準庫中幾乎沒有使用到 TryLock
方法。
$ grep -nr "TryLock" "$(dirname $(which go))/../src" | grep -v "\/\/" | grep -v "test"
# 只有 2 個聲明和 1 個調用
.../src/sync/mutex.go:98:func (m *Mutex) TryLock() bool
.../src/sync/rwmutex.go:166:func (rw *RWMutex) TryLock() bool
.../src/sync/rwmutex.go:171: if !rw.w.TryLock()
Lock 方法
Lock
方法表示加鎖操作,如果鎖已經被使用,調用方 goroutine
會陷入阻塞,直到鎖可用。
func (m *Mutex) Lock() {
// 鎖當前狀態是未鎖定 (正好可以完成加鎖,然後直接返回)
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
...
return
}
// slow path 可以代碼內聯
m.lockSlow()
}
lockSlow 方法
lockSlow
方法表示調用 Lock
方法時沒有直接獲取到鎖時,接下來的處理工作 (進入阻塞調用)。
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待時間
starving := false // 是否處於飢餓模式
awoke := false // 是否處於喚醒狀態
iter := 0 // 自旋迭代次數
old := m.state // 鎖當前狀態
for {
// 飢餓模式下不能自旋,因爲所有權轉移到了隊列前面的 goroutine
// 新到達的 goroutine 不參與競爭並且不會嘗試自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果鎖沒有處於飢餓模式並且符合自旋條件,goroutine 開始自旋
// 嘗試設置 mutexWoken 標識
// 這樣 Unlock 釋放鎖時就不用喚醒其他阻塞的 goroutine (直接讓當前 goroutine 獲得鎖)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // goroutine 自旋
iter++ // 增加自旋次數
old = m.state // 更新鎖狀態
continue
}
new := old
// 互斥鎖處於飢餓模式時,新到達的 goroutine 不要試圖獲得鎖,應該去排隊
if old&mutexStarving == 0 {
// 鎖處於非飢餓模式
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 鎖處於飢餓模式或者已加鎖
// 增加等待的 goroutine 數量 (注意偏移量使用方法)
new += 1 << mutexWaiterShift
}
// 當前 goroutine 將鎖切換到飢餓模式 (前提是鎖沒有被釋放,調用了 Unlock 方法)
// 調用 Unlock 時,處於飢餓模式的鎖期望有等待的 goroutine, 但實際情況不一定有
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 當前 goroutine 處於喚醒狀態,但是鎖的狀態裏並沒有 mutexWoken 標識
// 拋出一個 BUG: 互斥鎖狀態不一致
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// goroutine 被喚醒,重置 mutexWoken 標識
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 當前 goroutine 通過 CAS 操作獲得了鎖
break
}
// 如果已經處於等待狀態,那麼直接排在隊列最前面
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 通過信號量保證鎖只能被 1 個 goroutine 獲取到
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 如果等待時間超過了閾值,那麼就進入飢餓模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 如果當前 goroutine 被喚醒並且鎖處於飢餓模式
// 控制權轉交給了當前 goroutine,但是互斥鎖處於某種不一致的狀態:
// mutexLocked 標識未設置,仍然認爲當前 goroutine 正在等待鎖
// 拋出一個 BUG: mutex 狀態不一致
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 減少等待的 goroutine 數量 (注意偏移量使用方法)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 退出飢餓模式
// 必須要在這裏退出並且考慮等待時間
// 飢餓模式效率很低,一旦 2 個 goroutine 同時將互斥鎖切換到飢餓模式,可能會陷入無限循環
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true // 當前 goroutine 喚醒
iter = 0 // 當前 goroutine 重置自旋次數
} else {
old = m.state
}
}
...
}
runtime_canSpin 方法
runtime_canSpin
方法檢測當前 goroutine
是否可以進行自旋操作,通過鏈接器鏈接到 sync_runtime_canSpin
。
// sync/runtime.go
func runtime_canSpin(i int) bool
運行自旋操作的條件比較苛刻,需要滿足:
-
程序運行在多核機器上且 GOMAXPROCS > 1
-
至少有一個正在運行的 P, 且本地運行隊列爲空
-
自旋次數不超過 4 次
自旋與互斥鎖相反,不做被動等待 (直接主動原地旋轉等待),因爲其可以在全局運行隊列或者其他處理器的運行隊列上面運行。
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
Go 調度器的設計者考慮了 操作的資源利用率 和 頻繁的線程搶佔給操作系統帶來的負載 之後,提出了
自旋線程
的概念。當自旋線程
沒有找到可供其調度執行的 goroutine 時,並不會銷燬該線程,而是採取自旋
的操作保存起來。雖然直觀上看起來浪費了一些資源,但是考慮一下syscall (系統調用)
相關的情景就可以發現,相比自旋
來說,線程間頻繁的搶佔、創建、銷燬等操作帶來的負載會更高。
Unlock 方法
Unlock
方法表示鎖的釋放操作,對未加鎖的互斥鎖執行解鎖操作會引發運行時錯誤。
** 互斥鎖並不是和某個具體的 goroutine 關聯的,完全可以在一個 goroutine 加鎖,在其他的 goroutine 中釋放鎖 (前提是兩者持有的是同一個鎖)**。
func (m *Mutex) Unlock() {
...
// 如果去除 mutexLocked 標識之後正好是 0, 說明當前 goroutine 成功解鎖,直接返回即可
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// slow path 可以代碼內聯
m.unlockSlow(new)
}
}
unlockSlow 方法
unlockSlow
方法表示調用 Unlock
方法時沒有直接釋放鎖時,接下來的處理工作 (進入阻塞調用)。
func (m *Mutex) unlockSlow(new int32) {
// 對未加鎖的互斥鎖執行解鎖操作會引發運行時錯誤
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 如果鎖處於正常模式
// 如果沒有等待 goroutine 或者已經有 goroutine 被喚醒或搶到了鎖,就不用喚醒其他 goroutine 了,直接返回即可
// 如果鎖處於飢餓模式,所有權會直接從喚醒的 goroutine 轉移到隊列最前面的 goroutine
// 如果當前 goroutine 不在這個調用鏈中
// 可能是因爲在 unlock 互斥鎖時,沒有觀察到 mutexStarving 標識,所以直接返回即可
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 鎖有正在等待的 goroutine, 那麼被喚醒的 goroutine 得到鎖
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 如果鎖處於處於飢餓模式
// 將鎖的所有權轉移到下一個等待的 goroutine, 並讓其開始運行
// 注意:mutexLocked 標識不需要被設置,因爲等待的 goroutine 被喚醒後會進行設置
// 如果此時設置了 mutexStarving 標識,鎖仍然被認爲當前處於加鎖狀態,新到達的 goroutine 就無法獲取到鎖了
runtime_Semrelease(&m.sema, true, 1)
}
}
小結
互斥鎖內部實現中將狀態分成了兩種: 正常模式和飢餓模式。在正常模式下,鎖的分配採用競爭機制,在剛被喚醒的和新來的 goroutine
之間, 傾向於將鎖分配給新來的 goroutine
, 競爭失敗的 goroutine
會被放到隊列首部,如果等待的 goroutine
超過 1ms 無法獲得鎖,互斥鎖會切換到飢餓模式。在飢餓模式下,從隊列首部的 goroutine
開始依次獲得鎖,新來的 goroutine
會自動排在隊列尾部,這樣就避免尾部 goroutine
餓死。此外,互斥鎖的加鎖和解鎖過程實現中還涉及到了信號、以及 GMP 調度等概念,本文暫時忽略掉了這些細節,後面有時間了再專門寫一篇。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/D9Zgh2pm6hqqbIOkefrNcw