Go Sync 包:併發的 6 個關鍵概念

1.sync.Mutex 和 sync.RWMutex

要知道,mutex(互斥)對於我們 gopher 來說就像一個老夥計。在處理 goroutine 時,確保它們不會同時訪問資源是非常重要的,而 mutex 可以幫助我們做到這一點。

sync.Mutex

看看這個簡單的例子,我沒有使用互斥鎖來保護我們的變量 a

var a = 0

func Add() {
  a++
}

func main() {
  for i := 0; i < 500; i++ {
    go Add()
  }

  time.Sleep(5 * time.Second)
  fmt.Println(a)
}

此代碼的結果是不可預測的。如果幸運的話,您可能會得到 500,但通常結果會小於 500。現在,讓我們使用互斥體增強我們的 Add 函數:

var mtx = sync.Mutex{}

func Add() {
  mtx.Lock()
  defer mtx.Unlock()
  a++
}

現在,代碼提供了預期的結果。但是使用 sync.RWMutex 呢?

爲什麼使用 sync.RWMutex?

想象一下,您正在檢查 a 變量,但其他 goroutines 也在調整它。您可能會得到過時的信息。那麼,解決這個問題的方法是什麼?

讓我們退後一步,使用我們的舊方法,將 sync.Mutex 添加到我們的 Get() 函數中:

func Add() {
  mtx.Lock()
  defer mtx.Unlock()

  a++
}

func Get() int {
  mtx.Lock()
  defer mtx.Unlock()

  return a
}

但這裏的問題是,如果您的服務或程序調用 Get() 數百萬次而只調用 Add() 幾次,那麼我們實際上是在浪費資源,因爲我們大部分時間甚至都沒有修改它而將所有內容都鎖定了。

這就是 sync.RWMutex 突然出現來拯救我們的一天,這個聰明的小工具旨在幫助我們處理同時讀取和寫入的情況。

var mtx = sync.RWMutex{}

func Add() {
  mtx.Lock()
  defer mtx.Unlock()
  
  a++
}

func Look() {
  mtx.RLock()
  defer mtx.RUnlock()
  
  fmt.Println(a)
}

那麼,RWMutex 有什麼了不起的呢?好吧,它允許數百萬次併發讀取,同時確保一次只能進行一次寫入。讓我澄清一下它是如何工作的:

sync.Locker

哦對了,Mutex 和 RWMutex 都實現了 sync.Locker 接口 {},簽名是這樣的:

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
  Lock()
  Unlock()
}

如果你想創建一個接受 Locker 的函數,你可以將這個函數與你的自定義 locker 或同步互斥鎖一起使用:

func Add(mtx sync.Locker) {
  mtx.Lock()
  defer mtx.Unlock()
  
  a++
}

2. sync.WaitGroup

您可能已經注意到我使用了 time.Sleep(5 * time.Second) 來等待所有 goroutine 完成,但老實說,這是一個非常醜陋的解決方案。

這就是 sync.WaitGroup 出現的地方:

func main() {
  wg := sync.WaitGroup{}
  for i := 0; i < 500; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      Add()
    }()
  }
  
  wg.Wait()
  fmt.Println(a)
}

sync.WaitGroup 有 3 個主要方法:Add、Done 和 Wait。

首先是 Add(delta int):此方法將 WaitGroup 計數器增加 delta 的值。你通常會在生成 goroutine 之前調用它,表示有一個額外的任務需要完成。

如果我們將 WaitGroup 放在 go func() {} 中,您認爲會發生什麼?

go func() {
  wg.Add(1)
  defer wg.Done()
  Add()
}()

我的編譯器喊道,“應該在啓動 goroutine 之前調用 wg.Add(1) 以避免競爭”,我的運行時出現恐慌,“panic: sync: WaitGroup is reused before previous Wait has returned”。

其他兩種方法非常簡單:

3. sync.Once

假設您在一個包中有一個 CreateInstance() 函數,但您需要確保它在使用前已初始化。所以你在不同的地方多次調用它,你的實現看起來像這樣:

var i = 0
var _isInitialized = false

