Go sync-Pool 保姆級教程
Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists. A Pool is safe for use by multiple goroutines simultaneously.
sync.Pool 是一個併發安全的緩存池,能夠併發且安全地存儲、獲取元素 / 對象。常用於對象實例創建會佔用較多資源的場景。但是它不具有嚴格的緩存作用,因爲 Pool 中的元素 / 對象的釋放時機是隨機的。
作爲緩存的一種姿勢,sync.Pool 能夠避免元素 / 對象的申請內存操作和初始化過程,以提高性能。當然,這裏有個 trick,釋放元素 / 對象的操作是直接將元素 / 對象放回池子,從而免去了真正釋放的操作。
另外,不考慮內存浪費和初始化消耗的情況下,“使用 sync.Pool 管理多個對象”和 “直接 New 多個對象” 兩者的區別在於後者會創建出更多的對象,併發高時會給 GC 帶來非常大的負擔,進而影響整體程序的性能。因爲 Go 申請內存是程序員觸發的,而回收卻是 Go 內部 runtime GC 回收器來執行的。即,使用 sync.Pool 還可以減少 GC 次數。
總結一下:sync.Pool 是以緩存池的形式,在創建對象等場景下,減少 GC 次數、提高程序穩定性。後文將稱之爲 Pool 池,池子中的元素統一稱爲對象。
1.2、接口使用
1、創建一個 Pool 實例
Pool 池的模式是通用型的 (存儲對象的類型爲 interface{}),所有的類型的對象都可以進行使用。注意的是,作爲使用方不能對 Pool 裏面的對象個數做假定,同時也無法獲取 Pool 池中對象個數。
pool := &sync.Pool{}
複製代碼
2、Put 和 Get 接口
(1) Put(interface{}):將一個對象加入到 Pool 池中
- 注意的是這僅僅是把對象放入池子,池子中的對象真正釋放的時機是不受外部控制的。 (2) Get() interface{}:返回 Pool 池中存在的對象
- 注意的是 Get() 方法是隨機取出對象,無法保證以固定的順序獲取 Pool 池中存儲的對象。
3、爲 Pool 實例配置 New 方法
沒有配置 New 方法時,如果 Get 操作多於 Put 操作,繼續 Get 會得到一個 nil interface{} 對象,所以需要代碼進行兼容。
配置 New 方法後,Get 獲取不到對象時(Pool 池中已經沒有對象了),會調用自定義的 New 方法創建一個對象並返回。
pool := &sync.Pool {
New: func() interface {} {
return struct{}{}
}
}
複製代碼
注意的是,sync.Pool 本身數據結構是併發安全的,但是 Pool.New 函數(用戶自定義的)不一定是線程安全的。並且 Pool.New 函數可能會被併發調用,如果 New 函數里面的實現邏輯是非併發安全的,那就會有問題。
4、關於 sync.Pool 的性能優勢,可以試一下 go/src/sync/pool_test.go 中的幾個壓測函數。
二、使用 & 實踐
2.1、使用場景
1、增加臨時對象的重用率(sync.Pool 本質用途)。在高併發業務場景下出現 GC 問題時,可以使用 sync.Pool 減少 GC 負擔。
2、不適合存儲帶狀態的對象,因爲獲取對象是隨機的(Get 到的對象可能是剛創建的,也可能是之前創建並 cache 住的),並且緩存對象的釋放策略完全由 runtime 內部管理。像 socket 長連接、數據庫連接,有人說適合有人說不適合。
3、不適合需要控制緩存元素個數的場景,因爲無法知道 Pool 池裏面的對象個數,以及對象的釋放時機也是未知的。
4、典型的應用場景之一是在網絡包收取發送的時候,用 sync.Pool 會有奇效,可以大大降低 GC 壓力。
2.2、實踐
1、使用規範:作爲對象生成器
(a) 初始化 sync.Pool 實例,並需要配置併發安全的 New 方法;
(b) 創建對象的地方,通過 Pool.Get() 獲取;
(c) 在 Get 後馬上進行 defer Pool.Put(x);
2、使用建議:不要對 Pool 池中的對象做任何假定
即使用前需要清理緩存對象,有兩種方案:在調用 Pool.Put 前進行 memset 對象操作;在 Pool.Get 操作後對對象進行 memset 操作。
3、demo:重用一個緩衝區,用於文件寫入數據的存儲
func writeFile(pool *sync.Pool, filename string) error {
buf := pool.Get().(*bytes.Buffer) // 如果是第一個調用,則創建一個緩衝區
defer pool.Put(buf) // 將緩衝區放回 sync.Pool中
buf.Reset() // Reset 緩存區,不然會連接上次調用時保存在緩存區裏的內容
buf.WriteString("Demo")
return ioutil.WriteFile(filename, buf.Bytes(), 0644)
}
複製代碼
4、幾個真實的使用場景
- gin 框架在 gin.go:L144/L346 中,通過 sync.Pool 對 Context 做了緩存;
- fmt.Printf 中通過 sync.Pool 緩存了 pp struct(newPrinter 函數中);
- logrus 框架中緩存了 logger entry 對象。
2.3、思考
1、先 Put,或者主動 Put 會造成什麼?
首先,Put 任意類型的元素都不會報錯,因爲存儲的是 interface{} 對象,且內部沒有進行類型的判斷和斷言。如果 Put 在業務上不做限制,那麼 Pool 池中就可能存在各種類型的數據,就導致在 Get 後的代碼會非常繁瑣(需要進行類型判斷,否則有 panic 隱患)。另外,Get 得到的對象是隨機的,緩存對象的回收也是隨機的,所以先 Put 一個對象根本就沒有實際作用。
綜上,sync.Pool 本質上可以是一個雜貨鋪,支持存放任何類型,所以 Get 出來和 Put 進去的對象類型要業務自己把控。
2、如果只調用 Get 不調用 Put 會怎麼樣?
首先就算不調用 Pool.Put,GC 也會去釋放 Get 獲取的對象(當沒有人去再用它時)。但是隻進行 Get 操作的話,就相當於一直在生成新的對象,Pool 池也失去了它最本質的功能。
三、源碼分析
基於 go 1.14 version
2.1、Pool 的數據結構
type Pool struct {
// 用於檢測 Pool 池是否被 copy,因爲 Pool 不希望被 copy。用這個字段可以在 go vet 工具中檢測出被 copy(在編譯期間就發現問題)
noCopy noCopy // A Pool must not be copied after first use.
// 實際指向 []poolLocal,數組大小等於 P 的數量;每個 P 一一對應一個 poolLocal
local unsafe.Pointer
localSize uintptr // []poolLocal 的大小
// GC 時,victim 和 victimSize 會分別接管 local 和 localSize;
// victim 的目的是爲了減少 GC 後冷啓動導致的性能抖動,讓分配對象更平滑;
victim unsafe.Pointer
victimSize uintptr
// 對象初始化構造方法,使用方定義
New func() interface{}
}
複製代碼
1、noCopy 字段是爲了檢測 Pool 池的 copy 行爲。但無法阻止編譯(用戶進行 copy 行爲也能成功運行程序),只能通過 go vet 檢查出用戶的 copy 行爲。noCopy 是 go1.7 引入的一個靜態檢查機制,對用戶代碼有效。
2、local 和 localSize 這兩個字段實現了一個數組,數組元素爲 poolLocal 結構體。每個 Processor(P) 都會對應數組中的一個元素,訪問時,P 的 id 就是 [P]poolLocal 下標索引。通過這樣的設計,多個 goroutine 使用同一個 Pool 時,減少了數據競爭。
3、victim 和 victimSize 在 (GC)poolCleanup 流程裏賦值爲 local 和 localSize。victim 機制是把 Pool 池的清理由一輪 GC 改成兩輪 GC,進而提高對象的複用率,減少抖動。
4、函數變量 New 對外暴露,使用者定義對象初始化構造行爲。todo,沒找到 New 變量初始化的地方。
2.2、poolLocal 的數據結構
// Pool.local 指向的數組元素類型
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // 雙鏈表結構,用於掛接 cache 元素
}
複製代碼
1、Pool.local 指針指向的就是 poolLocal 數組。
2、poolLocal struct 中真實有用的只有 poolLocalInternal struct。其中的 pad 字段是用於內存填充,對齊 cache line,防止僞共享 (false sharing) 的性能問題。
3、並不是一個緩存對象就對應了一個 poolLocal 數組中的一個元素(一個 poolLocalInternal struct 對象),而是有多少個 P(Processor) 就有多少個 poolLocalInternal 對象。可以把 poolLocalInternal 對象理解成一個數據桶,每個 P 下面掛載了一個數據桶。
4、poolLocalInternal struct 中的 shared poolChain 成員就是數據桶的作用;private 成員算是一個 vip 緩存位,每次都最先被訪問。poolChain struct 由頭尾鏈表指針構成,其實就是一個雙向鏈表。
type poolChain struct {
head *poolChainElt // 頭指針
tail *poolChainElt // 尾指針
}
type poolChainElt struct {
poolDequeue // 本質是個數組內存空間,管理成 ringbuffer 的模式;
next, prev *poolChainElt // 前向、後向指針
}
複製代碼
2.3、數據桶 (poolChain + poolDequeue)
雙鏈表的節點 poolChainElt struct 對象,仍然不是 Pool 池中的對象。poolChainElt struct 中的 poolDequeue struct 是一段數組空間,類似於 ringbuffer,是一個無鎖環形隊列。Pool 池管理的對象存儲在 poolDequeue 的 vals[] 數組裏,即緩存對象存儲在環形隊列 poolDequeue 中。
poolDequeue 是單生產者多消費者的固定大小的無鎖環形隊列。生產者可以從 head 寫 / 插入、從 head 讀 / 刪除,而消費者僅可從 tail 讀 / 刪除。headTail 變量指向了隊列的頭和尾,通過位運算區分 head 和 tail 的值(高 32 位爲 head 的索引下標,低 32 位爲 tail 的索引下標)。利用 atomic 和 CAS 對 headTail 值進行原子操作,從而實現了無鎖機制。
type poolDequeue struct {
headTail uint64
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
複製代碼
1、入隊 pushHead
- 如果雙向鏈表 poolChain 爲 nil 則先進行初始化;
- 將對象放入 head 位置上的環形隊列;
- 如果 head 位置的環形隊列 poolDequeue 滿了,則新建一個雙倍容量的鏈表節點(初始節點的隊列長度爲 8)。環形隊列最大容量爲 (1<<32)/4 =1073741824,注意的是,達到上限後,再生成的隊列容量都將是 (1<<32)/4。
- 新建節點並移動 head 後,將對象放入新 head 位置上的環形隊列 poolDequeue 中。
func (c *poolChain) pushHead(val interface{}) {
d := c.head
// Initialize the chain.
if d == nil {
const initSize = 8 // 初始長度爲8
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice the size.
newSize := len(d.vals) * 2
if newSize >= dequeueLimit { // Can't make it any bigger.
newSize = dequeueLimit
}
// 拼接鏈表
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// Ring式隊列,頭尾相等則隊列已滿
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
複製代碼
2、出隊 popHead
- 從 head 位置獲取對象,如果該環形隊列中還有數據則會返回 true;
- 如果 head 位置的環形隊列空了,會定位到 prev 節點繼續嘗試獲取對象;
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判斷隊列是否爲空
if tail == head {
return nil, false
}
head-- // head位置是隊頭的前一個位置,所以此處要先退一位
ptrs2 := d.pack(head, tail) // 新的 headTail 值
// 通過 CAS 判斷當前沒有併發修改就拿到數據
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 取出數據
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 重置slot,typ和val均爲nil
*slot = eface{}
return val, true
}
複製代碼
3、從隊尾獲取元素 popTail
假如在鏈表長度爲 3 的情況下。尾部節點指向的無鎖隊列裏緩存對象被偷光了。那麼尾部節點會沿着 next 指針前移,把舊的無鎖隊列內存釋放掉。此時鏈表長度變爲 2,這就是鏈表的收縮策略。最小時也剩下一個節點,不會收縮成空鏈表。
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
複製代碼
2.4、Get() interface{}
Get 是從 Pool 池裏獲取一個對象,它會進行逐步嘗試:先嚐試從本 P 對應的 poolLocal 中的 private 區獲取;再嘗試從本 P 的 shared 隊列中獲取元素;然後嘗試從其他 P 的 shared 隊列中獲取元素;接着嘗試從 victim cache 裏獲取元素;最後初始化得到一個新的元素。(下圖來自碼農桃花源)
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
//1. Pool 池中有元素
l, pid := p.pin() // l是一個 poolLocal 對象 // pin()的邏輯看後文
x := l.private // 從 private 區裏獲取
l.private = nil
if x == nil {
// 從 shared 隊列裏獲取
x, _ = l.shared.popHead()
// 嘗試從獲取其他 P 的隊列裏取元素,或者嘗試從 victim cache 裏取元素
if x == nil {
x = p.getSlow(pid)
}
}
// G-M 鎖定解除;上鎖的邏輯在 pin() 中
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
//2. Pool 池中沒有元素,需要當場初始化得到一個
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
複製代碼
1、Pool.pin() 函數的工作是:
- 鎖住當前的 P;
- pinSlow 中先存儲 local 再存儲 localSize,這裏是反順加載兩者,因爲鎖住 P 後就不會再發生 GC,保證 local 的大小至少跟 localSize 一樣即可;
- Pool 實例如果是第一次執行 Get() 操作,那麼初始化 Pool.local 數組,即執行 pinSlow();(這句話,劃重點)
- 返回當前 P 所對應的 poolLocal(poolLocalInternal) 對象,和 Processor.ID;
(a)runtime_procPin() 就是封裝了 procPin,禁止當前執行 G 所在的 MP 被搶佔調度
func procPin() int {
_g_ := getg()
mp := _g_.m
mp.locks++
return int(mp.p.ptr().id) // 並返回當前所在 P 的 ID
}
複製代碼
procPin 函數的目的是把當前 G 鎖住在當前 M(聲明當前 M 不能被搶佔,也可以認爲是 G 綁定到 P)。具體的操作就是執行 mp.locks++。因爲在 newstack 裏會對此條件做判斷:
if preempt {
// 已經打了搶佔標識了,但是還需要判斷條件滿足才能讓出執行權;
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
複製代碼
相對的,後面的 runtime_procUnpin() 操作就是執行了 mp.locks-- 操作。
(b)鎖住 PG 後爲什麼會無法觸發 GC?
觸發 GC 都會調用 gcStart(trigger gcTrigger) 函數,其大致邏輯是:先調用 preemptall() 嘗試搶佔所有的 P,然後停掉當前 P,遍歷所有的 P,如果 P 處於系統調用則直接 stop 掉;然後處理空閒的 P;最後檢查是否存在需要等待處理的 P,如果有則循環等待,並嘗試調用 preemptall()。
preemptall() 函數中會調用 preemptone(p),該邏輯中也會檢查 m.locks 標識符。
GMP、GC 相關的詳細介紹可以參考:mp.weixin.qq.com/s/vR44Y6AGK…
(c)Pool.pin() 函數中如果 Processor.ID 小於 Pool.localSize(如果大於,其實就是 localSize=0、Pool 實例未初始化),就返回 Pool.local 數組中的第 Processor.ID 個元素和 Processor.ID。
而 Pool.local 數組大小就是 P 的數量,詳細的見 local 初始化過程。
(d)Pool.pinSlow() 函數:初始化 Pool.local 數組
Pool 實例第一次調用 Get 的時候纔會進入該邏輯(注意,是每個 P 都有第一次 Get 調用的概念,但是隻有一個 P 上的 G 才能執行這個操作,因爲有 allPoolsMu 鎖互斥)。
func (p *Pool) pinSlow() (*poolLocal, int) {
// G-M 先解鎖
runtime_procUnpin()
// 全局鎖 allPoolsMu,控制了只有一個 PG 能夠執行初始化邏輯
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 再加鎖,因爲後面還有解鎖操作
pid := runtime_procPin()
// runtime_procPin() 後不會發生 GC,所以獲取當前的localSize和local值
s := p.localSize
l := p.local
if uintptr(pid) < s { // 已經被初始化
return indexLocal(l, pid), pid
}
// 第一次時進行註冊
if p.local == nil {
allPools = append(allPools, p)
}
size := runtime.GOMAXPROCS(0) // P 的個數
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
複製代碼
- Pool 把自己註冊進 allPools 數組;
- Pool.local 數組按照 runtime.GOMAXPROCS(0) 的大小進行分配。如果是默認的,那麼 P 個數就是 CPU 的個數;
2、shared.popHead() 函數是從無鎖隊列 poolDequeue.Head 處獲取數據
- 當前 P 上的 G 取緩存對象時,只從頭部鏈表節點指向的無鎖隊列裏取。取不到,沿着 prev 指針到下一個無鎖隊列上重複操作,也沒有的話。就到別的 P 上竊取。
- 盜竊者 G 在偷緩存對象時,只從尾部鏈表節點指向的無鎖隊列裏取。取不到,沿着 next 指針到下一個無鎖隊列上重複操作,也沒有的話。就到別的 P 那繼續嘗試偷,直到都偷不着,就調用 New 方法。
3、Pool.getSlow(pid) 從其他 P 中竊取對象,或者從本 P 的二級緩存 victim 中進行搜索。
(a)用 popTail() 從其他 P 那竊取對象
回顧一下:從當前 P 的 poolLocal 中取對象使用的是 popHead 方法,而從其他 P 的 poolLocal 中竊取對象時使用的是 popTail 方法,再回到上文中對 poolDequeue 的定義,可以知道,當前 P 對本地 poolLocal 是生產者,對其他 P 的 poolLocal 而言是消費者。
注意的是,popTail() 對成員變量的操作與 pushHead() 是反着來的,避免兩個操作有交叉環節。
func (p *Pool) getSlow(pid int) interface{} {
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 遍歷其他 P 對應的數據桶(雙向鏈表+環形隊列),從隊尾嘗試獲取對象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 嘗試從 victim cache 中取對象
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
// 先 private 區
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
// 後 shared 區
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果沒有找到對象,則清空victim cache
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
複製代碼
(b)victim cache 翻譯爲 “受害者緩存”
受害者緩存是由 Norman Jouppi 提出的一種提高緩存性能的硬件技術。如他的論文所述:
Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.
其實可以理解爲二級緩存,在一級緩存中搜索不到對象後,再下降到二級繼續搜索。
注意的是,如果在二級緩存 victim 依舊沒有找到元素的話,就把 victimSize 置 0,防止後來的 “人” 再到 victim 裏找。
2.5、Put(x interface{})
Put 就是將緩存對象放入當前 P 對應的數據桶中,它也有漸進的邏輯:優先將變量放入 private 區中,對應的在 Get 時會先取這個位置上的對象;private 區如果已經被佔用了,就放入 shared 雙向鏈表中。
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 { // Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 初始化;禁用搶佔; 獲取對應的 poolLocal(poolLocalInternal) 對象
l, _ := p.pin()
// 嘗試放到最快的位置,這個位置也跟 Get 請求的順序是一一對應的;
if l.private == nil {
l.private = x
x = nil
}
if x != nil { // 複用 x 變量來控制邏輯
l.shared.pushHead(x) // 放到 shared 雙向鏈表中
}
runtime_procUnpin() // G-M 鎖定解除,pin中有加鎖
if race.Enabled {
race.Enable()
}
}
複製代碼
注意的是,Put 操作也會調用 Pool.pin() ,所以上文的 “第一次執行 Get() 操作時初始化 Pool.local 數組” 是不嚴謹的(Pool.local 也可能會在這裏創建)。
2.6、清理動作在 GC 時
1、三個全局變量
var (
allPoolsMu Mutex // 全局鎖
allPools []*Pool // 存放程序運行期間所有的 Pool 實例地址
oldPools []*Pool // 配合 victim 機制用的
)
複製代碼
2、在 GC 開始的時候,gcStart() 函數中會調用 clearpools() -> poolCleanup() 函數。也就是說,每一輪 GC 都是對所有的 Pool 做一次清理
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
func clearpools() {
// clear sync.Pools
if poolcleanup != nil {
poolcleanup()
}
....
複製代碼
3、poolCleanup() 批量清理 allPools 裏的元素
func poolCleanup() {
// 清理 oldPools 上的 victim 的元素
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 把 local 數組裏的元素遷移到 victim 上
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 清理掉 allPools 和 oldPools 中所有的 Pool 實例
oldPools, allPools = allPools, nil
}
複製代碼
victim 將刪除緩存的動作由一次操作變爲了兩次操作,每次清理的是上上次的緩存內容,本次只是將緩存內容轉移至 victim 中。對應的在 Get 操作中,也會去 victim 成員中嘗試獲取元素。
這樣的設計,不僅提高了緩存的時間、緩存對象的命中率,並且對 shared 區的 O(n) 複雜度的遍歷,變成 O(1) 操作,避免了性能波動。
注意的是,清除只是解除對象的引用,真正的清理由 GC 完成。
4、從清理操作來看,“第一次執行 Get() 操作會初始化 Pool.local 數組” 這句話還有另一個錯誤,正確的是在每次 GC 後 local 都會賦值爲 nil,都需要重新初始化。
2.7、思考 & 總結
1、Pool 是如何實現併發安全的?
某一時刻 P 只會調度一個 G,那麼對於生產者而言,當前 P 操作的都是本 P 的 poolLocal。相當於是通過隔離避免了數據競爭,即調用 pushHead 和 popHead 並不需要加鎖。
當消費者是其他 P(竊取時),會進行 popTail 操作,這時會和 pushHead/popHead 操作形成數據競爭。pushHead 的流程是先取 slot,再判斷是否可插入,最後修改 headTail;而 popTail 的流程是先修改 headTail,再取 slot,然後重置 slot。pushHead 修改 head 位置,popTail 修改 tail 位置,並且都是用的公共字段 headTail,這樣的話使用 CAS 原子操作就可以避免讀寫衝突。
2、pinSlow() 函數中爲什麼先執行了 runtime_procUnpin,隨後又執行了 runtime_procPin?
目的是避免獲取不到全局鎖 allPoolsMu 卻一直佔用當前 P 的情況。而 runtime_procUnpin 和 runtime_procPin 之間,本 G 可能已經切換到其他 P 上,而 []poolLocal 也可能已經被初始化。
3、爲什麼需要將 poolDequeue 串成鏈表?
因爲 poolDequeue 的實現是固定大小的。
4、爲什麼要禁止 copy sync.Pool 實例?
因爲 copy 後,對於同一個 Pool 實例中的 cache 對象,就有了兩個指向來源。原 Pool 清空之後,copy 的 Pool 沒有清理掉,那麼裏面的對象就全都泄露了。並且 Pool 的無鎖設計的基礎是多個 Goroutine 不會操作到同一個數據結構,Pool 拷貝之後則不能保證這點(因爲存儲的成員都是指針)。
四、版本迭代
當前 sync.Pool 的實現,是在 1.13 版本中進行大改動後的結果,其通過無鎖機制和兩輪迴收機制大大提高了緩存對象的使用效率。優秀的實現都是在不斷的迭代中慢慢進步的,而在 version1.12 及之前 sync.Pool 存在如下一些問題:
- 每次 GC 都回收所有緩存對象,如果緩存對象數量太大,會導致 STW1 階段的耗時增加;並且會導致緩存對象命中率下降,New 方法的執行造成額外的內存分配消耗。
- Pool.Get 和 Pool.Put 方法都是通過 mutex 鎖實現的併發安全,併發高的情況下效率低。如果本 P 數據桶中沒有緩存會逐個去其他數據桶進行竊取,極端情況下,最多嘗試 P 次搶鎖並查詢緩存。
爲此,1.13 版本在併發和緩存時間上進行了優化。
相關代碼(version1.12)如下,簡單的看過就可以發現導致上述各個問題的原因:
1、數據結構
type Pool struct {
noCopy noCopy
// local fixed-size per-P pool, actual type is [P]poolLocal
local unsafe.Pointer
localSize uintptr // size of the local array
New func() interface{}
}
type poolLocal struct {
poolLocalInternal
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
// vip位置,即緩存的第一層
private interface{}
// 數據桶 ,每個 P 都對應一個。因爲別的 P 上的 G 可以進行竊取,所以要加鎖。
shared []interface{}
Mutex
}
複製代碼
同樣的,也是每個 P 對應了一個數據桶,local 成員就是 [P]poolLocal 數組,用 Processor.ID 區分各自的桶。數據桶裏有 private 區和 shared 區(這個結構比 version1.13 的簡單很多)。
private 區只能存放一個對象,作爲緩存的第一層,每次優先 Get/Put private 區。因爲每個 P 在任意時刻只運行一個 G,並且每個 P 只會操作自己的 private 區,所以在 private 區上寫入和取出對象是不用加鎖的。
shared 區是一個 slice,可以存放多個對象。進 shared 區就是 append 操作,出 shared 區就截取 slice[:last-1] 操作。shared 區上寫入和取出對象要加鎖,因爲別的 PG 可能來竊取。
2、加鎖 Get,並有輪詢竊取的可能
查詢緩存順序爲:先看當前 P 的 private 區是否有數據;再加鎖,嘗試從當前 P 的 shared 區獲取數據;循環遍歷其他 P 的 share 區並加鎖,嘗試獲取數據;如果都獲取不到數據就 New 一個
func (p *Pool) Get() interface{} {
// 獲取本 P 對應的數據桶
l := p.pin()
// 先看當前 P 的 private 區是否有數據
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
// 再加鎖,嘗試從當前 P 的 shared 區獲取數據
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
// 循環遍歷其他 P 的 share 區並加鎖,嘗試獲取數據
if x == nil {
x = p.getSlow()
}
}
.......
}
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
pid := runtime_procPin()
runtime_procUnpin()
// 遍歷一次其他 P 的 share 區,嘗試進行竊取
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size)) // 定位到某個P上的shared區
l.Lock()
last := len(l.shared) - 1
// 如果有緩存對象,就返回,並解鎖
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock() // 沒有緩存對象,解鎖,繼續遍歷下一個P
}
return x
}
複製代碼
由於這裏的各種加鎖邏輯(其實 Put 操作裏也有鎖),就會導致併發效率問題。
爲此,version1.13 引入了單生產者多消費者的雙端無鎖環形隊列。
3、清理緩存
同樣的,sync.Pool 在 init() 中向 runtime 註冊了一個 cleanup 方法。它在 STW1 階段被調用的,如果執行時間過久,就會硬生生延長 STW1 階段耗時。
同樣的,cleanup 就是遍歷清空當前註冊的全部 Pool 實例,很容易造成 Pool 池中沒有數據。
func poolCleanup() {
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
複製代碼
爲了解決相應問題,version1.13 就是引入了 victim 成員,實現了兩輪迴收機制,將實際回收的時間線拉長,單位時間內 GC 的開銷減小。
參考
深入淺出 sync.Pool ,圍觀最全的使用姿勢,理解最深刻的原理
sync.Pool 源碼級原理剖析
sync.Pool 的缺點和優化之路
深入剖析 sync.Pool(文中有各種引申的計算機原理、GMP 調度、GC 知識,值得好好學習)
深度解密 Go 語言之 sync.pool
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://juejin.cn/post/6989798306440282148