sync-Mutex 設計與實現

概述

sync.Mutex 實現了互斥鎖同步原語。

內部實現

我們來探究一下 sync.Mutex 的內部實現,文件路徑爲 $GOROOT/src/sync/mutex.go,筆者的 Go 版本爲 go1.19 linux/amd64

狀態標識變量

const (
    mutexLocked = 1 << iota     // 1, 互斥鎖已鎖定
    mutexWoken                  // 2, 互斥鎖被喚醒
    mutexStarving               // 4, 互斥鎖飢餓模式標識

    mutexWaiterShift = iota     // 3, 互斥鎖上等待的 goroutine 數量值的計算偏移量

    starvationThresholdNs = 1e6 // 進入飢餓模式的等待閾值,1 ms
)

兩種工作模式

互斥鎖可以在兩種模式下操作:正常模式和飢餓模式,正常模式的性能要好很多,因爲 goroutine 可以連續多次獲得互斥鎖,即使存在阻塞的 goroutine, 與此同時,飢餓模式是防止尾部的 goroutine 被餓死的重要機制 (高尾延時)。

正常模式

正常模式下,鎖的等待者採用 FIFO 先進先出隊列,但是一個剛被喚醒的 goroutine 並不會直接獲得鎖,而是要與新到達的 goroutine 競爭鎖的使用, 然而 新到達的 goroutine 有一個優勢:它們已經在 CPU 上運行,並且數量可能有很多,結果就是剛被喚醒的 goroutine 大概率會競爭失敗, 所以在這種情況下,失敗的 goroutine 被排在等待隊列最前面,如果等待的 goroutine 超過 1ms 無法獲得鎖,互斥鎖會切換到飢餓模式

飢餓模式

飢餓模式下,互斥鎖的所有權直接從解鎖的 goroutine 轉移到隊列最前面的 goroutine (優先級發生變化), 新到達的 goroutines 不會嘗試獲得鎖 (不參與競爭),也不會嘗試自旋等待,它們會自動排在隊列尾部,這樣可以避免已經等待了太久的 goroutine 被餓死。

如果一個 goroutine 獲得了鎖,並且滿足以下兩個條件任意一個時,互斥鎖會切換到正常模式:

  1. 它是隊列的最後一個 goroutine

  2. 它等待時間不超過 1 ms

Mutex 對象

Mutex 對象表示互斥鎖,零值狀態下表示未加鎖,Mutex 對象一旦使用後,就不能再複製 (因爲拷貝時無法複製 state, sema 字段的數據,會引起問題)。

state 字段表示鎖的狀態,低 3 位表示 3 種狀態:

v2S7FV

sema 字段表示用於控制鎖狀態的信號量。

// 根據 Go 內存模型的約束
// 第 n 次調用 Unlock 方法在第 m 次調用 Lock 方法之前同步,其中 n < m
// 調用 TryLock 方法成功,就相當於調用了 Lock 方法
// 調用 TryLock 方法失敗,不會建立任何同步原語約束關係
type Mutex struct {
 state int32
 sema  uint32
}

Locker 接口

type Locker interface {
 Lock()
 Unlock()
}

TryLock 方法

TryLock 方法嘗試獲取鎖並且返回是否成功 (調用非阻塞)。

func (m *Mutex) TryLock() bool {
 old := m.state
 if old&(mutexLocked|mutexStarving) != 0 {
  return false
 }

 // 這裏可能有 1 個 goroutine 在等待鎖
 // 但是可以嘗試在 goroutine 喚醒之前獲得鎖
 if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
  return false
 }

 ...
 return true
}

在標準庫中的註釋中有這樣一段話:

注意,TryLock 真實的使用場景很少,使用 TryLock 通常說明在互斥鎖的使用場景中,存在更深層次的問題。

官方的言外之意貌似並不鼓勵使用該方法?TryLock 方法添加是社區的強烈意願,官方一開始似乎並不感冒。筆者搜索了一下,發現目前標準庫中幾乎沒有使用到 TryLock 方法。

$ grep -nr "TryLock" "$(dirname $(which go))/../src" | grep -v "\/\/" | grep -v "test"

