深入理解 golang 的互斥鎖

在開始之前,我們需要知道鎖實現的幾種方式。

信號量 操作系統中有 P 和 V 操作。P 操作是將信號量值減去 1,V 操作是將信號量值增加 1. 因此信號量的操作模式爲:

  1. 初始化,給它一個非負整數值。

  2. 在程序試圖進入臨界區之前需要先運行 P 操作,然後會有兩種情況。

  1. 在程序離開臨界區時,需要執行 V 操作。當信號量 S 不是爲負時,之前被阻止的其他進程將允許進臨界區。

信號量和鎖 儘管信號量和鎖看起來相似,例如,當信號量爲 1 時,實現了互斥鎖,但實際上,它們具有不同的含義。

鎖用於保護臨界資源,例如讀和寫不能同時執行場景。

信號量是用於確保進程 (線程或 goroutine) 被調度。比如,三個進程共同計算 c = a+b。首先,a+b 的計算和賦值操作不能同時執行。其次,必須確保首先執行 a+b。c 在賦值之後執行,因此這個位置需要以信號量的形式執行。

此外,可以通過信號量實現鎖,然後 goroutine 可以根據規則阻塞和喚醒鎖;也可以通過自旋的方式實現鎖,goroutine 將持有 CPU,直到解鎖。

這兩種方式之間的區別是是否需要調度 goroutine,但是從本質上講,鎖是爲了確保不會錯誤地訪問臨界資源。

自旋鎖 CAS 理論是一種自旋鎖。

在同一時間只有一個線程獲得鎖,而沒有獲得鎖的線程通常有兩種處理方式:

自旋鎖的原理相對簡單。如果持有鎖的線程可以在短時間內釋放鎖定資源,那麼等待鎖的其他線程不需要在內核態和用戶態之間切換來回切換阻止狀態,他們只需要通過自旋的方式等一會,等待持有鎖的線程釋放鎖,然後獲取鎖,這種方式避免用戶進程在內核切換。

但如果長時間未釋放鎖,那麼自旋鎖的開銷會非常大,它會阻止其他線程的運行和調度。

線程持有鎖的時間越長,持有鎖的線程被 OS 調度中斷的風險就越大。

如果發生中斷,他線程將保持自旋狀態 (反覆嘗試獲取鎖),而持有鎖的線程不打算釋放鎖,這將導致無限延遲,直到持有鎖的線程完成並釋放鎖。

解決上述情況的一個好方法是爲自旋鎖定設置一個自旋時間,並在時間一到就釋放自旋鎖。

#悲觀鎖定和樂觀鎖定 悲觀鎖定是一種悲觀思維。它總是認爲最壞的情況可能會發生。它認爲這些數據很可能被其他人修改過。無論是讀還是寫,悲觀鎖都是在執行操作之前鎖定的。

樂觀鎖定的思想與悲觀鎖定的思想相反。它始終認爲資源和數據不會被別人修改,所以讀取不會被鎖定,但是樂觀鎖定會在寫操作時確定當前數據是否被修改過。

樂觀鎖定的實現方案主要包括 CAS 和版本號機制。樂觀鎖定適用於多讀場景這可以提高吞吐量。

CAS 是一個著名的基於比較和交換無鎖算法。

也就是在不使用鎖的情況下實現多個線程之間的變量同步,(即在不阻塞線程的情況下實現變量同步),所以又稱非阻塞同步。

CAS 涉及三種關係: 指向內存區域的指針 V、舊值 a 和要寫入的新值 B。

由 CAS 實現的樂觀鎖將帶來 ABA 問題。同時,整個樂觀鎖會在數據不一致的情況下觸發等待和重試機制,這對性能有很大的影響。

版本號機制通過版本的值實現版本控制。

有了以上的基礎知識,我們就可以開始分析 golang 是如何實現互斥鎖的了。

Golang 的 Mutex 實現一直在改進,到目前爲止,主要經歷了 4 個版本:

每一次改進都是爲了提高系統的整體性能。這個升級是漸進的、持續的,因此有必要從 V1 版本開始慢慢地看 Mutex 的演變過程。

V1: 簡單實現的版本 在 V1 版本中,互斥的完整源代碼如下。commit[1]

核心代碼如下:

