Go 中祕而不宣的數據結構 spmc- 10 倍性能於 channel

Go 標準庫和運行中中,有一些專門針對特定場景優化的數據結構,這些數據結構並沒有暴露出來,_這個系列_就是逐一介紹這些數據結構。

這一次給大家介紹的就是一個 lock-free、高性能的單生產者多消費者的隊列:PoolDequeuePoolChain。 到底是一個還是兩個呢? 主要是 PoolDequeue, 它是一個固定尺寸,使用 ringbuffer (環形隊列) 方式實現的隊列。PoolChain 是在它的基礎上上,實現的一個動態尺寸的隊列。

生產者消費者模式是常見的一種併發模式,根據生產者的數量和消費者的數量,可以分爲四種情況:

Channel 基本上可以看做是一種多生產者多消費者模式的隊列。可以同時允許多個生產者發送數據,有可以允許多個消費者消費數據,它也可以應用在其他模式的場景,比如 rpc 包中的 oneshot 模式、通知情況下的的單生產者多消費者模式、rpc 和服務端單連接通訊時的消息處理,就是多生產者單消費者模式。

但是 Go 標準庫的 sync 包下,有一個針對單生產者多消費者的數據結構,它是一個 lock-free 的數據結構,針對這個場景做了優化,被使用在 sync.Pool 中。

sync.Pool 採用了一種類似 Go 運行時調度的機制,針對每個 p 有一個 private 的數據,同時還有一個 shared 的數據,如果在本地 privateshared 中沒有數據,就去其他 P 對應的 shared 去偷取。難麼同時可能有多個 P 偷取同一個 shared, 這是多消費者。

同時對 shared 的寫只有它隸屬的 p 執行 Put 的時候纔會發生:

 l, _ := p.pin()
 if l.private == nil {
  l.private = x
 } else {
  l.shared.pushHead(x)
 }
 runtime_procUnpin()

這有屬於單生產者模式。sync.Pool 使用了 PoolDequeuePoolChain 來做優化。

首先我們先來了解 poolDequeue

poolDequeue

poolDequeue 是一個 lock-free 的數據結構,必然會使用 atomic, 同時它要求必須使用單生產者,否則會有併發問題。消費者可以是併發多個,當然你用一個也沒問題。

其中,生產者可以使用下面的方法:

消費者可以使用下面的一個方法:

接下來就是分析代碼了,有點枯燥,你可以跳過。

代碼分析

首先我們看這個 struct 的定義:

type poolDequeue struct {
 headTail atomic.Uint64
 vals []eface
}

這裏有兩個重要的字段:

生產者增加數據的邏輯如下:

func (d *poolDequeue) pushHead(val any) bool {
 ptrs := d.headTail.Load()
 head, tail := d.unpack(ptrs)
 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
  // 隊列滿
  return false
 }
 slot := &d.vals[head&uint32(len(d.vals)-1)]

    // 檢查 head slot 是否被 popTail 釋放
 typ := atomic.LoadPointer(&slot.typ)
 if typ != nil {
        // 另一個 goroutine 正在清理 tail,所以隊列還是滿的
  return false
 }

    // 如果值爲空,那麼設置一個特殊值
 if val == nil {
  val = dequeueNil(nil)
 }
    // 隊列頭是空的,將數據寫入 slot
 *(*any)(unsafe.Pointer(slot)) = val // ①

    // 增加 head,這樣 popTail 就可以消費這個 slot 了
    // 同時也是一個 store barrier,保證了 slot 的寫入
 d.headTail.Add(1 << dequeueBits)
 return true
}

① 處會有併發問題嗎?萬一有兩個 goroutine 同時執行到這裏,會不會有問題?這裏沒有問題,因爲要求只有一個生產者,不會有另外一個 goroutine 同時寫這個槽位。

注意它還實現了packunpack方法,用於將 headtail 打包到一個 uint64 中,或者從 uint64 中解包出 headtail

消費者消費數據的邏輯如下:

func (d *poolDequeue) popTail() (any, bool) {
 var slot *eface
 for { // ②
  ptrs := d.headTail.Load()
  head, tail := d.unpack(ptrs)
  if tail == head {
   // 隊列爲空
   return nil, false
  }

        // 確認頭部和尾部(用於我們之前的推測性檢查),並遞增尾部。如果成功,那麼我們就擁有了尾部的插槽。
  ptrs2 := d.pack(head, tail+1)
  if d.headTail.CompareAndSwap(ptrs, ptrs2) {
   // 成功讀取了一個 slot
   slot = &d.vals[tail&uint32(len(d.vals)-1)]
   break
  }
 }

 // 剩下來就是讀取槽位的值
 val := *(*any)(unsafe.Pointer(slot))
 if val == dequeueNil(nil) { // 如果本身就存儲的nil
  val = nil
 }

    // 釋放 slot,這樣 pushHead 就可以繼續寫入這個 slot 了
 slot.val = nil // ③
 atomic.StorePointer(&slot.typ, nil) // ④

 return val, true
}

② 處是一個 for 循環,這是一個自旋的過程,直到成功讀取到一個 slot 爲止。在有大量的 goroutine 的時候,這裏可能會是一個瓶頸點,但是少量的消費者應該還不算大問題。

③ 和 ④ 處是釋放 slot 的過程,這樣生產者就可以繼續寫入這個 slot 了。