# 只有 2 個聲明和 1 個調用
.../src/sync/mutex.go:98:func (m *Mutex) TryLock() bool
.../src/sync/rwmutex.go:166:func (rw *RWMutex) TryLock() bool
.../src/sync/rwmutex.go:171:   if !rw.w.TryLock()

Lock 方法

Lock 方法表示加鎖操作,如果鎖已經被使用,調用方 goroutine 會陷入阻塞,直到鎖可用。

func (m *Mutex) Lock() {
 // 鎖當前狀態是未鎖定 (正好可以完成加鎖,然後直接返回)
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  ...
  return
 }
 // slow path 可以代碼內聯
 m.lockSlow()
}

lockSlow 方法

lockSlow 方法表示調用 Lock 方法時沒有直接獲取到鎖時,接下來的處理工作 (進入阻塞調用)。

                func (m *Mutex) lockSlow() {
 var waitStartTime int64 // 等待時間
 starving := false       // 是否處於飢餓模式
 awoke := false          // 是否處於喚醒狀態
 iter := 0               // 自旋迭代次數
 old := m.state          // 鎖當前狀態

 for {
  // 飢餓模式下不能自旋,因爲所有權轉移到了隊列前面的 goroutine
  // 新到達的 goroutine 不參與競爭並且不會嘗試自旋

  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // 如果鎖沒有處於飢餓模式並且符合自旋條件,goroutine 開始自旋
   // 嘗試設置 mutexWoken 標識
   // 這樣 Unlock 釋放鎖時就不用喚醒其他阻塞的 goroutine (直接讓當前 goroutine 獲得鎖)
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }

   runtime_doSpin() // goroutine 自旋
   iter++          // 增加自旋次數
   old = m.state   // 更新鎖狀態
   continue
  }

  new := old

  // 互斥鎖處於飢餓模式時,新到達的 goroutine 不要試圖獲得鎖,應該去排隊

  if old&mutexStarving == 0 {
   // 鎖處於非飢餓模式
   new |= mutexLocked
  }
  if old&(mutexLocked|mutexStarving) != 0 {
            // 鎖處於飢餓模式或者已加鎖
   // 增加等待的 goroutine 數量 (注意偏移量使用方法)
   new += 1 << mutexWaiterShift
  }

  // 當前 goroutine 將鎖切換到飢餓模式 (前提是鎖沒有被釋放,調用了 Unlock 方法)
  // 調用 Unlock 時,處於飢餓模式的鎖期望有等待的 goroutine, 但實際情況不一定有
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
  if awoke {
   // 當前 goroutine 處於喚醒狀態,但是鎖的狀態裏並沒有 mutexWoken 標識
   // 拋出一個 BUG: 互斥鎖狀態不一致
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   // goroutine 被喚醒,重置 mutexWoken 標識
   new &^= mutexWoken
  }

  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
    // 當前 goroutine 通過 CAS 操作獲得了鎖
    break
   }

   // 如果已經處於等待狀態,那麼直接排在隊列最前面
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }
   // 通過信號量保證鎖只能被 1 個 goroutine 獲取到
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)

   // 如果等待時間超過了閾值,那麼就進入飢餓模式
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
   if old&mutexStarving != 0 {
    // 如果當前 goroutine 被喚醒並且鎖處於飢餓模式
    // 控制權轉交給了當前 goroutine,但是互斥鎖處於某種不一致的狀態:
    //      mutexLocked 標識未設置,仍然認爲當前 goroutine 正在等待鎖
    //      拋出一個 BUG: mutex 狀態不一致
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }

    // 減少等待的 goroutine 數量 (注意偏移量使用方法)
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
    if !starving || old>>mutexWaiterShift == 1 {
     // 退出飢餓模式
     // 必須要在這裏退出並且考慮等待時間

     // 飢餓模式效率很低,一旦 2 個 goroutine 同時將互斥鎖切換到飢餓模式,可能會陷入無限循環
     delta -= mutexStarving
    }
    atomic.AddInt32(&m.state, delta)
    break
   }
   awoke = true // 當前 goroutine 喚醒
   iter = 0     // 當前 goroutine 重置自旋次數
  } else {
   old = m.state
  }
 }

 ...
}

runtime_canSpin 方法

runtime_canSpin 方法檢測當前 goroutine 是否可以進行自旋操作,通過鏈接器鏈接到 sync_runtime_canSpin