func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
// The structure of the mutex, containing two fields
type Mutex struct {
  key int32 // Indication of whether the lock is held
  sema int32 // Semaphore dedicated to block/wake up goroutine
}
// Guaranteed to successfully increment the value of delta on val
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val
        if cas(val, v, v+delta) {
            return v + delta
     }
    }
    panic("unreached")
}
// request lock
func (m *Mutex) Lock() {
    if xadd(&m.key, 1) == 1 { // Add 1 to the ID, if it is equal to 1, the lock is successfully acquired
    return
}
    semacquire(&m.sema) // Otherwise block waiting
}
func (m *Mutex) Unlock() {
    if xadd(&m.key, -1) == 0 { // Subtract 1 from the flag, if equal to 0, there are no other waiters
return
}
    semrelease(&m.sema) // Wake up other blocked goroutines
}

code here[2]

首先,互斥鎖的結構非常簡單,包括兩個字段,key 和 sema

key 表示有幾個 gorutines 正在使用或準備使用該鎖。如果 key==0,表示當前互斥鎖已解鎖,否則,key>0 表示當前互斥鎖已鎖定。

sema 是實際導致 goroutine 阻塞和喚醒的信號量。

xadd 是一個基於 cas 的加減法函數。

Lock 和 Unlock 是鎖定當前互斥鎖的核心函數,但邏輯非常簡單。

Unlock 使用 xadd 方法對 key 進行 -1 操作。如果結果不是 0,則意味着 goroutine 當前正在等待,需要調用 semrelease 來喚醒 goroutine。

想象一下這樣一個場景: 被阻塞的 goroutine(命名爲 g1) 不能佔用 CPU,因此需要在 g1 被喚醒後執行上下文切換。如果此時出現一個新的 goroutine (g2),它就擁有 CPU 資源。

如果把鎖給 g2,那麼它可以立即執行並返回結果 (而不是等待 g1 的上下文切換),這樣整體效率就可以提高到更高的水平。

V2: 新的 goroutine 參加鎖競爭.

完整的源代碼地址 https://github.com/golang/go/blob/weekly.2011-07-07/src/pkg/sync/mutex.go

在 V2 版本中,核心特性是 goroutine 在被喚醒後不會立即執行任務,而是重複搶佔鎖的過程,以便新的 goroutine 有機會獲得鎖,這就是給後來者機會。

互斥鎖結構和常量的定義如下:

// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

雖然互斥鎖結構的定義基本不變,但從 V1 的 key 到 V2 的 state,它們的內部結構卻有很大的不同。

第 0 位表示鎖定狀態 (L),即 0 表示解鎖,1 表示鎖定; 第 1 位表示喚醒狀態 (W),第 2 位到第 31 位表示阻塞等待的計數。

mutexLocked 的值是 0x1, mutexawake 的值是 0x2, mutexWaiterShift 的值是 0x2, mutexWaiterShift 表示任何表示阻塞等待數量的數組都需要左移兩位。

V2 版本的主要在於 Lock 方法的改進,代碼如下:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
 // Fast path: grab unlocked mutex.
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  return
 }

 awoke := false
 for {
  old := m.state
  new := old | mutexLocked
  if old&mutexLocked != 0 {
   new = old + 1<<mutexWaiterShift
  }
  if awoke {
   // The goroutine has been woken from sleep,
   // so we need to reset the flag in either case.
   new &^= mutexWoken
  }
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&mutexLocked == 0 {
    break
   }
   runtime.Semacquire(&m.sema)
   awoke = true
  }
 }
}

code here[3]

第 2 行和第 4 行中的代碼邏輯應用於解鎖狀態,goroutine 通過 CAS 將 L 位從 0 設置爲 1 獲得鎖。

此時不存在任何爭用,因此獲取鎖幾乎不需要花費任何成本。

然後,goroutine 進入一個循環,其中 CAS 方法可確保正確疊加新狀態。

由於更改不是原子的,它可能會導致舊 state 的值變得無效。

同樣,即使當前狀態被鎖定,由於等待者的數量,舊狀態也將過時。

因此,您需要繼續通過循環獲得新的舊狀態。

在舊狀態未過時並覆蓋新狀態後,進入真正的鎖定步驟。

如果舊狀態未解鎖,則直接獲得鎖,否則,通過信號量機制阻塞當前 goroutine。

與 V1 不同的是,當前進程在被喚醒後,仍然會陷入 for 循環再次獲取鎖,這主要是給新的 goroutine 機會

Unlock 方法相對簡單。代碼如下:

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
 // Fast path: drop lock bit.
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if (new+mutexLocked)&mutexLocked == 0 {
  panic("sync: unlock of unlocked mutex")
 }

 old := new
 for {
  // If there are no waiters or a goroutine has already
  // been woken or grabbed the lock, no need to wake anyone.
  if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
   return
  }
  // Grab the right to wake someone.
  new = (old - 1<<mutexWaiterShift) | mutexWoken
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   runtime.Semrelease(&m.sema)
   return
  }
  old = m.state
  }
}