生產者還可以調用popHead方法,用來彈出剛剛壓入還沒有消費的數據:

func (d *poolDequeue) popHead() (any, bool) {
 var slot *eface
 for {
  ptrs := d.headTail.Load()
  head, tail := d.unpack(ptrs)
  if tail == head {
   // 隊列爲空
   return nil, false
  }

        // 確認頭部和尾部(用於我們之前的推測性檢查),並遞減頭部。如果成功,那麼我們就擁有了頭部的插槽。
  head--
  ptrs2 := d.pack(head, tail)
  if d.headTail.CompareAndSwap(ptrs, ptrs2) {
            // 成功取回了一個 slot
   slot = &d.vals[head&uint32(len(d.vals)-1)]
   break
  }
 }

 val := *(*any)(unsafe.Pointer(slot))
 if val == dequeueNil(nil) {
  val = nil
 }

    // 釋放 slot,這樣 pushHead 就可以繼續寫入這個 slot 了
 *slot = eface{}
 return val, true
}

這是一個固定大小的隊列,如果隊列滿了,生產者就會失敗。這個隊列的大小是 2 的冪次方,這樣可以用 & 來取模,而不用 %,這樣可以提高性能。

PoolChain

PoolChain 是在 PoolDequeue 的基礎上實現的一個動態尺寸的隊列,它的實現和 PoolDequeue 類似,只是增加了一個 headTail 的鏈表,用於存儲多個 PoolDequeue

type poolChain struct {
    // head 是生產者用來push的 poolDequeue。只有生產者訪問,所以不需要同步
 head *poolChainElt

    // tail 是消費者用來pop的 poolDequeue。消費者訪問,所以需要原子操作
 tail atomic.Pointer[poolChainElt]
}

type poolChainElt struct {
 poolDequeue

    // next由生產者原子寫入,消費者原子讀取。它只能從nil轉換爲非nil。
    // prev由消費者原子寫入,生產者原子讀取。它只能從非nil轉換爲nil。
 next, prev atomic.Pointer[poolChainElt]
}

考慮到文章中代碼過多,大家就會感覺很枯燥了,我就不具體展示代碼了,你可以在 https://github.com/golang/go/blob/master/src/sync/poolqueue.go#L220-L302 查看具體的實現。 整體的思想就是將多個poolDequeue串聯起來,生產者在head處增加數據,消費者在tail處消費數據,當tailpoolDequeue爲空時,就從head處獲取一個poolDequeue。 當head滿了的時候,就增加一個新的poolDequeue。 這樣就實現了動態尺寸的隊列。

sync.Pool中就是使用的PoolChain來實現的,它是一個單生產者多消費者的隊列,可以同時有多個消費者消費數據,但是隻有一個生產者生產數據。

爲了能將這個數據結構暴露出來使用,我把相關的代碼複製到 https://github.com/smallnest/exp/blob/master/gods/poolqueue.go , 增加了單元測試和性能測試的代碼。

你可以學到這個方法,使用類似的技術,創建一個 look-free 無線長度的 byte buffer。在一些 Go 的網絡優化庫中就使用這種方法,避免頻繁的 grow 和 copy 既有數據。

與 channel 的性能比較

我們來看一下poolDequeuePoolChainchannel的性能對比。 我們使用一個 goroutine 進行寫入,10 個 goroutine 進行讀取:

package gods

import (
 "sync"
 "testing"
)

func BenchmarkPoolDequeue(b *testing.B) {
 const size = 1024
 pd := NewPoolDequeue(size)
 var wg sync.WaitGroup

 // Producer
 go func() {
  for i := 0; i < b.N; i++ {
   pd.PushHead(i)
  }
  wg.Done()
 }()

 // Consumers
 numConsumers := 10
 wg.Add(numConsumers + 1)
 for i := 0; i < numConsumers; i++ {
  go func() {
   for {
    if _, ok := pd.PopTail(); !ok {
     break
    }
   }
   wg.Done()
  }()
 }

 wg.Wait()
}

func BenchmarkPoolChain(b *testing.B) {
 pc := NewPoolChain()
 var wg sync.WaitGroup

 // Producer
 go func() {
  for i := 0; i < b.N; i++ {
   pc.PushHead(i)
  }
  wg.Done()
 }()

 // Consumers
 numConsumers := 10
 wg.Add(numConsumers + 1)
 for i := 0; i < numConsumers; i++ {
  go func() {
   for {
    if _, ok := pc.PopTail(); !ok {
     break
    }
   }
   wg.Done()
  }()
 }

 wg.Wait()
}

func BenchmarkChannel(b *testing.B) {
 ch := make(chan interface{}, 1024)
 var wg sync.WaitGroup

 // Producer
 go func() {
  for i := 0; i < b.N; i++ {
   ch <- i
  }
  close(ch)
  wg.Done()
 }()

 // Consumers
 numConsumers := 10
 wg.Add(numConsumers + 1)
 for i := 0; i < numConsumers; i++ {
  go func() {
   for range ch {
   }
   wg.Done()
  }()
 }

 wg.Wait()
}

運行這個 benchmark, 我們可以看到poolDequeuePoolChain的性能要比channel高很多,大約是channel的 10 倍。poolDequeuePoolChain 要好一些,性能是後者的兩倍。 

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/fj87oGZPkFKQiGZxhrYRVQ