Go 語言有緩衝 channel 和無緩衝 channel

Go 中的 channel 十分強大,理解 channel 的內部機制後再去使用它可以發揮出更大威力。另外,選擇使用有緩衝 channel 還是無緩衝 channel 會影響到我們程序的行爲表現,以及性能。

無緩衝 channel

無緩衝 channel 在消息發送時需要接收者就緒。聲明無緩衝 channel 的方式是不指定緩衝大小。以下是一個列子:

package main

import (
 "sync"
 "time"
)

func main() {
 c := make(chan string)

 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
  defer wg.Done()
  c <- `foo`
 }()

 go func() {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  println(`Message: `+ <-c)
 }()

 wg.Wait()
}

第一個協程會在發送消息foo時阻塞,原因是接收者還沒有就緒:這個特性在標準文檔中描述如下:

如果緩衝大小設置爲 0 或者不設置,channel 爲無緩衝類型,通信成功的前提是發送者和接收者都處於就緒狀態。

effective Go 文檔也有相應的描述:

無緩衝 channel,發送者會阻塞直到接收者接收了發送的值。

爲了更好的理解 channel 的特性,接下來我們分析 channel 的內部結構。

內部結構

channel 的結構體hchan被定義在runtime包中的chan.go文件中。以下是無緩衝 channel 的內部結構(本小節先介紹無緩衝 channel,所以暫時忽略了hchan結構體中和緩衝相關的屬性):

channel 中持有兩個鏈表,接收者鏈表recvq和發送者鏈表sendq,它們的類型是waitq。鏈表中的元素爲sudog結構體類型,它包含了發送者或接收者的協程相關的信息。通過這些信息,Go 可以在發送者不存在時阻塞住接收者,反之亦然。

以下是我們前一個例子的流程:

  1. 創建一個發送者列表和接收者列表都爲空的 channel。

  2. 第一個協程向 channel 發送foo變量的值,第 16 行。

  3. channel 從池中獲取一個sudog結構體變量,用於表示發送者。sudog 結構體會保持對發送者所在協程的引用,以及foo的引用。

  4. 發送者加入sendq隊列。

  5. 發送者協程進入等待狀態。

  6. 第二個協程將從 channel 中讀取一個消息,第 23 行。

  7. channel 將sendq列表中等待狀態的發送者出隊列。

  8. chanel 使用memmove函數將發送者要發送的值進行拷貝,包裝入sudog結構體,再傳遞給 channel 接收者的接收變量。

  9. 在第五步中被掛起的第一個協程將恢復運行並釋放第三步中獲取的sudog結構體。

如流程所描述,發送者協程阻塞直至接收者就緒。但是,必要的時候,我們可以使用有緩衝 channel 來避免這種阻塞。

有緩衝 channel

簡單修改前面的例子,爲 channel 添加緩衝,如下:

package main

import (
 "sync"
 "time"
)

func main() {
 c := make(chan string, 2)

 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
  defer wg.Done()

  c <- `foo`
  c <- `bar`
 }()

 go func() {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  println(`Message: `+ <-c)
  println(`Message: `+ <-c)
 }()

 wg.Wait()
}

通過這個例子,我們來分析hchan結構體中與緩衝相關的屬性:

緩衝相關的五個屬性:

通過sendxrecvx,緩衝區工作機制類似於環形隊列:

環形隊列使得我們可以保證緩衝區有序,並且不需要在每次取出元素時對緩衝區重新排序。

當緩衝區滿了時,向緩衝區添加元素的協程將被加入sender鏈表中,並且切換到等待狀態,就像我們在上一節描述的那樣。之後,當程序讀取緩衝區時,recvx位置的元素將被返回,等待狀態的協程將恢復執行,它要發送的值將被存入緩衝區。這使得 channel 能夠保證先進先出的特性。

緩存區不足引起的延時

創建 channel 時指定的緩衝區大小,可能會對性能造成巨大的影響。下面是對不同緩衝區大小的 channel 做的壓力測試代碼:

package bench

import (
 "sync"
 "sync/atomic"
 "testing"
)

func BenchmarkWithNoBuffer(b *testing.B) {
 benchmarkWithBuffer(b, 0)
}

func BenchmarkWithBufferSizeOf1(b *testing.B) {
 benchmarkWithBuffer(b, 1)
}

func BenchmarkWithBufferSizeEqualsToNumberOfWorker(b *testing.B) {
 benchmarkWithBuffer(b, 5)
}

func BenchmarkWithBufferSizeExceedsNumberOfWorker(b *testing.B) {
 benchmarkWithBuffer(b, 25)
}

func benchmarkWithBuffer(b *testing.B, size int) {
 for i := 0; i < b.N; i++ {
  c := make(chan uint32, size)

  var wg sync.WaitGroup
  wg.Add(1)

  go func() {
   defer wg.Done()

   for i := uint32(0); i < 1000; i++ {
    c <- i%2
   }
   close(c)
  }()

  var total uint32
  for w := 0; w < 5; w++ {
   wg.Add(1)
   go func() {
    defer wg.Done()

    for {
     v, ok := <-c
     if !ok {
      break
     }
     atomic.AddUint32(&total, v)
    }
   }()
  }

  wg.Wait()
 }
}

在這個測試程序中,包含一個生產者,向 channel 中發送整型元素;包含多個消費者,從 channel 中讀取數據,並將它們原子的加入變量total中。

運行這個測試十次,並通過benchstat分析結果:

name                                    time/op
WithNoBuffer-8                          306µs ± 3%
WithBufferSizeOf1-8                     248µs ± 1%
WithBufferSizeEqualsToNumberOfWorker-8  183µs ± 4%
WithBufferSizeExceedsNumberOfWorker-8   134µs ± 2%

說明合適的緩衝區大小確實會使得程序執行得更快!讓我們來分析測試程序以確認耗時反生在何處。

追蹤耗時

通過 Go 工具 trace 中的synchronization blocking profile來查看測試程序被同步原語阻塞所消耗的時間。接收時的耗時對比:無緩衝 channel 爲 9 毫秒,緩衝大小爲 50 的 channel 爲 1.9 毫秒。

發送時的耗時對比:有緩衝 channel 將耗時縮小了五倍。

可以得出結論,緩衝區的大小確實在程序性能方面扮演了重要角色。

轉自:

https://zhuanlan.zhihu.com/p/101063277

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/fibHNVcoofepVWE9C9nzHQ