Go 每日一庫之 bytebufferpool

簡介

在編程開發中,我們經常會需要頻繁創建和銷燬同類對象的情形。這樣的操作很可能會對性能造成影響。這時,常用的優化手段就是使用對象池(object pool)。需要創建對象時,我們先從對象池中查找。如果有空閒對象,則從池中移除這個對象並將其返回給調用者使用。只有在池中無空閒對象時,纔會真正創建一個新對象。另一方面,對象使用完之後,我們並不進行銷燬。而是將它放回到對象池以供後續使用。使用對象池在頻繁創建和銷燬對象的情形下,能大幅度提升性能。同時,爲了避免對象池中的對象佔用過多的內存。對象池一般還配有特定的清理策略。Go 標準庫sync.Pool就是這樣一個例子。sync.Pool中的對象會被垃圾回收清理掉。

在這類對象中,比較特殊的一類是字節緩衝(底層一般是字節切片)。在做字符串拼接時,爲了拼接的高效,我們通常將中間結果存放在一個字節緩衝。在拼接完成之後,再從字節緩衝中生成結果字符串。在收發網絡包時,也需要將不完整的包暫時存放在字節緩衝中。

Go 標準庫中的類型bytes.Buffer封裝字節切片,提供一些使用接口。我們知道切片的容量是有限的,容量不足時需要進行擴容。而頻繁的擴容容易造成性能抖動。bytebufferpool實現了自己的Buffer類型,並使用一個簡單的算法降低擴容帶來的性能損失。bytebufferpool已經在大名鼎鼎的 Web 框架 fasthttp 和靈活的 Go 模塊庫 quicktemplate 得到了應用。實際上,這 3 個庫是同一個作者:valyala😀。

快速使用

本文代碼使用 Go Modules。

創建目錄並初始化:

$ mkdir bytebufferpool && cd bytebufferpool
$ go mod init github.com/darjun/go-daily-lib/bytebufferpool

安裝bytebufferpool庫:

$ go get -u github.com/PuerkitoBio/bytebufferpool

典型的使用方式先通過bytebufferpool提供的Get()方法獲取一個bytebufferpool.Buffer對象,然後調用這個對象的方法寫入數據,使用完成之後再調用bytebufferpool.Put()將對象放回對象池中。例:

package main

import (
  "fmt"

  "github.com/valyala/bytebufferpool"
)

func main() {
  b := bytebufferpool.Get()
  b.WriteString("hello")
  b.WriteByte(',')
  b.WriteString(" world!")

  fmt.Println(b.String())

  bytebufferpool.Put(b)
}

直接調用bytebufferpool包的Get()Put()方法,底層操作的是包中默認的對象池:

// bytebufferpool/pool.go
var defaultPool Pool

func Get() *ByteBuffer { return defaultPool.Get() }
func Put(b *ByteBuffer) { defaultPool.Put(b) }

我們當然可以根據實際需要創建新的對象池,將相同用處的對象放在一起(比如我們可以創建一個對象池用於輔助接收網絡包,一個用於輔助拼接字符串):

func main() {
  joinPool := new(bytebufferpool.Pool)
  b := joinPool.Get()
  b.WriteString("hello")
  b.WriteByte(',')
  b.WriteString(" world!")

  fmt.Println(b.String())

  joinPool.Put(b)
}

bytebufferpool沒有提供具體的創建函數,不過可以使用new創建。

優化細節

在將對象放回池中時,會根據當前切片的容量進行相應的處理。bytebufferpool將大小分爲 20 個區間:

| <2^6 | 2^6 ~ 2^7-1 | ... |> 2^25 |

如果容量小於 2^6,則屬於第一個區間。如果處於 2^6 和 2^7-1 之間,則落在第二個區間。以此類推。執行足夠多的放回次數後,bytebufferpool會重新校準,計算處於哪個區間容量的對象最多。將defaultSize設置爲該區間的上限容量,第一個區間的上限容量爲 2^6,第二區間爲 2^7,最後一個區間爲 2^26。後續通過Get()請求對象時,若池中無空閒對象,創建一個新對象時,直接將容量設置爲defaultSize。這樣基本可以避免在使用過程中的切片擴容,從而提升性能。下面結合代碼來理解:

// bytebufferpool/pool.go
const (
  minBitSize = 6 // 2**6=64 is a CPU cache line size
  steps      = 20

  minSize = 1 << minBitSize
  maxSize = 1 << (minBitSize + steps - 1)

  calibrateCallsThreshold = 42000
  maxPercentile           = 0.95
)

type Pool struct {
  calls       [steps]uint64
  calibrating uint64

  defaultSize uint64
  maxSize     uint64

  pool sync.Pool
}

我們可以看到,bytebufferpool內部使用了標準庫中的對象sync.Pool

這裏的steps就是上面所說的區間,一共 20 份。calls數組記錄放回的對象容量落在各個區間的次數。

調用Pool.Get()將對象放回時,首先計算切片容量落在哪個區間,增加calls數組中相應元素的值:

// bytebufferpool/pool.go
func (p *Pool) Put(b *ByteBuffer) {
  idx := index(len(b.B))

  if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
    p.calibrate()
  }

  maxSize := int(atomic.LoadUint64(&p.maxSize))
  if maxSize == 0 || cap(b.B) <= maxSize {
    b.Reset()
    p.pool.Put(b)
  }
}

如果calls數組該元素超過指定值calibrateCallsThreshold=42000(說明距離上次校準,放回對象到該區間的次數已經達到閾值了,42000 應該就是個經驗數字),則調用Pool.calibrate()執行校準操作:

// bytebufferpool/pool.go
func (p *Pool) calibrate() {
  // 避免併發放回對象觸發 `calibrate`
  if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
    return
  }

  // step 1.統計並排序
  a := make(callSizes, 0, steps)
  var callsSum uint64
  for i := uint64(0); i < steps; i++ {
    calls := atomic.SwapUint64(&p.calls[i], 0)
    callsSum += calls
    a = append(a, callSize{
      calls: calls,
      size:  minSize << i,
    })
  }
  sort.Sort(a)

  // step 2.計算 defaultSize 和 maxSize
  defaultSize := a[0].size
  maxSize := defaultSize

  maxSum := uint64(float64(callsSum) * maxPercentile)
  callsSum = 0
  for i := 0; i < steps; i++ {
    if callsSum > maxSum {
      break
    }
    callsSum += a[i].calls
    size := a[i].size
    if size > maxSize {
      maxSize = size
    }
  }

  // step 3.保存對應值
  atomic.StoreUint64(&p.defaultSize, defaultSize)
  atomic.StoreUint64(&p.maxSize, maxSize)

  atomic.StoreUint64(&p.calibrating, 0)
}

step 1. 統計並排序

calls數組記錄了放回對象到對應區間的次數。按照這個次數從大到小排序。注意:minSize << i表示區間i的上限容量。

step 2. 計算defaultSizemaxSize

defaultSize很好理解,取排序後的第一個size即可。maxSize值記錄放回次數超過 95% 的多個對象容量的最大值。它的作用是防止將使用較少的大容量對象放回對象池,從而佔用太多內存。這裏就可以理解Pool.Put()方法後半部分的邏輯了:

// 如果要放回的對象容量大於 maxSize,則不放回
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
  b.Reset()
  p.pool.Put(b)
}

step 3. 保存對應值

後續通過Pool.Get()獲取對象時,若池中無空閒對象,新創建的對象默認容量爲defaultSize。這樣的容量能滿足絕大多數情況下的使用,避免使用過程中的切片擴容。

// bytebufferpool/pool.go
func (p *Pool) Get() *ByteBuffer {
  v := p.pool.Get()
  if v != nil {
    return v.(*ByteBuffer)
  }
  return &ByteBuffer{
    B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
  }
}

其他一些細節:

當然這個庫缺點也很明顯,由於大部分使用的容量都小於defaultSize,會有部分內存浪費。

總結

去掉註釋,空行,bytebufferpool只用了 150 行左右的代碼就實現了一個高性能的Buffer對象池。其中細節值得細細品味。閱讀高質量的代碼,學習編碼細節有助於提升自己的編碼能力。強烈建議細細品讀!!!

大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. bytebufferpool GitHub:https://github.com/valyala/bytebufferpool

  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/SVqpZubE_X4W8uAwmNV79w