code here[4]

有兩個主要的解鎖邏輯:

V3:給新的 goroutines 更多機會 這個版本的優化,關鍵提交是 https://github.com/golang/go/commit/edcad8639a902741dc49f77d000ed62b0cc6956f

在 V2 的基礎上,如何進一步提高性能? 在大多數情況下,協程互斥鎖定期間的數據操作的耗時是非常低的,這可能比喚醒 + 切換上下文的耗時更低。

想象一個場景: GoroutineA 擁有 CPU 資源,GoroutineB 位於阻塞隊列的頭部。然後,當 GoroutineA 試圖獲取鎖時,它發現當前鎖已被佔用。根據 V2 策略,GoroutineA 立即被阻塞,假設鎖此時處於阻塞狀態。如果釋放,則 GoroutineB 將按計劃被喚醒,即整個運行時間包括 GoroutineB 的喚醒 + 上下文切換時間。

在 V3 版本的實現中,新的 Goroutine (GoroutineA) 被允許通過旋轉等待一段時間。如果在等待時間內釋放鎖,新的 Goroutine 立即獲取鎖資源,避免了舊 Goroutine 喚醒 + 上下文切換的耗時,提高了整體工作效率。

同樣,V3 的改進主要集中在 Lock 方法上,代碼如下:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if raceenabled {
            raceAcquire(unsafe.Pointer(m))
        }
        return
    }

    awoke := false
    iter := 0
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 {
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}

code here[5]

與 V2 的實現代碼相比,它主要集中在第 14-25 行,多了兩個自旋鎖的核心函數 runtime_canSpin 和 runtime_doSpin

第一個是 runtime_canSpin 函數。傳入參數是迭代器(代表當前旋轉的數量)。runtime_canSpin 函數的實現功能是確定是否可以輸入當前的旋轉等待狀態。

如前所述,旋轉鎖在等待時不會釋放 CPU 資源,也不會消耗上下文切換,但如果旋轉時間過長,則會導致無意義的 CPU 消耗,這將進一步影響性能。

因此,在使用自旋鎖時,必須嚴格控制自旋鎖的進入過程。

最後一個是 runtime_doSpin 函數,可以簡單地理解爲 CPU 空閒一段時間,這就是自旋過程。

整個過程非常清晰。所有持有 CPU 的 goroutine 在 runtime_canSpin 函數通過檢查後執行自旋操作。如果在旋轉操作完成後,它們仍然沒有持有鎖,則 Goroutine 將被阻塞。其他邏輯與 V2 相同。

V4:解決老 Goroutine 的飢餓問題。

源地址下: https://github.com/golang/go/blob/go1.15.5/src/sync/mutex.go

從 V2 到 V3 的改進是爲新的 goroutines 提供更多的機會,這導致了老的 goroutine 飢餓問題,即新的獲取鎖的機會不會讓給老的 goroutines,因此這個問題主要集中在 V4 中改進。

首先是互斥鎖結構的 State 字段有了新的變化,增加了飢餓指示器 (S)。

0 表示沒有發生飢餓,1 表示發生了飢餓。(圖片中的 S 標誌位)

在新的定義中,常數的定義也發生了一些變化:

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving // separate out a starvation token from the state field
    mutexWaiterShift = iota
    starvationThresholdNs = 1e6
)

Lock 方法的邏輯如下所示:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
 // Fast path: grab unlocked mutex.
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 // Slow path (outlined so that the fast path can be inlined)
 m.lockSlow()
}

func (m *Mutex) lockSlow() {
 var waitStartTime int64
 starving := false
 awoke := false
 iter := 0
 old := m.state
 for {
  // Don't spin in starvation mode, ownership is handed off to waiters
  // so we won't be able to acquire the mutex anyway.
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // Active spinning makes sense.
   // Try to set mutexWoken flag to inform Unlock
   // to not wake other blocked goroutines.
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   runtime_doSpin()
   iter++
   old = m.state
   continue
  }
  new := old
  // Don't try to acquire starving mutex, new arriving goroutines must queue.
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }
  // The current goroutine switches mutex to starvation mode.
  // But if the mutex is currently unlocked, don'do the switch.
  // Unlock expects that starving mutex has waiters, which will not
  // be true in this case.
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
  if awoke {
   // The goroutine has been woken from sleep,
   // so we need to reset the flag in either case.
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   new &^= mutexWoken
  }
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
    break // locked the mutex with CAS
   }
   // If we were already waiting before, queue at the front of the queue.
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
   if old&mutexStarving != 0 {
    // If this goroutine was woken and mutex is in starvation mode,
    // ownership was handed off to us but mutex is in somewhat
    // inconsistent state: mutexLocked is not set and we are still
    // accounted as waiter. Fix that.
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
    if !starving || old>>mutexWaiterShift == 1 {
     // Exit starvation mode.
     // Critical to do it here and consider wait time.
     // Starvation mode is so inefficient, that two goroutines
     // can go lock-step infinitely once they switch mutex
     // to starvation mode.
     delta -= mutexStarving
    }
    atomic.AddInt32(&m.state, delta)
    break
   }
   awoke = true
   iter = 0
  } else {
   old = m.state
  }
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

