用 Go 語言講解死鎖、活鎖、飢餓、自旋鎖

死鎖

指兩個或多個進程因互相持有對方所需的資源而處於等待狀態,從而導致程序停止運行的現象。簡單來說,在一個系統中,如果進程之間形成了一個循環依賴關係,那麼就會發生死鎖。

💡 本文所有代碼實現語言爲 Go, 其他語言的讀者可以使用讀僞代碼的思路來閱讀代碼,主要目的在於理清邏輯主幹,不必過度陷入細節。

四個必要條件

  1. 互斥: 同一時刻只能有一個進程佔用資源,如果其他進程想要訪問該資源必須等待

  2. 佔有等待:進程已經持有了至少一個資源,並且正在等待其他資源。這意味着當一個進程被阻塞時,它仍然在持有至少一個資源

  3. 不可搶佔:資源不能被強制性地釋放,只能由佔用它的進程主動釋放

  4. 循環等待:存在一個等待循環隊列,其中每個進程都在等待下一個進程所持有的資源

示例代碼

死鎖應該是日常開發中遇到的最多的 “鎖異常” 場景,例如在一個 goroutine 內部對同一個 channel 同時進行讀寫:

package main

func main() {
 ch := make(chan bool)

 ch <- true
 <-ch

 close(ch)
}

更多可能產生死鎖的場景,可以參考 之前的這篇文章 [1]。

死鎖處理方法

主要分爲被動策略和主動策略。

被動策略

鴕鳥策略

像鴕鳥一樣,把頭埋在沙子裏,假裝什麼都沒發生。

因爲要解決死鎖問題付出的代價很高,鴕鳥策略這種不採取任務措施的方案反而會獲得更高的性能。 當發生死鎖時不會對用戶造成多大影響,或發生死鎖的概率很低,可以採用鴕鳥策略。 大多數操作系統,包括 Unix,Linux 和 Windows,處理死鎖問題的辦法僅僅是忽略它。

主動策略

預防死鎖

針對死鎖發生的四個必要條件,我們可以逆向操作,只要保證四個必要條件不同時發生,即可預防死鎖的發生。

1. 破壞互斥條件

同一時刻允許多個線程佔用某些資源,但是有互斥限制的場景中,資源往往就是需要獨佔的,例如打印機、全局計數器等。問題與解決方案形成了悖論,所以該方案並無實用性。

2. 破壞佔有等待

實行資源預先分配策略,線程在運行前一次性地向系統申請它所需要的全部資源。如果某個線程所需的全部資源無法得到滿足,則不分配任何資源,此線程不會運行。 當系統能夠滿足線程的全部資源請求時,一次性地將所申請的資源全部分配到線程,此時線程擁有它運行所需的全部資源,所以不會發生佔有資源同時又申請等待資源的現象,避免了死鎖的發生。

3. 破壞不可搶佔

允許線程搶佔資源,當一個線程申請新的資源但不能立即被滿足時,它必須釋放佔有的全部資源,之後再重新申請。 它所釋放的資源可以分配給其它線程,這就相當於該線程佔有的資源被隱性搶佔了,該方法實現較爲困難並且會降低系統性能。

4. 破壞循環等待

實行資源有序分配,把所有資源先進行分類、編號、分配,使線程在申請資源時不會形成環形等待。 所有線程對資源的請求必須嚴格按資源序號順序分配,這樣多個線程之間就不會產生資源循環等待 (因爲每個線程都可以獲取執行所需的全部資源),從而預防了發生死鎖。

避免死鎖

在程序執行時,通過將各項操作轉換爲安全的序列化操作等方法來判斷是否存在死鎖,並在判斷可能會發生死鎖時採取預防措施,避免死鎖的發生。

安全狀態

如圖所示,每個表格第二列表示線程已擁有的資源數,第三列表示線程需要的所有資源數,“空閒數” 表示當前可以使用的資源數。

  1. 初始狀態下有 3 個可分配資源;

  2. 給 B 分配 2 個資源,這樣 B 就擁有執行所需的全部資源,此時剩餘資源爲 3 - 2 = 1;

  3. B 執行完成後釋放其全部資源,此時剩餘資源爲 1 + 4 = 5;

  4. 給 C 分配 5 個資源,這樣 C 就擁有執行所需的全部資源,此時剩餘資源爲 5 - 5 = 0;

  5. C 執行完成後釋放其全部資源,此時剩餘資源爲 0 + 7 = 7;

  6. 給 A 分配 7 個資源,讓 A 執行,程序結束;

