聽說 Mutex 源碼是出名的不好看,我不信,來試一下

如何實現 Mutex

MVP 方案

Mutex 需要兩個變量:key 表示鎖的使用情況,value 爲 0 表示鎖未被持有,1 表示鎖被持有 且 沒有等待者,n 表示鎖被持有,有 n-1 個等待者;sema 表示等待隊列的信號量,sema 是個先進先出的隊列,用來阻塞、喚醒協程。

type Mutex struct {
  key int32
  sema int32
}

對 Mutex 加鎖本質是用 CAS 操作設置 key+1,如果鎖未被持有,則協程持有鎖;如果鎖被持有,則協程排隊等待;

對 Mutex 解鎖本質是用 CAS 操作設置 key-1,如果 key 爲 0,代表沒有等待者,直接返回;如果 key 大於 0,代表有協程被阻塞,則按照 FIFO 的順序喚醒協程。

把加鎖、解鎖過程看成排隊上廁所,可參考下圖,加鎖:

解鎖:

這個 mvp 版本只是單純的能用,離好用還差很遠。比如如果 CPU 上有協程來搶鎖,但是隻能嚴格按照 FIFO 順序排隊的話,這樣協程併發量不高。因爲喚醒協程需要做上下文切換,而此時 CPU 上正運行着其他協程,理論上如果處於 CPU 時間片上的協程搶到鎖的話會有更好的性能。

解決調度公平性

於是 Go 團隊更改了調度順序,CPU 上的協程也有機會搶鎖。

這就好比排隊上廁所一樣,CPU 上的人離廁所門只有 1m 的距離,而被喚醒的人離廁所門可能有 10m 的距離,從全局最優的角度考慮,離門近的人進入廁所可以有更高的吞吐。

具體是把 Mutex 結構體中key由兩種含義拓然成了三種含義,並改名爲 state:

type Mutex struct {
  state int32
  sema uint32
}

這次改進把 state 中的第一位作爲 MutexLocked 專屬位,表示這個 Mutex 是否已經被協程持有。

第二位作爲 MutexWoken 專屬位,表示 Mutex 是否處於喚醒狀態,喚醒操作是一個比較耗性能的操作,由於每次搶鎖只會有一個協程獲取鎖,理論上不需要喚醒太多協程,當前執行喚醒操作時只會喚醒一個協程,且如果已有喚醒的協程就不會再執行喚醒操作了。

剩餘位爲 MutexWaiterShift 的位置,用來表示等待者的數量。

這個版本在加鎖時,CPU 上正在執行操作的協程可以和被喚醒的協程同時搶鎖,而不用嚴格執行先進先出。解鎖時 Mutex 要去除 MutexLocked 標記,並根據是否有等待者或是否有已經被喚醒的協程,去決定是否去喚醒協程。

然而這個版本還有一些問題,協程在搶鎖時只會執行一次,如果 Mutex 沒有被釋放,這個新來的協程可能就被阻塞排隊去了。

加入 spin 機制

這就好比上廁所一樣,你排了好久隊,好不容易排到了第一位可以去搶廁所了,由於廁所被鎖了,不得不重新排隊,是不是有點蛋疼。

此時如果你在廁所門口轉幾圈,理論上如果廁所裏面的人方便的比較快,你轉圈的時候它正好出來了,那你就可以進去了,這就是所謂的 spin 機制。

於是 Go 團隊繼續優化 Mutex,這次爲 Mutex 加入了自旋機制,協程檢測鎖是否被釋放時,如果鎖沒釋放會旋轉 (底層是執行若干次 PAUSE 指令),以期望在這個過程中鎖被釋放。自旋不是無限制的,協程旋轉幾圈後,如果還沒搶到鎖,那它只能乖乖去隊列尾部排隊了。

那麼到這裏 Mutex 是否完美了呢,其實這裏還有一個比較大的問題:如果你是第一次來搶 Mutex,搶不到讓你去隊尾排隊,這個看起來合乎常理,但是如果你已經排了半天隊,好不容易排到隊頭可以搶廁所了,卻被 CPU 上的協程搶走了鎖,還讓你去隊尾排隊有點殘忍,有可能造成你次次搶不到廁所,然後反覆到隊尾排隊,最後活活憋死了。

解決飢餓問題

於是 Go 團隊接着去解決這個飢餓問題。他們爲 Mutex 又新增了一個狀態:mutexStarving,這個狀態表示 Mutex 是否處於飢餓狀態。如果一個協程等待時長超過 1ms,那 Mutex 會進入飢餓狀態。在飢餓狀態下,鎖的執行權會由解鎖的協程直接交給隊列頭部的協程,  各個協程不再搶鎖,也不再自旋,而是嚴格按照 FIFO 的順序執行。這次改進中,還修復了被喚醒的協程需重新去隊尾排隊的問題:如果協程被喚醒後搶鎖失敗,會被放到隊列頭部等待喚醒。

這就是目前 Mutex 的實現現狀~

協程的狀態

前面介紹了 Mutex 的 3 種狀態和一種計數,協程內部也有幾個狀態,這幾個狀態會影響 Mutex 的狀態,讓我們分別來看一下:

