從 Go channel 中批量讀取數據
有時候批量積攢一批數據集中處理,是一個高效的提高程序性能的方法,比如我們可以批量寫入數據庫,批量發送消息到 kafka,批量寫入網絡數據等等。 批量把數據收集出來,我們常用 channel 類型,此時 channel 的功能就是一個 buffer, 多個生產者把數據寫入到 channel 中,消費者從 channel 中讀取數據,但是 Go 的 channel 並沒有提供批量讀取的方法,我們需要自己實現一個。
github.com/smallnest/exp/chanx 庫
當然我已經實現了一個 batch 庫,你可以直接拿來用,本文主要介紹它的功能、使用方法以及設計原理和考量:github.com/smallnest/exp/chanx[1]。
我們可以使用這個庫的Batch
方法來批量讀取數據,它的定義如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
-
第一個參數是
Context
, 可以讓調用者主動取消或者超時控制 -
第二個參數是 channel,我們從這個 channel 中讀取數據。channel 可以在外部被關閉
-
第三個參數是批處理的大小,也就是我們從 channel 中讀取一批數據的最大量
-
第四個參數是一個函數,我們把從 channel 中讀取的一批數據傳遞給這個函數,由這個函數來處理這批數據
舉一個例子:
func TestBatch(t *testing.T) {
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
count := 0
go Batch[int](context.Background( "int"), ch, 5, func(batch []int) {
if len(batch) != 5 {
assert.Fail(t, "expected batch size 5, got %d", len(batch))
}
count += len(batch)
})
time.Sleep(time.Second)
close(ch)
assert.Equal(t, 10, count)
}
這個例子一開始我們把 10 個數據寫入到一個 channel 中,然後我們從 channel 中批量讀取,每次讀取 5 個,然後把這 5 個數據傳遞給一個函數來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數據。
我們還可以使用FlatBatch
方法來批量讀取批量數據,它的定義如下:
func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))
這個函數和Batch
類似,只不過它的 channel 中的數據是一個切片,每次從 channel 中讀取到一個切片後,把這個切片中的數據展開放入到一批數據中,最後再傳遞給處理函數。所以它有Flat
和Batch
兩個功能。
舉一個例子:
func TestFlatBatch(t *testing.T) {
ch := make(chan []int, 10)
for i := 0; i < 10; i++ {
ch <- []int{i, i}
}
count := 0
go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) {
assert.NotEmpty(t, batch)
count += len(batch)
})
time.Sleep(time.Second)
close(ch)
assert.Equal(t, 20, count)
}
在這個例子中,我們把 10 個切片寫入到 channel 中,每個切片中有兩個元素,然後我們從 channel 中批量讀取並展開,放入到一個 batch 中,如果 batch 中的數據大於貨等於 5 個,就把這 5 個數據傳遞給一個函數來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數據。
實現原理和考量
想要從 channel 中批量讀取數據,我們需要考慮以下幾個問題:
-
我們需要設定一個批處理的大小,不能無限制的讀取而不處理,否則會把消費者餓死,內存也會爆表
-
從 channel 中讀取數據的時候,如果 channel 中沒有數據,我們需要等待,直到 channel 中有數據,或者 channel 被關閉。
-
不能無限制的等待,或者長時間的等待,否則消費者會飢餓,而且時延太長業務不允許
我先舉一個簡單但是不太好的實現方式,我們在它的基礎上做優化:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}
這個實現中我們使用了一個batch
變量來保存從 channel 中讀取的數據,當batch
中的數據量達到batchSize
時,我們就把這個batch
傳遞給處理函數,然後清空batch
,繼續讀取數據。
這個實現的一個最大的問題就是,如果 channel 中沒有數據,並且當前 batch 的數量還未達到預期, 我們就會一直等待,直到 channel 中有數據,或者 channel 被關閉,這樣會導致消費者飢餓。
我們可以使用select
語句來解決這個問題,我們可以在select
語句中加入一個default
分支,當 channel 中沒有數據的時候,就會執行default
分支以便在 channel 中沒有數據的時候,我們能夠把已讀取到的數據也能交給函數 fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
default:
if len(batch) > 0 {
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}
這個實現貌似解決了消費者飢餓的問題,但是也會帶來一個新的問題,如果 channel 中總是沒有數據,那麼我們總是落入default
分支中,導致 CPU 空轉,這個 goroutine 可能導致 CPU 佔用 100%, 這樣也不行。
有些人會使用time.After
來解決這個問題,我們可以在select
語句中加入一個time.After
分支,當 channel 中沒有數據的時候,就會執行time.After
分支,這樣我們就可以在 channel 中沒有數據的時候,等待一段時間,如果還是沒有數據,就把已讀取到的數據也能交給函數 fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
case <-time.After(100 * time.Millisecond):
if len(batch) > 0 {
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}
這樣貌似解決了 CPU 空轉的問題,如果你測試這個實現,生產者在生產數據很慢的時候,程序的 CPU 的確不會佔用 100%。 但是正如有經驗的 Gopher 意識到的那樣,這個實現還是有問題的,如果生產者生產數據的速度很快,而消費者處理數據的速度很慢,那麼我們就會產生大量的Timer
, 這些 Timer 不能及時的被回收,可能導致大量的內存佔用,而且如果有大量的 Timer, 也會導致 Go 運行時處理 Timer 的性能。
這裏我提出一個新的解決辦法,在這個庫中實現了,我們不應該使用time.After
,因爲time.After
既帶來了性能的問題,還可能導致它在休眠的時候不能及時讀取 channel 中的數據,導致業務時延增加。
最終的實現如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
default:
if len(batch) > 0 { // partial
fn(batch)
batch = make([]T, 0, batchSize) // reset
} else { // empty
// wait for more
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok {
return
}
batch = append(batch, v)
}
}
}
}
}
這個實現的巧妙之處在於default
出來。
如果代碼運行落入到default
分支,說明當前 channel 中沒有數據可讀。那麼它會檢查當前的batch
中是否有數據,如果有,就把這個batch
傳遞給處理函數,然後清空batch
,繼續讀取數據。這樣已讀取的數據能夠及時得到處理。
如果當前的batch
中沒有數據,那麼它會再次進入select
語句,等待 channel 中有數據,或者 channel 被關閉,或者ctx
被取消。如果 channel 中沒有數據,那麼它會被阻塞,直到 channel 中有數據,或者 channel 被關閉,或者ctx
被取消。這樣就能夠及時的讀取 channel 中的數據,而不會導致 CPU 空轉。
通過在default
分支中的特殊處理,我們就可以低時延高效的從 channel 中批量讀取數據了。
參考資料
[1]
github.com/smallnest/exp/chanx: https://github.com/smallnest/exp/blob/master/chanx/batcher.go
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/nHzlQnZJH-9AMC6iXIxgug