上述執行過程中的第 4 步,如果不是給 C 分配資源,而是給 A 分配資源,那麼就無法保證安全狀態從而進入死鎖,感興趣的讀者可以改變表格數據來驗證結果。

銀行家算法

一個小城鎮的銀行家,向一羣客戶分別承諾了一定的貸款額度,算法要做的事情是:

判斷貸款請求通過後,是否會進入不安全狀態,如果不會進入不安全狀態,就同意貸款請求,否則拒絕貸款請求

銀行家算法

具體的資源分配過程如下:

  1. 初始狀態下有 10 個可分配資源;

  2. 給 A, B, C, D 分配資源後,此時剩餘資源爲 10 - 8 = 2;

  3. 此時不會進入不安全狀態的唯一選擇是將剩餘的 2 個資源分配給 C ;

  4. 如果分配到其他進程 / 線程,就會進入不安全狀態,產生死鎖;

  5. 銀行家算法必須能夠正確檢測到繼續分配是否會產生死鎖,並且提前拒絕分配 (如圖 b 到圖 c 的過程);

在現實世界中,很少有操作系統使用銀行家算法來避免死鎖,因爲很少有進程在運行前就知道其運行所需資源的最大值,而且進程數量也不是固定的,處於不斷變化之中, 除此之外,資源也可能變爲不可用狀態 (例如打印機離線、磁盤損壞等)。

檢測死鎖

在程序執行時,通過週期性地掃描資源分配情況來檢測死鎖的發生,並採取相應的措施消除死鎖。

不試圖阻止死鎖,而應當檢測到死鎖發生時,採取措施進行恢復。

1. 每種類型一個資源的死鎖檢測

死鎖檢測

如圖所示的資源分配中,方框表示資源,圓圈表示進程,資源指向進程表示該資源已經分配給該進程,進程指向資源表示進程請求獲取該資源

圖 b 是從圖 a 剝離出來的環路圖,滿足了死鎖條件中的循環,因此會發生死鎖。

檢測算法概述:通過檢測是否存在有向環路來實現,從一個節點出發進行 DFS (深度優先搜索),對訪問過的節點進行標記,如果訪問了已經標記的節點,就表示有向圖存在環,也就是檢測到了死鎖。

2. 每種類型多個資源的死鎖檢測

##     A        B        C        D
##   磁帶機    繪圖儀    掃描儀     光驅

E = (  4        2        3        1  )

A = (  2        1        0        0  )

     _                              _
    |  0        0        1        0  |
C = |  2        0        0        1  |
    |  0        1        2        0  |
     -                              -

     _                              _
    |  2        0        0        1  |
R = |  1        0        1        0  |
    |  2        1        0        0  |
     -                              -

上面的代碼示例中,有 3 個進程和 4 種資源:

通過對進程和資源的統計,可以獲取到當前系統的狀態:

檢測算法概述

每個進程初始時都不被標記,執行過程中有可能被標記,當算法結束時,任何沒有被標記的進程說明內部產生了死鎖。

  1. 尋找一個還未標記的進程 Pi,它所請求的資源小於等於 A;

  2. 如果找到相應的進程,那麼將矩陣 C 的第 i 行向量加到 A 中,標記該進程,並返回到第一步繼續執行;

  3. 如果沒有找到相應的進程,算法執行結束;

解除死鎖

在死鎖發生時,可以採用 kill 進程、搶佔資源和回滾 等方法來解除死鎖。

活鎖

指線程無法取得需要的資源而一直重試的現象。與死鎖不同,活鎖中的線程不會被阻塞,它們會一直嘗試獲取資源,但是卻一直失敗,最終導致程序無法正常執行。

舉個生活中的小例子,兩個人相向過馬路,如果兩人同時向一邊謙讓,那麼兩個人都過不去,緊接着兩人同時又移到另一邊,此時兩個人依然過不去。 如果不受其他因素干擾,兩個人一直在同步移動,那麼最終的結果就是兩個人都沒有前進,產生了活鎖 (這個詞很形象,兩個人都被對象活活地鎖住了)。

處理方法

1. 引入隨機性

在代碼中引入一定的隨機性可以避免發生活鎖,例如在重試的過程中,引入隨機的休眠時間來中斷死循環,讓線程有機會釋放資源並且重新獲取資源。

2. 引入系統時間戳

通過比較系統時間戳來決定線程是否需要繼續等待,因爲多個線程獲取到的時間戳 (系統時鐘) 不可能完全一致,可以在時間戳的基礎上進行優先級排序,最後通過排序後的線程順序進行調度。