// sync/runtime.go

func runtime_canSpin(i int) bool

運行自旋操作的條件比較苛刻,需要滿足:

  1. 程序運行在多核機器上且 GOMAXPROCS > 1

  2. 至少有一個正在運行的 P, 且本地運行隊列爲空

  3. 自旋次數不超過 4 次

自旋與互斥鎖相反,不做被動等待 (直接主動原地旋轉等待),因爲其可以在全局運行隊列或者其他處理器的運行隊列上面運行

//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  return false
 }
 if p := getg().m.p.ptr(); !runqempty(p) {
  return false
 }
 return true
}

Go 調度器的設計者考慮了 操作的資源利用率 和 頻繁的線程搶佔給操作系統帶來的負載 之後,提出了 自旋線程 的概念。當 自旋線程 沒有找到可供其調度執行的 goroutine 時,並不會銷燬該線程,而是採取 自旋 的操作保存起來。雖然直觀上看起來浪費了一些資源,但是考慮一下 syscall (系統調用) 相關的情景就可以發現,相比 自旋 來說,線程間頻繁的搶佔、創建、銷燬等操作帶來的負載會更高。

Unlock 方法

Unlock 方法表示鎖的釋放操作,對未加鎖的互斥鎖執行解鎖操作會引發運行時錯誤。

** 互斥鎖並不是和某個具體的 goroutine 關聯的,完全可以在一個 goroutine 加鎖,在其他的 goroutine 中釋放鎖 (前提是兩者持有的是同一個鎖)**。

func (m *Mutex) Unlock() {
    ...

 // 如果去除 mutexLocked 標識之後正好是 0, 說明當前 goroutine 成功解鎖,直接返回即可
 new := atomic.AddInt32(&m.state, -mutexLocked)

 if new != 0 {
  // slow path 可以代碼內聯
  m.unlockSlow(new)
 }
}

unlockSlow 方法

unlockSlow 方法表示調用 Unlock 方法時沒有直接釋放鎖時,接下來的處理工作 (進入阻塞調用)。

func (m *Mutex) unlockSlow(new int32) {
 // 對未加鎖的互斥鎖執行解鎖操作會引發運行時錯誤
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }

 if new&mutexStarving == 0 {
  old := new
  for {
   // 如果鎖處於正常模式
   //    如果沒有等待 goroutine 或者已經有 goroutine 被喚醒或搶到了鎖,就不用喚醒其他 goroutine 了,直接返回即可

   // 如果鎖處於飢餓模式,所有權會直接從喚醒的 goroutine 轉移到隊列最前面的 goroutine
   // 如果當前 goroutine 不在這個調用鏈中
   //      可能是因爲在 unlock 互斥鎖時,沒有觀察到 mutexStarving 標識,所以直接返回即可
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }

   // 鎖有正在等待的 goroutine, 那麼被喚醒的 goroutine 得到鎖
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   old = m.state
  }
 } else {
  // 如果鎖處於處於飢餓模式
  // 將鎖的所有權轉移到下一個等待的 goroutine, 並讓其開始運行
  // 注意:mutexLocked 標識不需要被設置,因爲等待的 goroutine 被喚醒後會進行設置
  //      如果此時設置了 mutexStarving 標識,鎖仍然被認爲當前處於加鎖狀態,新到達的 goroutine 就無法獲取到鎖了
  runtime_Semrelease(&m.sema, true, 1)
 }
}

小結

互斥鎖內部實現中將狀態分成了兩種: 正常模式和飢餓模式。在正常模式下,鎖的分配採用競爭機制,在剛被喚醒的和新來的 goroutine 之間, 傾向於將鎖分配給新來的 goroutine, 競爭失敗的 goroutine 會被放到隊列首部,如果等待的 goroutine 超過 1ms 無法獲得鎖,互斥鎖會切換到飢餓模式。在飢餓模式下,從隊列首部的 goroutine 開始依次獲得鎖,新來的 goroutine 會自動排在隊列尾部,這樣就避免尾部 goroutine 餓死。此外,互斥鎖的加鎖和解鎖過程實現中還涉及到了信號、以及 GMP 調度等概念,本文暫時忽略掉了這些細節,後面有時間了再專門寫一篇。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/D9Zgh2pm6hqqbIOkefrNcw