iter

iter 用於記錄協程的自旋次數,在 go1.15 版本時,超過 4 次就不再自旋了。

awoke

awoke 表示協程是否喚醒。協程在自旋時,相當於 CPU 上已經有在等鎖的協程。爲避免 Mutex 解鎖時再喚醒其他協程,自旋時要嘗試把 Mutex 置爲喚醒狀態,Mutex 處於喚醒狀態後 要把本協程的 awoke 也置爲 true。

在搶鎖前如果協程處於喚醒狀態,那就需要把 Mutex 的 mutexWoken 狀態清空,以便於 Mutex 在解鎖時做喚醒操作。

starving

starving 表示協程的等待時間,如果等待時長超過 1ms,starving 置爲 true,後續操作會把 Mutex 也標記爲飢餓狀態。

Mutex 的狀態內部影響

前面介紹了 Mutex 的 3 種狀態和一種計數,也介紹了協程內部的幾個狀態,下面來看看搶鎖、解鎖時,這些狀態對一些操作的具體影響:

自旋操作

自旋操作是 協程發現鎖被佔用時等鎖釋放的方式。若鎖處於飢餓狀態,協程不再自旋而是直接去排隊;自旋操作能否執行和 協程的旋轉次數 iter 以及一些機器的 CPU 信息相關。

源碼中在執行自旋操作時要判斷 Mutex 是否處於飢餓或者加鎖狀態:如果鎖已經釋放,或者 Mutex 處於飢餓狀態,就沒必要執行自旋操作了。

加鎖操作

協程搶鎖操作是在 Mutex 非飢餓情況下進行的,如果 Mutex 處於飢餓狀態,協程會直接排隊。

等待者操作

增加等待者:若協程自旋操作完成後,Mutex 仍然處於加鎖或者飢餓狀態,那新來的協程只好乖乖去排隊了,此時等待者數量加 1。

減少等待者:解鎖時要喚醒協程,此時需要把 MutexWoken 標記爲 1 表示已有喚醒協程,並減少一個等待者。

若 Mutex 處於飢餓狀態時,調度協程時需要把 Mutex 標記爲加鎖,並減少一個等待者。

飢餓操作

什麼時候會飢餓:

Mutex 的 mutexStarving 標記是由協程的 starving 狀態計算的,如果一個協程被喚醒後發現距離第一次排隊時已經超過了 1ms, 下次 for 循環在嘗試加鎖時會把 Mutex 標記爲飢餓狀態。

什麼時候會解除飢餓:

Mutex 處於飢餓狀態後,如果發現等待隊列中只有一個協程,或者這個協程等待時長小於 1ms,就需要把 Mutex 轉換爲正常模式。

兩個先決函數

協程的阻塞操作是調用 runtime_SemacquireMutex 函數執行的,喚醒操作是調用 runtime_Semrelease 進行的。本篇文章暫時不講調度的具體細節。

Mutex 源碼解析

到這裏鋪墊的差不多了,可以正式看源碼了。

lock 和 unlock 方法爲了內聯操作把簡單狀態的邏輯單獨提出來了。大部頭的邏輯在 lockslow 和 unlockslow 裏面。

這裏有兩張圖表示爲啥提出來把大部頭邏輯提出來的好處:

lock 代碼:

func (m *Mutex) Lock() {
  // Fast path
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
  // slow path
 m.lockSlow()
}

lock 函數 fast path 解決的情況:

unlock 代碼:

func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }
  // Fast path: drop lock bit.
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
    // Slow path
  m.unlockSlow(new)
 }
}

unlock 函數 fast path 解決的情況:

下面來看一下 lockslow 的源碼

我把這部分源碼分成三部分,整體執行如圖:

第一部分是自旋部分,第二部分是爲搶鎖的 CAS 打鋪墊,給 Mutex 的各種狀態賦值,第三部分是通過 cas 操作給 Mutex 賦值,並根據 CAS 是否成功做一些相關邏輯處理。

來看每個部分內部的一些細節:

第一部分細節

第二部分細節

第三部分細節

接着讓我們通過幾個例子感受一下流程:

Mutex:0001, 新來協程

Mutex:1001, 等待者被喚醒

Mutex:3001, 等待者被喚醒

下面來看一下 unlockslow 的源碼

unlock

unlock 的邏輯比較比較簡單,核心就是喚醒等待者,是否喚醒等待者是根據是否存在等待者決定的,根據 Mutex 是否處於飢餓狀態決定喚醒那個等待者。

Mutex:2101 飢餓時喚醒等待者

Mutex 源碼註釋

下面是我閱讀 go1.15 中src/sync/mutex.go源碼時寫的一些註釋,對源碼感興趣的同學可以對照着看看,有啥問題歡迎找我交流:

type Mutex struct {
 state int32
 sema  uint32
}

type Locker interface {
 Lock()
 Unlock()
}

const (
 mutexLocked = 1 << iota // mutex is locked
 mutexWoken
 mutexStarving
 mutexWaiterShift = iota
 starvationThresholdNs = 1e6
)

func (m *Mutex) Lock() {
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 m.lockSlow()
}