示例代碼

剛纔的小例子轉換成代碼如下:

package main

import (
 "fmt"
 "sync"
 "sync/atomic"
 "time"
)

var (
 // 使用一個互斥信號量來同步
 cond = sync.NewCond(&sync.Mutex{})

 // 分別表示左右兩個方向的計數器 (默認值爲 0)
 // 也就是說,兩個人碰面時,爲了給對方讓路,會向左或向右移動
 leftCnt, rightCnt int32
)

// 信號量加鎖操作
// 兩個人在移動方向時必須加鎖
func takeStep() {
 cond.L.Lock()
 cond.Wait()
 cond.L.Unlock()
}

// 方向移動
func move(name, dir string, cnt *int32) bool {
 fmt.Printf("%s 走到了 %v\n", name, dir)

 // 當前方向計數器 + 1
 atomic.AddInt32(cnt, 1)

 takeStep()

 // 如果當前計數器只被一個人修改過
 // 說明這個人移動了方向,但是對方未移動,此時可以讓對方先走,程序直接返回即可
 if atomic.LoadInt32(cnt) == 1 {
  // 因爲活鎖
  // 所以代碼永遠執行不到這裏
  fmt.Printf("%s 給對方讓路成功 \n", name)
  return true
 }

 takeStep()

 // 當前方向計數器 - 1
 atomic.AddInt32(cnt, -1)

 return false
}

func giveWay(name string) {
 fmt.Printf("%s 嘗試給對方讓路 ... \n", name)

 // 模擬三次雙方互相讓路
 for i := 0; i < 3; i++ {
  if move(name, "左邊"&leftCnt) || move(name, "右邊"&rightCnt) {
   return
  }
 }

 fmt.Printf("%v 無奈地說: 咱們可以停止互相給對方讓路了,你先過!\n", name)
}

func main() {
 go func() {
  // 1 毫秒之後發出通知,釋放鎖
  for range time.Tick(1 * time.Millisecond) {
   cond.Broadcast()
  }
 }()

 var wg sync.WaitGroup
 // 模擬兩個人
 // 小明和小紅
 wg.Add(2)

 go func() {
  defer wg.Done()
  giveWay("小明")
 }()

 go func() {
  defer wg.Done()
  giveWay("小紅")
 }()

 wg.Wait()
}

運行上面的代碼,可以看到輸出結果和描述的活鎖產生過程是一致的 (筆者加了一些分割線輔助閱讀):

# go run main.go

# 你的輸出和這裏可能不完全一樣,但是邏輯是一致的

# 雙方第一次讓路
小紅 嘗試給對方讓路 ...
小紅 走到了 左邊
小明 嘗試給對方讓路 ...
小明 走到了 左邊
--------------------------------
小紅 走到了 右邊
小明 走到了 右邊
--------------------------------

# 雙方第二次讓路

小紅 走到了 左邊
小明 走到了 左邊
小紅 走到了 右邊
小明 走到了 右邊
--------------------------------

# 雙方第三次讓路

小紅 走到了 左邊
小明 走到了 左邊
小紅 走到了 右邊
小明 走到了 右邊
--------------------------------

# 雙方完成了第三次讓路

小紅 無奈地說: 咱們可以停止互相給對方讓路了,你先過!
小明 無奈地說: 咱們可以停止互相給對方讓路了,你先過!

自旋鎖

是一種互斥鎖的實現方式,當線程嘗試獲得一個鎖時,如果發現這個鎖已經被其他線程佔用,它會不斷地重複嘗試獲取鎖,而不是放棄 CPU 的控制權。這個過程被稱爲自旋,它能夠有效地減少線程切換的開銷,提高鎖的性能。 自旋鎖同時避免了進程上下文的調度開銷,因此對於短時間內的線程阻塞場景是有效的。

示例代碼

最簡單的實現就是直接在循環中不斷嘗試獲取鎖,代碼如下:

package main

import "sync"

func main() {
 mu :=  sync.Mutex{}

 for !mu.TryLock() {
  // 獲取到鎖之後
  // 執行某些操作

  mu.Unlock()
 }
}

當然,這個簡單的代碼性能實在堪憂,我們可以將其優化一下:

package main

import (
 "fmt"
 "sync"
 "sync/atomic"
)

type spinLock uint32

// 獲取自旋鎖
func (sl *spinLock) lock() {
 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
  // 獲取到自旋鎖
 }
}

