Go 每日一庫之 bytebufferpool
簡介
在編程開發中,我們經常會需要頻繁創建和銷燬同類對象的情形。這樣的操作很可能會對性能造成影響。這時,常用的優化手段就是使用對象池(object pool)。需要創建對象時,我們先從對象池中查找。如果有空閒對象,則從池中移除這個對象並將其返回給調用者使用。只有在池中無空閒對象時,纔會真正創建一個新對象。另一方面,對象使用完之後,我們並不進行銷燬。而是將它放回到對象池以供後續使用。使用對象池在頻繁創建和銷燬對象的情形下,能大幅度提升性能。同時,爲了避免對象池中的對象佔用過多的內存。對象池一般還配有特定的清理策略。Go 標準庫sync.Pool
就是這樣一個例子。sync.Pool
中的對象會被垃圾回收清理掉。
在這類對象中,比較特殊的一類是字節緩衝(底層一般是字節切片)。在做字符串拼接時,爲了拼接的高效,我們通常將中間結果存放在一個字節緩衝。在拼接完成之後,再從字節緩衝中生成結果字符串。在收發網絡包時,也需要將不完整的包暫時存放在字節緩衝中。
Go 標準庫中的類型bytes.Buffer
封裝字節切片,提供一些使用接口。我們知道切片的容量是有限的,容量不足時需要進行擴容。而頻繁的擴容容易造成性能抖動。bytebufferpool
實現了自己的Buffer
類型,並使用一個簡單的算法降低擴容帶來的性能損失。bytebufferpool
已經在大名鼎鼎的 Web 框架 fasthttp 和靈活的 Go 模塊庫 quicktemplate 得到了應用。實際上,這 3 個庫是同一個作者:valyala😀。
快速使用
本文代碼使用 Go Modules。
創建目錄並初始化:
$ mkdir bytebufferpool && cd bytebufferpool
$ go mod init github.com/darjun/go-daily-lib/bytebufferpool
安裝bytebufferpool
庫:
$ go get -u github.com/PuerkitoBio/bytebufferpool
典型的使用方式先通過bytebufferpool
提供的Get()
方法獲取一個bytebufferpool.Buffer
對象,然後調用這個對象的方法寫入數據,使用完成之後再調用bytebufferpool.Put()
將對象放回對象池中。例:
package main
import (
"fmt"
"github.com/valyala/bytebufferpool"
)
func main() {
b := bytebufferpool.Get()
b.WriteString("hello")
b.WriteByte(',')
b.WriteString(" world!")
fmt.Println(b.String())
bytebufferpool.Put(b)
}
直接調用bytebufferpool
包的Get()
和Put()
方法,底層操作的是包中默認的對象池:
// bytebufferpool/pool.go
var defaultPool Pool
func Get() *ByteBuffer { return defaultPool.Get() }
func Put(b *ByteBuffer) { defaultPool.Put(b) }
我們當然可以根據實際需要創建新的對象池,將相同用處的對象放在一起(比如我們可以創建一個對象池用於輔助接收網絡包,一個用於輔助拼接字符串):
func main() {
joinPool := new(bytebufferpool.Pool)
b := joinPool.Get()
b.WriteString("hello")
b.WriteByte(',')
b.WriteString(" world!")
fmt.Println(b.String())
joinPool.Put(b)
}
bytebufferpool
沒有提供具體的創建函數,不過可以使用new
創建。
優化細節
在將對象放回池中時,會根據當前切片的容量進行相應的處理。bytebufferpool
將大小分爲 20 個區間:
| <2^6 | 2^6 ~ 2^7-1 | ... |> 2^25 |
如果容量小於 2^6,則屬於第一個區間。如果處於 2^6 和 2^7-1 之間,則落在第二個區間。以此類推。執行足夠多的放回次數後,bytebufferpool
會重新校準,計算處於哪個區間容量的對象最多。將defaultSize
設置爲該區間的上限容量,第一個區間的上限容量爲 2^6,第二區間爲 2^7,最後一個區間爲 2^26。後續通過Get()
請求對象時,若池中無空閒對象,創建一個新對象時,直接將容量設置爲defaultSize
。這樣基本可以避免在使用過程中的切片擴容,從而提升性能。下面結合代碼來理解:
// bytebufferpool/pool.go
const (
minBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20
minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
calibrateCallsThreshold = 42000
maxPercentile = 0.95
)
type Pool struct {
calls [steps]uint64
calibrating uint64
defaultSize uint64
maxSize uint64
pool sync.Pool
}
我們可以看到,bytebufferpool
內部使用了標準庫中的對象sync.Pool
。
這裏的steps
就是上面所說的區間,一共 20 份。calls
數組記錄放回的對象容量落在各個區間的次數。
調用Pool.Get()
將對象放回時,首先計算切片容量落在哪個區間,增加calls
數組中相應元素的值:
// bytebufferpool/pool.go
func (p *Pool) Put(b *ByteBuffer) {
idx := index(len(b.B))
if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
b.Reset()
p.pool.Put(b)
}
}
如果calls
數組該元素超過指定值calibrateCallsThreshold=42000
(說明距離上次校準,放回對象到該區間的次數已經達到閾值了,42000 應該就是個經驗數字),則調用Pool.calibrate()
執行校準操作:
// bytebufferpool/pool.go
func (p *Pool) calibrate() {
// 避免併發放回對象觸發 `calibrate`
if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
return
}
// step 1.統計並排序
a := make(callSizes, 0, steps)
var callsSum uint64
for i := uint64(0); i < steps; i++ {
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
a = append(a, callSize{
calls: calls,
size: minSize << i,
})
}
sort.Sort(a)
// step 2.計算 defaultSize 和 maxSize
defaultSize := a[0].size
maxSize := defaultSize
maxSum := uint64(float64(callsSum) * maxPercentile)
callsSum = 0
for i := 0; i < steps; i++ {
if callsSum > maxSum {
break
}
callsSum += a[i].calls
size := a[i].size
if size > maxSize {
maxSize = size
}
}
// step 3.保存對應值
atomic.StoreUint64(&p.defaultSize, defaultSize)
atomic.StoreUint64(&p.maxSize, maxSize)
atomic.StoreUint64(&p.calibrating, 0)
}
step 1. 統計並排序
calls
數組記錄了放回對象到對應區間的次數。按照這個次數從大到小排序。注意:minSize << i
表示區間i
的上限容量。
step 2. 計算defaultSize
和maxSize
defaultSize
很好理解,取排序後的第一個size
即可。maxSize
值記錄放回次數超過 95% 的多個對象容量的最大值。它的作用是防止將使用較少的大容量對象放回對象池,從而佔用太多內存。這裏就可以理解Pool.Put()
方法後半部分的邏輯了:
// 如果要放回的對象容量大於 maxSize,則不放回
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
b.Reset()
p.pool.Put(b)
}
step 3. 保存對應值
後續通過Pool.Get()
獲取對象時,若池中無空閒對象,新創建的對象默認容量爲defaultSize
。這樣的容量能滿足絕大多數情況下的使用,避免使用過程中的切片擴容。
// bytebufferpool/pool.go
func (p *Pool) Get() *ByteBuffer {
v := p.pool.Get()
if v != nil {
return v.(*ByteBuffer)
}
return &ByteBuffer{
B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
}
}
其他一些細節:
-
容量最小值取 2^6 = 64,因爲這就是 64 位計算機上 CPU 緩存行的大小。這個大小的數據可以一次性被加載到 CPU 緩存行中,再小就無意義了。
-
代碼中多次使用
atomic
原子操作,避免加鎖導致性能損失。
當然這個庫缺點也很明顯,由於大部分使用的容量都小於defaultSize
,會有部分內存浪費。
總結
去掉註釋,空行,bytebufferpool
只用了 150 行左右的代碼就實現了一個高性能的Buffer
對象池。其中細節值得細細品味。閱讀高質量的代碼,學習編碼細節有助於提升自己的編碼能力。強烈建議細細品讀!!!
大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
參考
-
bytebufferpool GitHub:https://github.com/valyala/bytebufferpool
-
Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/SVqpZubE_X4W8uAwmNV79w