code here[6]

我們主要關注 lockSlow 方法。在深入研究代碼之前,我們首先需要理解在什麼情況下當前鎖被認爲是飢餓的:

所以核心是記錄當前 Goroutine 開始等待的時間: 對於第一次進入鎖的 Goroutine,開始等待時間爲 0。對於場景 1,判斷標準爲開始等待時間是否爲 0。如果它不是 0,就意味着它以前被阻塞過。通過 (第 45 行)。

對於場景 2,判斷標準爲當前時間與開始等待時間的差值是否超過閾值。如果是這樣,這意味着 Goroutine 已經等待太久,應該進入飢餓狀態 (第 50 行)。

進一步,當我們知道如何判斷飢餓狀態時,飢餓模式和非飢餓模式的區別是什麼

首先,如果當前鎖被耗盡,任何新的 goroutine 都不會旋轉 (第 15 行)。

其次,如果當前 Goroutine 處於飢餓狀態,那麼當它被阻塞時,它將被添加到等待隊列的頭部 (下一個喚醒操作肯定會喚醒當前處於飢餓狀態的 Goroutine,第 45 行和第 49 行)。

最後,允許飢餓的 goroutine 在被喚醒後立即持有鎖,而不必與其他 goroutine 重新競爭鎖 (比較 V2,第 52 行和 62 行)。

V4 對應的 Unlock 也根據飢餓狀態進行了調整。代碼如下:

func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 // Fast path: drop lock bit.
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  // Outlined slow path to allow inlining the fast path.
  // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
 if new&mutexStarving == 0 {
  old := new
  for {
   // If there are no waiters or a goroutine has already
   // been woken or grabbed the lock, no need to wake anyone.
   // In starvation mode ownership is directly handed off from unlocking
   // goroutine to the next waiter. We are not part of this chain,
   // since we did not observe mutexStarving when we unlocked the mutex above.
   // So get off the way.
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }
   // Grab the right to wake someone.
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   old = m.state
  }
 } else {
  // Starving mode: handoff mutex ownership to the next waiter, and yield
  // our time slice so that the next waiter can start to run immediately.
  // Note: mutexLocked is not set, the waiter will set it after wakeup.
  // But mutex is still considered locked if mutexStarving is set,
  // so new coming goroutines won't acquire it.
  runtime_Semrelease(&m.sema, true, 1)
 }
}

code here[7]

與上鎖操作相比,解鎖操作更容易理解。

-END-

相關鏈接:

[1]https://github.com/golang/go/blob/d90e7cbac65c5792ce312ee82fbe03a5dfc98c6f/src/pkg/sync/mutex.go

[2] https://gist.github.com/tengergou/3851ef5ae21e4530272618ec096990ee#file-20220921-1-go

[3] https://gist.github.com/tengergou/77d6f4ab3acf0a951114fe045e1429a2#file-20220921-3-go

[4] https://gist.github.com/tengergou/53d9b51646b23369ae9e030631aca9b5#file-20220921-4-go

[5] https://gist.github.com/tengergou/13e1eff9a75821c5441cab49345e7c5d#file-20220921-5-go

[6] https://gist.github.com/tengergou/a10a675929d7ae203816c65cbf564672#file-20220921-6-go

[7]https://gist.github.com/tengergou/0ff186760513855ace82c56a68c6bc58#file-20220921-7-go

原文地址:

https://levelup.gitconnected.com/deep-understanding-of-golang-mutex-9964b02c56e9

原文作者:

Dwen

本文永久鏈接:

https:/github.com/gocn/translator/blob/master/2022/w47_Deep_Understanding_of_Golang_Mutex.md

譯者:小超人

校對:zxmfke

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/i1N9bmVSW1lGfOezvhcD7g