從 Go channel 中批量讀取數據

有時候批量積攢一批數據集中處理,是一個高效的提高程序性能的方法,比如我們可以批量寫入數據庫,批量發送消息到 kafka,批量寫入網絡數據等等。 批量把數據收集出來,我們常用 channel 類型,此時 channel 的功能就是一個 buffer, 多個生產者把數據寫入到 channel 中,消費者從 channel 中讀取數據,但是 Go 的 channel 並沒有提供批量讀取的方法,我們需要自己實現一個。

github.com/smallnest/exp/chanx 庫

當然我已經實現了一個 batch 庫,你可以直接拿來用,本文主要介紹它的功能、使用方法以及設計原理和考量:github.com/smallnest/exp/chanx[1]。

我們可以使用這個庫的Batch方法來批量讀取數據,它的定義如下:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]"T any"))

舉一個例子:

func TestBatch(t *testing.T) {
 ch := make(chan int, 10)
 for i := 0; i < 10; i++ {
  ch <- i
 }

 count := 0
 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) {
  if len(batch) != 5 {
   assert.Fail(t, "expected batch size 5, got %d", len(batch))
  }
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 10, count)
}

這個例子一開始我們把 10 個數據寫入到一個 channel 中,然後我們從 channel 中批量讀取,每次讀取 5 個,然後把這 5 個數據傳遞給一個函數來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數據。

我們還可以使用FlatBatch方法來批量讀取批量數據,它的定義如下:

func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]"T any"))

這個函數和Batch類似,只不過它的 channel 中的數據是一個切片,每次從 channel 中讀取到一個切片後,把這個切片中的數據展開放入到一批數據中,最後再傳遞給處理函數。所以它有FlatBatch兩個功能。

舉一個例子:

func TestFlatBatch(t *testing.T) {
 ch := make(chan []int, 10)
 for i := 0; i < 10; i++ {
  ch <- []int{i, i}
 }

 count := 0
 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) {
  assert.NotEmpty(t, batch)
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 20, count)
}

在這個例子中,我們把 10 個切片寫入到 channel 中,每個切片中有兩個元素,然後我們從 channel 中批量讀取並展開,放入到一個 batch 中,如果 batch 中的數據大於貨等於 5 個,就把這 5 個數據傳遞給一個函數來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數據。

實現原理和考量

想要從 channel 中批量讀取數據,我們需要考慮以下幾個問題:

  1. 我們需要設定一個批處理的大小,不能無限制的讀取而不處理,否則會把消費者餓死,內存也會爆表

  2. 從 channel 中讀取數據的時候,如果 channel 中沒有數據,我們需要等待,直到 channel 中有數據,或者 channel 被關閉。

  3. 不能無限制的等待,或者長時間的等待,否則消費者會飢餓,而且時延太長業務不允許

我先舉一個簡單但是不太好的實現方式,我們在它的基礎上做優化:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]"T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  }
 }
}

這個實現中我們使用了一個batch變量來保存從 channel 中讀取的數據,當batch中的數據量達到batchSize時,我們就把這個batch傳遞給處理函數,然後清空batch,繼續讀取數據。

這個實現的一個最大的問題就是,如果 channel 中沒有數據,並且當前 batch 的數量還未達到預期, 我們就會一直等待,直到 channel 中有數據,或者 channel 被關閉,這樣會導致消費者飢餓。

我們可以使用select語句來解決這個問題,我們可以在select語句中加入一個default分支,當 channel 中沒有數據的時候,就會執行default分支以便在 channel 中沒有數據的時候,我們能夠把已讀取到的數據也能交給函數 fn 去處理。

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]"T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        default:
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}

這個實現貌似解決了消費者飢餓的問題,但是也會帶來一個新的問題,如果 channel 中總是沒有數據,那麼我們總是落入default分支中,導致 CPU 空轉,這個 goroutine 可能導致 CPU 佔用 100%, 這樣也不行。

有些人會使用time.After來解決這個問題,我們可以在select語句中加入一個time.After分支,當 channel 中沒有數據的時候,就會執行time.After分支,這樣我們就可以在 channel 中沒有數據的時候,等待一段時間,如果還是沒有數據,就把已讀取到的數據也能交給函數 fn 去處理。

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]"T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        case <-time.After(100 * time.Millisecond):
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}

這樣貌似解決了 CPU 空轉的問題,如果你測試這個實現,生產者在生產數據很慢的時候,程序的 CPU 的確不會佔用 100%。 但是正如有經驗的 Gopher 意識到的那樣,這個實現還是有問題的,如果生產者生產數據的速度很快,而消費者處理數據的速度很慢,那麼我們就會產生大量的Timer, 這些 Timer 不能及時的被回收,可能導致大量的內存佔用,而且如果有大量的 Timer, 也會導致 Go 運行時處理 Timer 的性能。

這裏我提出一個新的解決辦法,在這個庫中實現了,我們不應該使用time.After,因爲time.After既帶來了性能的問題,還可能導致它在休眠的時候不能及時讀取 channel 中的數據,導致業務時延增加。

最終的實現如下:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]"T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  default:
   if len(batch) > 0 { // partial
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   } else { // empty
    // wait for more
    select {
    case <-ctx.Done():
     if len(batch) > 0 {
      fn(batch)
     }
     return
    case v, ok := <-ch:
     if !ok {
      return
     }

     batch = append(batch, v)
    }

   }
  }
 }
}

這個實現的巧妙之處在於default出來。

如果代碼運行落入到default分支,說明當前 channel 中沒有數據可讀。那麼它會檢查當前的batch中是否有數據,如果有,就把這個batch傳遞給處理函數,然後清空batch,繼續讀取數據。這樣已讀取的數據能夠及時得到處理。

如果當前的batch中沒有數據,那麼它會再次進入select語句,等待 channel 中有數據,或者 channel 被關閉,或者ctx被取消。如果 channel 中沒有數據,那麼它會被阻塞,直到 channel 中有數據,或者 channel 被關閉,或者ctx被取消。這樣就能夠及時的讀取 channel 中的數據,而不會導致 CPU 空轉。

通過在default分支中的特殊處理,我們就可以低時延高效的從 channel 中批量讀取數據了。

參考資料

[1]

github.com/smallnest/exp/chanx: https://github.com/smallnest/exp/blob/master/chanx/batcher.go

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/nHzlQnZJH-9AMC6iXIxgug