Go 中祕而不宣的數據結構 spmc- 10 倍性能於 channel
Go 標準庫和運行中中,有一些專門針對特定場景優化的數據結構,這些數據結構並沒有暴露出來,_這個系列_就是逐一介紹這些數據結構。
這一次給大家介紹的就是一個 lock-free、高性能的單生產者多消費者的隊列:PoolDequeue 和 PoolChain。 到底是一個還是兩個呢? 主要是 PoolDequeue, 它是一個固定尺寸,使用 ringbuffer (環形隊列) 方式實現的隊列。PoolChain 是在它的基礎上上,實現的一個動態尺寸的隊列。
生產者消費者模式是常見的一種併發模式,根據生產者的數量和消費者的數量,可以分爲四種情況:
-
單生產者 - 單消費者模式: spsc
-
單生產者 - 多消費者模式: spmc
-
多生產者 - 單消費者模式: mpsc
-
多生產者 - 多消費者模式: mpmc
Channel 基本上可以看做是一種多生產者多消費者模式的隊列。可以同時允許多個生產者發送數據,有可以允許多個消費者消費數據,它也可以應用在其他模式的場景,比如 rpc 包中的 oneshot 模式、通知情況下的的單生產者多消費者模式、rpc 和服務端單連接通訊時的消息處理,就是多生產者單消費者模式。
但是 Go 標準庫的 sync 包下,有一個針對單生產者多消費者的數據結構,它是一個 lock-free 的數據結構,針對這個場景做了優化,被使用在 sync.Pool 中。
sync.Pool 採用了一種類似 Go 運行時調度的機制,針對每個 p 有一個 private 的數據,同時還有一個 shared 的數據,如果在本地 private、shared 中沒有數據,就去其他 P 對應的 shared 去偷取。難麼同時可能有多個 P 偷取同一個 shared, 這是多消費者。
同時對 shared 的寫只有它隸屬的 p 執行 Put 的時候纔會發生:
l, _ := p.pin()
if l.private == nil {
l.private = x
} else {
l.shared.pushHead(x)
}
runtime_procUnpin()
這有屬於單生產者模式。sync.Pool 使用了 PoolDequeue 和 PoolChain 來做優化。
首先我們先來了解 poolDequeue。
poolDequeue
poolDequeue 是一個 lock-free 的數據結構,必然會使用 atomic, 同時它要求必須使用單生產者,否則會有併發問題。消費者可以是併發多個,當然你用一個也沒問題。
其中,生產者可以使用下面的方法:
-
pushHead: 在隊列頭部新增加一個數據。如果隊列滿了,增加失敗
-
popHead: 在隊列頭部彈出一個數據。生產者總是彈出新增加的數據,除非隊列爲空
消費者可以使用下面的一個方法:
- popTail: 從隊尾處彈出一個數據,除非隊列爲空。所以消費者總是消費最老的數據,這也正好符合大部分的場景
接下來就是分析代碼了,有點枯燥,你可以跳過。
代碼分析
首先我們看這個 struct 的定義:
type poolDequeue struct {
headTail atomic.Uint64
vals []eface
}
這裏有兩個重要的字段:
-
headTail: 一個atomic.Uint64類型的字段,它的高 32 位是head,低 32 位是tail。head是下一個要填充的位置,tail是最老的數據的位置。 -
vals: 一個eface類型的切片,它是一個環形隊列,大小必須是 2 的冪次方。
生產者增加數據的邏輯如下:
func (d *poolDequeue) pushHead(val any) bool {
ptrs := d.headTail.Load()
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// 隊列滿
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 檢查 head slot 是否被 popTail 釋放
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// 另一個 goroutine 正在清理 tail,所以隊列還是滿的
return false
}
// 如果值爲空,那麼設置一個特殊值
if val == nil {
val = dequeueNil(nil)
}
// 隊列頭是空的,將數據寫入 slot
*(*any)(unsafe.Pointer(slot)) = val // ①
// 增加 head,這樣 popTail 就可以消費這個 slot 了
// 同時也是一個 store barrier,保證了 slot 的寫入
d.headTail.Add(1 << dequeueBits)
return true
}
① 處會有併發問題嗎?萬一有兩個 goroutine 同時執行到這裏,會不會有問題?這裏沒有問題,因爲要求只有一個生產者,不會有另外一個 goroutine 同時寫這個槽位。
注意它還實現了pack和unpack方法,用於將 head 和 tail 打包到一個 uint64 中,或者從 uint64 中解包出 head 和 tail。
消費者消費數據的邏輯如下:
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
for { // ②
ptrs := d.headTail.Load()
head, tail := d.unpack(ptrs)
if tail == head {
// 隊列爲空
return nil, false
}
// 確認頭部和尾部(用於我們之前的推測性檢查),並遞增尾部。如果成功,那麼我們就擁有了尾部的插槽。
ptrs2 := d.pack(head, tail+1)
if d.headTail.CompareAndSwap(ptrs, ptrs2) {
// 成功讀取了一個 slot
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// 剩下來就是讀取槽位的值
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { // 如果本身就存儲的nil
val = nil
}
// 釋放 slot,這樣 pushHead 就可以繼續寫入這個 slot 了
slot.val = nil // ③
atomic.StorePointer(&slot.typ, nil) // ④
return val, true
}
② 處是一個 for 循環,這是一個自旋的過程,直到成功讀取到一個 slot 爲止。在有大量的 goroutine 的時候,這裏可能會是一個瓶頸點,但是少量的消費者應該還不算大問題。
③ 和 ④ 處是釋放 slot 的過程,這樣生產者就可以繼續寫入這個 slot 了。
生產者還可以調用popHead方法,用來彈出剛剛壓入還沒有消費的數據:
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
for {
ptrs := d.headTail.Load()
head, tail := d.unpack(ptrs)
if tail == head {
// 隊列爲空
return nil, false
}
// 確認頭部和尾部(用於我們之前的推測性檢查),並遞減頭部。如果成功,那麼我們就擁有了頭部的插槽。
head--
ptrs2 := d.pack(head, tail)
if d.headTail.CompareAndSwap(ptrs, ptrs2) {
// 成功取回了一個 slot
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 釋放 slot,這樣 pushHead 就可以繼續寫入這個 slot 了
*slot = eface{}
return val, true
}
這是一個固定大小的隊列,如果隊列滿了,生產者就會失敗。這個隊列的大小是 2 的冪次方,這樣可以用 & 來取模,而不用 %,這樣可以提高性能。
PoolChain
PoolChain 是在 PoolDequeue 的基礎上實現的一個動態尺寸的隊列,它的實現和 PoolDequeue 類似,只是增加了一個 headTail 的鏈表,用於存儲多個 PoolDequeue。
type poolChain struct {
// head 是生產者用來push的 poolDequeue。只有生產者訪問,所以不需要同步
head *poolChainElt
// tail 是消費者用來pop的 poolDequeue。消費者訪問,所以需要原子操作
tail atomic.Pointer[poolChainElt]
}
type poolChainElt struct {
poolDequeue
// next由生產者原子寫入,消費者原子讀取。它只能從nil轉換爲非nil。
// prev由消費者原子寫入,生產者原子讀取。它只能從非nil轉換爲nil。
next, prev atomic.Pointer[poolChainElt]
}
考慮到文章中代碼過多,大家就會感覺很枯燥了,我就不具體展示代碼了,你可以在 https://github.com/golang/go/blob/master/src/sync/poolqueue.go#L220-L302 查看具體的實現。 整體的思想就是將多個poolDequeue串聯起來,生產者在head處增加數據,消費者在tail處消費數據,當tail的poolDequeue爲空時,就從head處獲取一個poolDequeue。 當head滿了的時候,就增加一個新的poolDequeue。 這樣就實現了動態尺寸的隊列。
sync.Pool中就是使用的PoolChain來實現的,它是一個單生產者多消費者的隊列,可以同時有多個消費者消費數據,但是隻有一個生產者生產數據。
爲了能將這個數據結構暴露出來使用,我把相關的代碼複製到 https://github.com/smallnest/exp/blob/master/gods/poolqueue.go , 增加了單元測試和性能測試的代碼。
你可以學到這個方法,使用類似的技術,創建一個 look-free 無線長度的 byte buffer。在一些 Go 的網絡優化庫中就使用這種方法,避免頻繁的 grow 和 copy 既有數據。
與 channel 的性能比較
我們來看一下poolDequeue、PoolChain和channel的性能對比。 我們使用一個 goroutine 進行寫入,10 個 goroutine 進行讀取:
package gods
import (
"sync"
"testing"
)
func BenchmarkPoolDequeue(b *testing.B) {
const size = 1024
pd := NewPoolDequeue(size)
var wg sync.WaitGroup
// Producer
go func() {
for i := 0; i < b.N; i++ {
pd.PushHead(i)
}
wg.Done()
}()
// Consumers
numConsumers := 10
wg.Add(numConsumers + 1)
for i := 0; i < numConsumers; i++ {
go func() {
for {
if _, ok := pd.PopTail(); !ok {
break
}
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkPoolChain(b *testing.B) {
pc := NewPoolChain()
var wg sync.WaitGroup
// Producer
go func() {
for i := 0; i < b.N; i++ {
pc.PushHead(i)
}
wg.Done()
}()
// Consumers
numConsumers := 10
wg.Add(numConsumers + 1)
for i := 0; i < numConsumers; i++ {
go func() {
for {
if _, ok := pc.PopTail(); !ok {
break
}
}
wg.Done()
}()
}
wg.Wait()
}
func BenchmarkChannel(b *testing.B) {
ch := make(chan interface{}, 1024)
var wg sync.WaitGroup
// Producer
go func() {
for i := 0; i < b.N; i++ {
ch <- i
}
close(ch)
wg.Done()
}()
// Consumers
numConsumers := 10
wg.Add(numConsumers + 1)
for i := 0; i < numConsumers; i++ {
go func() {
for range ch {
}
wg.Done()
}()
}
wg.Wait()
}
運行這個 benchmark, 我們可以看到poolDequeue和PoolChain的性能要比channel高很多,大約是channel的 10 倍。poolDequeue 比 PoolChain 要好一些,性能是後者的兩倍。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/fj87oGZPkFKQiGZxhrYRVQ