func CreateInstance() {
  if _isInitialized {
    return
  }
  
  i = GetISomewhere()
  _isInitialized = true
}

但是如果有多個 goroutine 調用這個方法呢?i = GetISomeWhere 行會運行多次,即使您爲了穩定性只希望它執行一次。

您可以使用我們之前討論過的互斥鎖,但同步包提供了一種更方便的方法:sync.Once

var i = 0
var once = &sync.Once{}

func CreateInstance() {
  once.Do(func() {
    i = GetISomewhere()
  })
}

使用 sync.Once,你可以確保一個函數只執行一次,不管它被調用了多少次或者有多少 goroutines 同時調用它。

4. sync.Pool

想象一下,你有一個池,裏面有一堆你想反覆使用的對象。這可以減輕垃圾收集器的一些壓力,尤其是在創建和銷燬這些資源的成本很高的情況下。

所以,無論何時你需要一個對象,你都可以從池中取出它。當您使用完它時,您可以將它放回池中以備日後重複使用。

var pool = sync.Pool{
  New: func() interface{} {
    return 0
  },
}

func main() {
  pool.Put(1)
  pool.Put(2)
  pool.Put(3)
  
  a := pool.Get().(int)
  b := pool.Get().(int)
  c := pool.Get().(int)
  
  fmt.Println(a, b, c) // Output: 1, 3, 2 (order may vary)
}

請記住,將對象放入池中的順序不一定是它們出來的順序,即使多次運行上述代碼時順序也是隨機。

讓我分享一些使用 sync.Pool 的技巧:

5. sync.Map

當您同時使用 map 時,有點像使用 RWMutex。您可以同時進行多次讀取,但不能進行多次讀寫或寫入。如果存在衝突,您的服務將崩潰而不是覆蓋數據或導致意外行爲。

這就是 sync.Map 派上用場的地方,因爲它可以幫助我們避免這個問題。讓我們仔細看看 sync.Map 給我們提供什麼:

Q: 我們爲什麼不使用帶有 Mutex 的常規 map 呢?

我通常選擇帶有 RWMutex 的 map,但在某些情況下認識到 sync.Map 的強大功能很重要。那麼,它真正發光的地方在哪裏呢?

如果您有許多 goroutines 訪問 map 中的單獨鍵,則具有單個互斥鎖的常規 map 可能會導致爭用,因爲它僅針對單個寫操作鎖定整個 map。

另一方面,sync.Map 使用更完善的鎖定機制,有助於最大限度地減少此類場景中的爭用。

6. sync.Cond

將 sync.Cond 視爲支持多個 goroutine 等待和相互交互的條件變量。爲了更好地理解,讓我們看看如何使用它。

首先,我們需要創建帶有 Locker 的 sync.Cond:

var mtx sync.Mutex
var cond = sync.NewCond(&mtx)

goroutine 調用 cond.Wait 並等待來自其他地方的信號以繼續執行:

func dummyGoroutine(id int) {
  cond.L.Lock()
  defer cond.L.Unlock()
  fmt.Printf("Goroutine %d is waiting...\n", id)
  cond.Wait()
  fmt.Printf("Goroutine %d received the signal.\n", id)
}

然後,另一個 goroutine(就像主 goroutine)調用 cond.Signal(),讓我們等待的 goroutine 繼續:

func main() {
  go dummyGoroutine(1)
  
  time.Sleep(1 * time.Second)

  fmt.Println("Sending signal...")
  cond.Signal()

  time.Sleep(1 * time.Second)
}

結果如下所示:

Goroutine 1 is waiting...
Sending signal...
Goroutine 1 received the signal.

如果有多個 goroutines 在等待我們的信號怎麼辦?這就是我們可以使用廣播的時候:

func main() {
  go dummyGoroutine(1)
  go dummyGoroutine(2)
  
  time.Sleep(1 * time.Second)
  cond.Broadcast() // broadcast to all goroutines
  time.Sleep(1 * time.Second)
}

結果如下所示:

Goroutine 1 is waiting...
Goroutine 2 is waiting...
Goroutine 2 received the signal.
Goroutine 1 received the signal.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/iPpWd8vjyaN2sJFwxzN9Bg