func (m *Mutex) lockSlow() {
 var waitStartTime int64
 starving := false
 awoke := false
 iter := 0
 old := m.state

 // 加鎖時,會有多個協程會同時來搶Mutex,CAS操作可以保證原子性,如果一個協程修改了Mutex狀態後還搶鎖失敗,
 // 需要重新去搶鎖,所以這裏用大for循環把邏輯包起來。
 for {
  // 這個if循環主要判斷waiter是否可以自旋,自旋條件不滿足時,會執行下段代碼
  // old&(mutexLocked|mutexStarving) == mutexLocked 這段代碼是看old是否處於飢餓狀態,飢餓狀態沒必要自旋
  // 鎖釋放了,或者處於飢餓狀態,或者自旋次數夠了,就不再自旋了
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // 設置 mutexWoken 標記,這裏貌似和下面Unlock的代碼有聯動
   // 這裏是爲了告知持有鎖的goroutine在釋放鎖時不需要喚醒其他goroutine了,已經有goroutine在等待,以免喚醒太多的等待協程
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   runtime_doSpin()
   iter++
   old = m.state
   continue
  }

  // 下面四個if判斷分別用於對3種狀態和一個數量進行賦值,都是爲了給new賦值
  new := old
  // Don't try to acquire starving mutex, new arriving goroutines must queue.
  // 新狀態只有在非飢餓的條件下才可以加鎖
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
  // 如果old已經處於加鎖或者飢餓狀態,則等待者按照FIFO的順序排隊
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }

  // 如果符合飢餓條件 且鎖還沒有被釋放,則將其設置爲飢餓狀態
  // 如果鎖已經釋放了,那就去搶一次鎖。如果進入飢餓模式,那就乖乖去排隊了
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }

  // 這裏用來消除 awoke 標記
  if awoke {
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   // cas成功
   // 這裏用於清除Woken標記,因爲後面 goroutine 只會阻塞或者搶鎖成功
   // 釋放喚醒標識,當前goroutine都不再是喚醒狀態了,以便其他goroutine進來

   // cas失敗
   // 丟人,所以需要釋放之前搶到的 mutexWoken 標識
   new &^= mutexWoken
  }

  // 下面這裏嘗試給通過CAS操作把old變成new
  // 這裏無非就是加鎖成功,或者去排隊,排隊的話需要看排隊到頭部還是尾部
  if atomic.CompareAndSwapInt32(&m.state, old, new) {

   // 這裏還需要看一下
   // 如果原來的old未加鎖,且Mutex不處於飢餓狀態,那goroutine獲取到鎖之後就可以直接退出了
   if old&(mutexLocked|mutexStarving) == 0 {
    break // locked the mutex with CAS
   }

   // queueLifo 爲true,表示其是被喚醒的,排隊時排到頭部
   // queueLifo 爲false,表示是第一次排隊,排隊時排到隊列尾部
   queueLifo := waitStartTime != 0

   // 第一次排隊要記錄排隊時間
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }
   // 這裏會阻塞
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)

   // 後續代碼是隊列裏的goroutine被runtime_Semrelease喚醒後,從讓出的地方繼續執行
   
   // 如果這個協程處於飢餓狀態 或 等待時間大於1ms,則設置其爲飢餓狀態
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

   old = m.state
   // 如果Mutex處於飢餓狀態
   if old&mutexStarving != 0 {
    // 這裏是一些異常狀態
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }

    // 把Mutex設置爲加鎖,把等待隊列的waiter-1,
    delta := int32(mutexLocked - 1<<mutexWaiterShift)

    // 根據協程的狀態看看是否需要切換至正常模式
    if !starving || old>>mutexWaiterShift == 1 {

     delta -= mutexStarving
    }
    atomic.AddInt32(&m.state, delta)
    break
   }

   // goroutine處於正常模式,標記喚醒標識,然後重新自旋去搶鎖
   // 執行到這裏,這個goroutine是被喚醒的,需要把awoke標記 標識爲true
   awoke = true
   iter = 0
  } else {
   // mutex被其他goroutine用了,繼續回去自旋吧
   // 賦值失敗時還原狀態
   old = m.state
  }
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}


func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  // 說明state其他位不爲0, 那就直接進入slow path吧
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
 // mutex處於正常模式
 if new&mutexStarving == 0 {
  old := new
  for {
   // 如果等待隊列爲空 或者 已經有其他goroutine被喚醒 or 獲得了鎖 or 鎖處於飢餓模式
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    // 不需要喚醒goroutine,直接返回即可
    return
   }
   // 搶佔Woken標示位,獲取喚醒一個goroutine的權利
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // 調用這裏喚醒一個goroutine
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   // 搶佔不成功就歸位,然後循環去吧
   old = m.state
  }
 } else {
  // 飢餓模式下,後來的goroutine不會爭搶鎖,
  // 直接喚醒第一個等待者
  runtime_Semrelease(&m.sema, true, 1)
 }
}

一個問題

如果讀者朋友覺得看懂了源碼,可以留言回答一下這個問題:

對於 Mutex 直接 Unlock 操作爲什麼會 panic

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kTlpaV22vaprJSEZy8KPsw