// 釋放自旋鎖
func (sl *spinLock) unlock() {
 atomic.StoreUint32((*uint32)(sl), 0)
}

func main() {
 var sl spinLock
 var wg sync.WaitGroup

 for i := 0; i < 10; i++ {
  wg.Add(1)
  go func(index int) {
   defer wg.Done()

   sl.lock()

   fmt.Printf("index %d got spin lock\n", index)

   sl.unlock()
  }(i)
 }

 wg.Wait()
}

除此之外,還可以瞭解一下標準庫中的做法,在比如我們之前在 sync.Mutex 設計與實現一文中 [2] 摘錄的標準庫中的自旋鎖實現代碼:

// $GOROOT/src/runtime/proc.go

func sync_runtime_canSpin(i int) bool {
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  return false
 }
 if p := getg().m.p.ptr(); !runqempty(p) {
  return false
 }
 return true
}

餓死 (Starvation)

餓死(Starvation)也稱爲飢餓,是指某個進程因無法獲取所需資源而無法執行,一直處於等待狀態的情況。

餓死與死鎖的差別在於: 死鎖是由於多個進程 / 線程互相競爭資源造成的,餓死則是某個進程 / 線程無法獲取資源造成的。

處理方法

1. 公平調度

使用公平性調度算法可以確保每個進程 / 線程都有機會獲得所需資源,尤其是對於 CPU 調度來說,可採用先到先服務或者時間片輪轉的方式, 避免某個進程 / 線程一直佔用資源,而其他進程 / 線程無法得到執行的情況。

2. 優先級反轉

當一個低優先級的進程 / 線程佔用了高優先級進程 / 線程所需資源時,高優先級進程 / 線程就會被迫等待,從而可能導致其餓死,可以使用 “優先級繼承” 或 “優先級反轉” 方法來避免這種情況發生。

優先級反轉的作用在於避免優先級繼承可能引發的問題。

下面舉個小例子來說明這種情況:

優先級翻轉的解決方法是:

3. 限制等待時間

當一個進程 / 線程等待的時間達到一定閾值之後,系統可以強制釋放該進程 / 線程所佔用的資源,以確保其他進程 / 線程也有機會獲得這些資源。

示例代碼

我們可以使用兩個 goroutine 來模擬 餓死 現象:

package main

import (
 "fmt"
 "sync"
 "time"
)

const (
 // 單個 goroutine 執行總時長
 runtime = 1 * time.Second
)

func main() {
 var wg sync.WaitGroup
 wg.Add(2)

 // 佔用資源過多的 goroutine
 go func() {
  defer wg.Done()

  count := 0
  for begin := time.Now(); time.Since(begin) <= runtime; {
   count++

   // 休眠時間 1 毫秒, 模擬佔用資源多
   time.Sleep(1 * time.Millisecond)
  }

  fmt.Printf("佔用資源過多的 goroutine 執行了 %d 次\n", count)
 }()

 // 佔用資源很少的 goroutine
 go func() {
  defer wg.Done()

  count := 0
  for begin := time.Now(); time.Since(begin) <= runtime; {
   count++

   // 休眠時間 10 微秒, 模擬佔用資源少
   time.Sleep(1 * time.Nanosecond)
  }

  fmt.Printf("佔用資源很少的 goroutine 執行了 %d 次\n", count)
 }()

 wg.Wait()
}

運行上面的代碼,我們可以看到佔用資源很少的 goroutine 獲得資源 (執行的次數) 要遠遠低於佔用資源過多的 goroutine

# go run main.go

佔用資源很少的 goroutine 執行了 3951200 次
佔用資源過多的 goroutine 執行了 997 次

Reference

鏈接

[1]

之前的這篇文章: https://dbwu.tech/posts/golang_goroutine_leak/

[2]

sync.Mutex 設計與實現一文中: https://dbwu.tech/posts/golang_sync_mutex/

[3]

現代操作系統: https://book.douban.com/subject/27096665/

[4]

Concurrency in Go: https://book.douban.com/subject/26994591/

[5]

Deadlock: https://en.wikipedia.org/wiki/Deadlock

[6]

Spinlock: https://en.wikipedia.org/wiki/Spinlock

[7]

如何理解互斥鎖、條件鎖、讀寫鎖以及自旋鎖?: https://www.zhihu.com/question/66733477

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s?__biz=MzAxMTA4Njc0OQ==&amp;mid=2651454472&amp;idx=1&amp;sn=8e46d1238dd09199839abeba6fd6955f&amp;scene=21#wechat_redirect