Go Sync 包:併發的 6 個關鍵概念
1.sync.Mutex 和 sync.RWMutex
要知道,mutex(互斥)對於我們 gopher 來說就像一個老夥計。在處理 goroutine 時,確保它們不會同時訪問資源是非常重要的,而 mutex 可以幫助我們做到這一點。
sync.Mutex
看看這個簡單的例子,我沒有使用互斥鎖來保護我們的變量 a
:
var a = 0
func Add() {
a++
}
func main() {
for i := 0; i < 500; i++ {
go Add()
}
time.Sleep(5 * time.Second)
fmt.Println(a)
}
此代碼的結果是不可預測的。如果幸運的話,您可能會得到 500,但通常結果會小於 500。現在,讓我們使用互斥體增強我們的 Add 函數:
var mtx = sync.Mutex{}
func Add() {
mtx.Lock()
defer mtx.Unlock()
a++
}
現在,代碼提供了預期的結果。但是使用 sync.RWMutex 呢?
爲什麼使用 sync.RWMutex?
想象一下,您正在檢查 a 變量,但其他 goroutines 也在調整它。您可能會得到過時的信息。那麼,解決這個問題的方法是什麼?
讓我們退後一步,使用我們的舊方法,將 sync.Mutex 添加到我們的 Get() 函數中:
func Add() {
mtx.Lock()
defer mtx.Unlock()
a++
}
func Get() int {
mtx.Lock()
defer mtx.Unlock()
return a
}
但這裏的問題是,如果您的服務或程序調用 Get() 數百萬次而只調用 Add() 幾次,那麼我們實際上是在浪費資源,因爲我們大部分時間甚至都沒有修改它而將所有內容都鎖定了。
這就是 sync.RWMutex 突然出現來拯救我們的一天,這個聰明的小工具旨在幫助我們處理同時讀取和寫入的情況。
var mtx = sync.RWMutex{}
func Add() {
mtx.Lock()
defer mtx.Unlock()
a++
}
func Look() {
mtx.RLock()
defer mtx.RUnlock()
fmt.Println(a)
}
那麼,RWMutex 有什麼了不起的呢?好吧,它允許數百萬次併發讀取,同時確保一次只能進行一次寫入。讓我澄清一下它是如何工作的:
-
寫入時,讀取被鎖定。
-
讀取時,寫入被鎖定。
-
多次讀取不會相互鎖定。
sync.Locker
哦對了,Mutex 和 RWMutex 都實現了 sync.Locker 接口 {},簽名是這樣的:
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
如果你想創建一個接受 Locker 的函數,你可以將這個函數與你的自定義 locker 或同步互斥鎖一起使用:
func Add(mtx sync.Locker) {
mtx.Lock()
defer mtx.Unlock()
a++
}
2. sync.WaitGroup
您可能已經注意到我使用了 time.Sleep(5 * time.Second) 來等待所有 goroutine 完成,但老實說,這是一個非常醜陋的解決方案。
這就是 sync.WaitGroup 出現的地方:
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 500; i++ {
wg.Add(1)
go func() {
defer wg.Done()
Add()
}()
}
wg.Wait()
fmt.Println(a)
}
sync.WaitGroup 有 3 個主要方法:Add、Done 和 Wait。
首先是 Add(delta int):此方法將 WaitGroup 計數器增加 delta 的值。你通常會在生成 goroutine 之前調用它,表示有一個額外的任務需要完成。
如果我們將 WaitGroup 放在 go func() {} 中,您認爲會發生什麼?
go func() {
wg.Add(1)
defer wg.Done()
Add()
}()
我的編譯器喊道,“應該在啓動 goroutine 之前調用 wg.Add(1) 以避免競爭”,我的運行時出現恐慌,“panic: sync: WaitGroup is reused before previous Wait has returned”。
其他兩種方法非常簡單:
-
當一個 goroutine 結束它的任務時, Done 被調用。
-
Wait 會阻塞調用者,直到 WaitGroup 計數器歸零,這意味着所有派生的 goroutine 都已完成它們的任務。
3. sync.Once
假設您在一個包中有一個 CreateInstance() 函數,但您需要確保它在使用前已初始化。所以你在不同的地方多次調用它,你的實現看起來像這樣:
var i = 0
var _isInitialized = false
func CreateInstance() {
if _isInitialized {
return
}
i = GetISomewhere()
_isInitialized = true
}
但是如果有多個 goroutine 調用這個方法呢?i = GetISomeWhere 行會運行多次,即使您爲了穩定性只希望它執行一次。
您可以使用我們之前討論過的互斥鎖,但同步包提供了一種更方便的方法:sync.Once
var i = 0
var once = &sync.Once{}
func CreateInstance() {
once.Do(func() {
i = GetISomewhere()
})
}
使用 sync.Once,你可以確保一個函數只執行一次,不管它被調用了多少次或者有多少 goroutines 同時調用它。
4. sync.Pool
想象一下,你有一個池,裏面有一堆你想反覆使用的對象。這可以減輕垃圾收集器的一些壓力,尤其是在創建和銷燬這些資源的成本很高的情況下。
所以,無論何時你需要一個對象,你都可以從池中取出它。當您使用完它時,您可以將它放回池中以備日後重複使用。
var pool = sync.Pool{
New: func() interface{} {
return 0
},
}
func main() {
pool.Put(1)
pool.Put(2)
pool.Put(3)
a := pool.Get().(int)
b := pool.Get().(int)
c := pool.Get().(int)
fmt.Println(a, b, c) // Output: 1, 3, 2 (order may vary)
}
請記住,將對象放入池中的順序不一定是它們出來的順序,即使多次運行上述代碼時順序也是隨機。
讓我分享一些使用 sync.Pool 的技巧:
-
它非常適合長期存在並且有多個實例需要管理的對象,例如數據庫連接(1000 個連接?)、worker goroutine,甚至緩衝區。
-
在將對象返回池之前始終重置對象的狀態。這樣,您可以避免任何無意的數據泄漏或奇怪的行爲。
-
不要指望池中已經存在的對象,因爲它們可能會意外釋放。
5. sync.Map
當您同時使用 map 時,有點像使用 RWMutex。您可以同時進行多次讀取,但不能進行多次讀寫或寫入。如果存在衝突,您的服務將崩潰而不是覆蓋數據或導致意外行爲。
這就是 sync.Map 派上用場的地方,因爲它可以幫助我們避免這個問題。讓我們仔細看看 sync.Map 給我們提供什麼:
-
CompareAndDelete (go 1.20):如果值匹配則刪除鍵的條目;如果不存在值或舊值爲 nil,則返回 false。
-
CompareAndSwap(go 1.20):如果新舊值匹配,則交換一個鍵,只要確保舊值是可比較的。
-
Swap (go 1.20):交換鍵的值並返回舊值(如果存在)。
-
LoadOrStore:獲取當前鍵值或保存並返回提供的值(如果不存在)
-
Range (f func(key, value any):遍歷映射,將函數 f 應用於每個鍵值對。如果 f 說返回 false,它會停止。
-
Store
-
Delete
-
Load
-
LoadAndDelete
Q: 我們爲什麼不使用帶有 Mutex 的常規 map 呢?
我通常選擇帶有 RWMutex 的 map,但在某些情況下認識到 sync.Map 的強大功能很重要。那麼,它真正發光的地方在哪裏呢?
如果您有許多 goroutines 訪問 map 中的單獨鍵,則具有單個互斥鎖的常規 map 可能會導致爭用,因爲它僅針對單個寫操作鎖定整個 map。
另一方面,sync.Map 使用更完善的鎖定機制,有助於最大限度地減少此類場景中的爭用。
6. sync.Cond
將 sync.Cond 視爲支持多個 goroutine 等待和相互交互的條件變量。爲了更好地理解,讓我們看看如何使用它。
首先,我們需要創建帶有 Locker 的 sync.Cond:
var mtx sync.Mutex
var cond = sync.NewCond(&mtx)
goroutine 調用 cond.Wait 並等待來自其他地方的信號以繼續執行:
func dummyGoroutine(id int) {
cond.L.Lock()
defer cond.L.Unlock()
fmt.Printf("Goroutine %d is waiting...\n", id)
cond.Wait()
fmt.Printf("Goroutine %d received the signal.\n", id)
}
然後,另一個 goroutine(就像主 goroutine)調用 cond.Signal(),讓我們等待的 goroutine 繼續:
func main() {
go dummyGoroutine(1)
time.Sleep(1 * time.Second)
fmt.Println("Sending signal...")
cond.Signal()
time.Sleep(1 * time.Second)
}
結果如下所示:
Goroutine 1 is waiting...
Sending signal...
Goroutine 1 received the signal.
如果有多個 goroutines 在等待我們的信號怎麼辦?這就是我們可以使用廣播的時候:
func main() {
go dummyGoroutine(1)
go dummyGoroutine(2)
time.Sleep(1 * time.Second)
cond.Broadcast() // broadcast to all goroutines
time.Sleep(1 * time.Second)
}
結果如下所示:
Goroutine 1 is waiting...
Goroutine 2 is waiting...
Goroutine 2 received the signal.
Goroutine 1 received the signal.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/iPpWd8vjyaN2sJFwxzN9Bg