Go sync-Pool 保姆級教程

Pool's purpose is to cache allocated but unused items for later reuse, relieving pressure on the garbage collector. That is, it makes it easy to build efficient, thread-safe free lists. However, it is not suitable for all free lists. A Pool is safe for use by multiple goroutines simultaneously.

sync.Pool 是一個併發安全的緩存池,能夠併發且安全地存儲、獲取元素 / 對象。常用於對象實例創建會佔用較多資源的場景。但是它不具有嚴格的緩存作用,因爲 Pool 中的元素 / 對象的釋放時機是隨機的。
作爲緩存的一種姿勢,sync.Pool 能夠避免元素 / 對象的申請內存操作和初始化過程,以提高性能。當然,這裏有個 trick,釋放元素 / 對象的操作是直接將元素 / 對象放回池子,從而免去了真正釋放的操作。
另外,不考慮內存浪費和初始化消耗的情況下,“使用 sync.Pool 管理多個對象”和 “直接 New 多個對象” 兩者的區別在於後者會創建出更多的對象,併發高時會給 GC 帶來非常大的負擔,進而影響整體程序的性能。因爲 Go 申請內存是程序員觸發的,而回收卻是 Go 內部 runtime GC 回收器來執行的。即,使用 sync.Pool 還可以減少 GC 次數。

總結一下:sync.Pool 是以緩存池的形式,在創建對象等場景下,減少 GC 次數、提高程序穩定性。後文將稱之爲 Pool 池,池子中的元素統一稱爲對象。

1.2、接口使用

1、創建一個 Pool 實例
Pool 池的模式是通用型的 (存儲對象的類型爲 interface{}),所有的類型的對象都可以進行使用。注意的是,作爲使用方不能對 Pool 裏面的對象個數做假定,同時也無法獲取 Pool 池中對象個數。

pool := &sync.Pool{}
複製代碼

2、Put 和 Get 接口
(1) Put(interface{}):將一個對象加入到 Pool 池中

3、爲 Pool 實例配置 New 方法
沒有配置 New 方法時,如果 Get 操作多於 Put 操作,繼續 Get 會得到一個 nil interface{} 對象,所以需要代碼進行兼容。
配置 New 方法後,Get 獲取不到對象時(Pool 池中已經沒有對象了),會調用自定義的 New 方法創建一個對象並返回。

pool := &sync.Pool {
    New: func() interface {} {
        return struct{}{}
    }
}
複製代碼

注意的是,sync.Pool 本身數據結構是併發安全的,但是 Pool.New 函數(用戶自定義的)不一定是線程安全的。並且 Pool.New 函數可能會被併發調用,如果 New 函數里面的實現邏輯是非併發安全的,那就會有問題。

4、關於 sync.Pool 的性能優勢,可以試一下 go/src/sync/pool_test.go 中的幾個壓測函數。

二、使用 & 實踐

2.1、使用場景

1、增加臨時對象的重用率(sync.Pool 本質用途)。在高併發業務場景下出現 GC 問題時,可以使用 sync.Pool 減少 GC 負擔。

2、不適合存儲帶狀態的對象,因爲獲取對象是隨機的(Get 到的對象可能是剛創建的,也可能是之前創建並 cache 住的),並且緩存對象的釋放策略完全由 runtime 內部管理。像 socket 長連接、數據庫連接,有人說適合有人說不適合。

3、不適合需要控制緩存元素個數的場景,因爲無法知道 Pool 池裏面的對象個數,以及對象的釋放時機也是未知的。

4、典型的應用場景之一是在網絡包收取發送的時候,用 sync.Pool 會有奇效,可以大大降低 GC 壓力。

2.2、實踐

1、使用規範:作爲對象生成器
(a) 初始化 sync.Pool 實例,並需要配置併發安全的 New 方法;
(b) 創建對象的地方,通過 Pool.Get() 獲取;
(c) 在 Get 後馬上進行 defer Pool.Put(x);

2、使用建議:不要對 Pool 池中的對象做任何假定
即使用前需要清理緩存對象,有兩種方案:在調用 Pool.Put 前進行 memset 對象操作;在 Pool.Get 操作後對對象進行 memset 操作。

3、demo:重用一個緩衝區,用於文件寫入數據的存儲

func writeFile(pool *sync.Pool, filename string) error {
    buf := pool.Get().(*bytes.Buffer)  // 如果是第一個調用,則創建一個緩衝區
    defer pool.Put(buf) // 將緩衝區放回 sync.Pool中

    buf.Reset()  // Reset 緩存區,不然會連接上次調用時保存在緩存區裏的內容
    buf.WriteString("Demo")
    return ioutil.WriteFile(filename, buf.Bytes(), 0644)
}
複製代碼

4、幾個真實的使用場景

2.3、思考

1、先 Put,或者主動 Put 會造成什麼?
首先,Put 任意類型的元素都不會報錯,因爲存儲的是 interface{} 對象,且內部沒有進行類型的判斷和斷言。如果 Put 在業務上不做限制,那麼 Pool 池中就可能存在各種類型的數據,就導致在 Get 後的代碼會非常繁瑣(需要進行類型判斷,否則有 panic 隱患)。另外,Get 得到的對象是隨機的,緩存對象的回收也是隨機的,所以先 Put 一個對象根本就沒有實際作用。

綜上,sync.Pool 本質上可以是一個雜貨鋪,支持存放任何類型,所以 Get 出來和 Put 進去的對象類型要業務自己把控。

2、如果只調用 Get 不調用 Put 會怎麼樣?
首先就算不調用 Pool.Put,GC 也會去釋放 Get 獲取的對象(當沒有人去再用它時)。但是隻進行 Get 操作的話,就相當於一直在生成新的對象,Pool 池也失去了它最本質的功能。

三、源碼分析

基於 go 1.14 version

2.1、Pool 的數據結構

type Pool struct {  
   // 用於檢測 Pool 池是否被 copy,因爲 Pool 不希望被 copy。用這個字段可以在 go vet 工具中檢測出被 copy(在編譯期間就發現問題) 
   noCopy noCopy  // A Pool must not be copied after first use.

   // 實際指向 []poolLocal,數組大小等於 P 的數量;每個 P 一一對應一個 poolLocal
   local     unsafe.Pointer 
   localSize uintptr      // []poolLocal 的大小

   // GC 時,victim 和 victimSize 會分別接管 local 和 localSize;
   // victim 的目的是爲了減少 GC 後冷啓動導致的性能抖動,讓分配對象更平滑;
   victim     unsafe.Pointer 
   victimSize uintptr       

   // 對象初始化構造方法,使用方定義
   New func() interface{}
}
複製代碼

1、noCopy 字段是爲了檢測 Pool 池的 copy 行爲。但無法阻止編譯(用戶進行 copy 行爲也能成功運行程序),只能通過 go vet 檢查出用戶的 copy 行爲。noCopy 是 go1.7 引入的一個靜態檢查機制,對用戶代碼有效。

2、local 和 localSize 這兩個字段實現了一個數組,數組元素爲 poolLocal 結構體。每個 Processor(P) 都會對應數組中的一個元素,訪問時,P 的 id 就是 [P]poolLocal 下標索引。通過這樣的設計,多個 goroutine 使用同一個 Pool 時,減少了數據競爭。

3、victim 和 victimSize 在 (GC)poolCleanup 流程裏賦值爲 local 和 localSize。victim 機制是把 Pool 池的清理由一輪 GC 改成兩輪 GC,進而提高對象的複用率,減少抖動。

4、函數變量 New 對外暴露,使用者定義對象初始化構造行爲。todo,沒找到 New 變量初始化的地方。

2.2、poolLocal 的數據結構

// Pool.local 指向的數組元素類型
type poolLocal struct {
   poolLocalInternal
   pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// Local per-P Pool appendix.
type poolLocalInternal struct {
   private interface{} // Can be used only by the respective P.
   shared  poolChain   // 雙鏈表結構,用於掛接 cache 元素
}
複製代碼

1、Pool.local 指針指向的就是 poolLocal 數組。

2、poolLocal struct 中真實有用的只有 poolLocalInternal struct。其中的 pad 字段是用於內存填充,對齊 cache line,防止僞共享 (false sharing) 的性能問題。

3、並不是一個緩存對象就對應了一個 poolLocal 數組中的一個元素(一個 poolLocalInternal struct 對象),而是有多少個 P(Processor) 就有多少個 poolLocalInternal 對象。可以把 poolLocalInternal 對象理解成一個數據桶,每個 P 下面掛載了一個數據桶。

4、poolLocalInternal struct 中的 shared poolChain 成員就是數據桶的作用;private 成員算是一個 vip 緩存位,每次都最先被訪問。poolChain struct 由頭尾鏈表指針構成,其實就是一個雙向鏈表。

type poolChain struct {
   head *poolChainElt  // 頭指針
   tail *poolChainElt  // 尾指針
}
type poolChainElt struct {
   poolDequeue   // 本質是個數組內存空間,管理成 ringbuffer 的模式;
   next, prev *poolChainElt  // 前向、後向指針
}
複製代碼

2.3、數據桶 (poolChain + poolDequeue)

雙鏈表的節點 poolChainElt struct 對象,仍然不是 Pool 池中的對象。poolChainElt struct 中的 poolDequeue struct 是一段數組空間,類似於 ringbuffer,是一個無鎖環形隊列。Pool 池管理的對象存儲在 poolDequeue 的 vals[] 數組裏,即緩存對象存儲在環形隊列 poolDequeue 中。

poolDequeue 是單生產者多消費者的固定大小的無鎖環形隊列。生產者可以從 head 寫 / 插入、從 head 讀 / 刪除,而消費者僅可從 tail 讀 / 刪除。headTail 變量指向了隊列的頭和尾,通過位運算區分 head 和 tail 的值(高 32 位爲 head 的索引下標,低 32 位爲 tail 的索引下標)。利用 atomic 和 CAS 對 headTail 值進行原子操作,從而實現了無鎖機制。

type poolDequeue struct {
   headTail uint64
   vals []eface
}
type eface struct {
   typ, val unsafe.Pointer
}
複製代碼

1、入隊 pushHead

func (c *poolChain) pushHead(val interface{}) {
   d := c.head

   // Initialize the chain.
   if d == nil {
      const initSize = 8  // 初始長度爲8
      d = new(poolChainElt)
      d.vals = make([]eface, initSize)
      c.head = d
      storePoolChainElt(&c.tail, d)
   }

   if d.pushHead(val) {
      return
   }

   // The current dequeue is full. Allocate a new one of twice the size.
   newSize := len(d.vals) * 2
   if newSize >= dequeueLimit {  // Can't make it any bigger.
      newSize = dequeueLimit
   }

   // 拼接鏈表
   d2 := &poolChainElt{prev: d}
   d2.vals = make([]eface, newSize)
   c.head = d2
   storePoolChainElt(&d.next, d2)
   d2.pushHead(val)
}

func (d *poolDequeue) pushHead(val interface{}) bool {
   ptrs := atomic.LoadUint64(&d.headTail)
   head, tail := d.unpack(ptrs)
  
   // Ring式隊列,頭尾相等則隊列已滿
   if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
      return false
   }

   slot := &d.vals[head&uint32(len(d.vals)-1)]

   // Check if the head slot has been released by popTail.
   typ := atomic.LoadPointer(&slot.typ)
   if typ != nil {
      // Another goroutine is still cleaning up the tail, so
      // the queue is actually still full.
      return false
   }

   // The head slot is free, so we own it.
   if val == nil {
      val = dequeueNil(nil)
   }
   *(*interface{})(unsafe.Pointer(slot)) = val

   // Increment head. This passes ownership of slot to popTail
   // and acts as a store barrier for writing the slot.
   atomic.AddUint64(&d.headTail, 1<<dequeueBits)
   return true
}
複製代碼

2、出隊 popHead

func (c *poolChain) popHead() (interface{}, bool) {
   d := c.head
   for d != nil {
      if val, ok := d.popHead(); ok {
         return val, ok
      }

      // There may still be unconsumed elements in the previous dequeue, so try backing up.
      d = loadPoolChainElt(&d.prev)
   }
   return nil, false
}

func (d *poolDequeue) popHead() (interface{}, bool) {
   var slot *eface
   for {
      ptrs := atomic.LoadUint64(&d.headTail)
      head, tail := d.unpack(ptrs)
      
      // 判斷隊列是否爲空
      if tail == head {
         return nil, false
      }

      head--  // head位置是隊頭的前一個位置,所以此處要先退一位
      ptrs2 := d.pack(head, tail)  // 新的 headTail 值
      
      // 通過 CAS 判斷當前沒有併發修改就拿到數據
      if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
         slot = &d.vals[head&uint32(len(d.vals)-1)]
         break
      }
   }

   // 取出數據
   val := *(*interface{})(unsafe.Pointer(slot))
   if val == dequeueNil(nil) {
      val = nil
   }
  
   // 重置slot,typ和val均爲nil
   *slot = eface{}
   return val, true
}
複製代碼

3、從隊尾獲取元素 popTail
假如在鏈表長度爲 3 的情況下。尾部節點指向的無鎖隊列裏緩存對象被偷光了。那麼尾部節點會沿着 next 指針前移,把舊的無鎖隊列內存釋放掉。此時鏈表長度變爲 2,這就是鏈表的收縮策略。最小時也剩下一個節點,不會收縮成空鏈表。

func (c *poolChain) popTail() (interface{}, bool) {
   d := loadPoolChainElt(&c.tail)
   if d == nil {
      return nil, false
   }

   for {
      // It's important that we load the next pointer
      // *before* popping the tail. In general, d may be
      // transiently empty, but if next is non-nil before
      // the pop and the pop fails, then d is permanently
      // empty, which is the only condition under which it's
      // safe to drop d from the chain.
      d2 := loadPoolChainElt(&d.next)

      if val, ok := d.popTail(); ok {
         return val, ok
      }

      if d2 == nil {
         // This is the only dequeue. It's empty right
         // now, but could be pushed to in the future.
         return nil, false
      }

      // The tail of the chain has been drained, so move on
      // to the next dequeue. Try to drop it from the chain
      // so the next pop doesn't have to look at the empty
      // dequeue again.
      if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
         // We won the race. Clear the prev pointer so
         // the garbage collector can collect the empty
         // dequeue and so popHead doesn't back up
         // further than necessary.
         storePoolChainElt(&d2.prev, nil)
      }
      d = d2
   }
}
複製代碼

2.4、Get() interface{}

Get 是從 Pool 池裏獲取一個對象,它會進行逐步嘗試:先嚐試從本 P 對應的 poolLocal 中的 private 區獲取;再嘗試從本 P 的 shared 隊列中獲取元素;然後嘗試從其他 P 的 shared 隊列中獲取元素;接着嘗試從 victim cache 裏獲取元素;最後初始化得到一個新的元素。(下圖來自碼農桃花源)

func (p *Pool) Get() interface{} {
   if race.Enabled {
      race.Disable()
   }
   
   //1. Pool 池中有元素
   l, pid := p.pin()  // l是一個 poolLocal 對象  // pin()的邏輯看後文
   x := l.private  // 從 private 區裏獲取
   l.private = nil
   if x == nil {
      // 從 shared 隊列裏獲取
      x, _ = l.shared.popHead()
      
      // 嘗試從獲取其他 P 的隊列裏取元素,或者嘗試從 victim cache 裏取元素
      if x == nil {
         x = p.getSlow(pid)
      }
   }
  
   // G-M 鎖定解除;上鎖的邏輯在 pin() 中
   runtime_procUnpin()

   if race.Enabled {
      race.Enable()
      if x != nil {
         race.Acquire(poolRaceAddr(x))
      }
   }
   
   //2. Pool 池中沒有元素,需要當場初始化得到一個
   if x == nil && p.New != nil {
      x = p.New()
   }
   return x
}

func (p *Pool) pin() (*poolLocal, int) {
   pid := runtime_procPin()

   // In pinSlow we store to local and then to localSize, here we load in opposite order.
   // Since we've disabled preemption, GC cannot happen in between.
   // Thus here we must observe local at least as large localSize.
   // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
   s := atomic.LoadUintptr(&p.localSize) // load-acquire
   l := p.local                          // load-consume
   if uintptr(pid) < s {
      return indexLocal(l, pid), pid
   }
   return p.pinSlow()
}
複製代碼

1、Pool.pin() 函數的工作是:

(a)runtime_procPin() 就是封裝了 procPin,禁止當前執行 G 所在的 MP 被搶佔調度

func procPin() int {
    _g_ := getg()
    mp := _g_.m
    mp.locks++
    return int(mp.p.ptr().id)  // 並返回當前所在 P 的 ID
}
複製代碼

procPin 函數的目的是把當前 G 鎖住在當前 M(聲明當前 M 不能被搶佔,也可以認爲是 G 綁定到 P)。具體的操作就是執行 mp.locks++。因爲在 newstack 裏會對此條件做判斷:

if preempt {
    // 已經打了搶佔標識了,但是還需要判斷條件滿足才能讓出執行權;
    if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
        gp.stackguard0 = gp.stack.lo + _StackGuard
        gogo(&gp.sched) // never return
    }
}
複製代碼

相對的,後面的 runtime_procUnpin() 操作就是執行了 mp.locks-- 操作。

(b)鎖住 PG 後爲什麼會無法觸發 GC?
觸發 GC 都會調用 gcStart(trigger gcTrigger) 函數,其大致邏輯是:先調用 preemptall() 嘗試搶佔所有的 P,然後停掉當前 P,遍歷所有的 P,如果 P 處於系統調用則直接 stop 掉;然後處理空閒的 P;最後檢查是否存在需要等待處理的 P,如果有則循環等待,並嘗試調用 preemptall()。
preemptall() 函數中會調用 preemptone(p),該邏輯中也會檢查 m.locks 標識符。

GMP、GC 相關的詳細介紹可以參考:mp.weixin.qq.com/s/vR44Y6AGK…

(c)Pool.pin() 函數中如果 Processor.ID 小於 Pool.localSize(如果大於,其實就是 localSize=0、Pool 實例未初始化),就返回 Pool.local 數組中的第 Processor.ID 個元素和 Processor.ID。
而 Pool.local 數組大小就是 P 的數量,詳細的見 local 初始化過程。

(d)Pool.pinSlow() 函數:初始化 Pool.local 數組
Pool 實例第一次調用 Get 的時候纔會進入該邏輯(注意,是每個 P 都有第一次 Get 調用的概念,但是隻有一個 P 上的 G 才能執行這個操作,因爲有 allPoolsMu 鎖互斥)。

func (p *Pool) pinSlow() (*poolLocal, int) {
   // G-M 先解鎖
   runtime_procUnpin()
   
   // 全局鎖 allPoolsMu,控制了只有一個 PG 能夠執行初始化邏輯
   allPoolsMu.Lock()
   defer allPoolsMu.Unlock()
  
   // 再加鎖,因爲後面還有解鎖操作
   pid := runtime_procPin()
  
   // runtime_procPin() 後不會發生 GC,所以獲取當前的localSize和local值
   s := p.localSize
   l := p.local
   if uintptr(pid) < s {  // 已經被初始化
      return indexLocal(l, pid), pid
   }
   
   // 第一次時進行註冊
   if p.local == nil {
      allPools = append(allPools, p)
   }
   
   size := runtime.GOMAXPROCS(0)  // P 的個數
   local := make([]poolLocal, size)
   atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
   atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
   return &local[pid], pid
}
複製代碼

2、shared.popHead() 函數是從無鎖隊列 poolDequeue.Head 處獲取數據

3、Pool.getSlow(pid) 從其他 P 中竊取對象,或者從本 P 的二級緩存 victim 中進行搜索。
(a)用 popTail() 從其他 P 那竊取對象
回顧一下:從當前 P 的 poolLocal 中取對象使用的是 popHead 方法,而從其他 P 的 poolLocal 中竊取對象時使用的是 popTail 方法,再回到上文中對 poolDequeue 的定義,可以知道,當前 P 對本地 poolLocal 是生產者,對其他 P 的 poolLocal 而言是消費者。
注意的是,popTail() 對成員變量的操作與 pushHead() 是反着來的,避免兩個操作有交叉環節。

func (p *Pool) getSlow(pid int) interface{} {
   size := atomic.LoadUintptr(&p.localSize) // load-acquire
   locals := p.local                        // load-consume
  
   // 遍歷其他 P 對應的數據桶(雙向鏈表+環形隊列),從隊尾嘗試獲取對象
   for i := 0; i < int(size); i++ {
      l := indexLocal(locals, (pid+i+1)%int(size))
      if x, _ := l.shared.popTail(); x != nil {
         return x
      }
   }

   // 嘗試從 victim cache 中取對象
   size = atomic.LoadUintptr(&p.victimSize)
   if uintptr(pid) >= size {
      return nil
   }
   locals = p.victim
   // 先 private 區
   l := indexLocal(locals, pid)
   if x := l.private; x != nil {
      l.private = nil
      return x
   }
   // 後 shared 區
   for i := 0; i < int(size); i++ {
      l := indexLocal(locals, (pid+i)%int(size))
      if x, _ := l.shared.popTail(); x != nil {
         return x
      }
   }

   // 如果沒有找到對象,則清空victim cache
   atomic.StoreUintptr(&p.victimSize, 0)
   return nil
}
複製代碼

(b)victim cache 翻譯爲 “受害者緩存”

受害者緩存是由 Norman Jouppi 提出的一種提高緩存性能的硬件技術。如他的論文所述:
Miss caching places a fully-associative cache between cache and its re-fill path. Misses in the cache that hit in the miss cache have a one cycle penalty, as opposed to a many cycle miss penalty without the miss cache. Victim Caching is an improvement to miss caching that loads the small fully-associative cache with victim of a miss and not the requested cache line.

其實可以理解爲二級緩存,在一級緩存中搜索不到對象後,再下降到二級繼續搜索。
注意的是,如果在二級緩存 victim 依舊沒有找到元素的話,就把 victimSize 置 0,防止後來的 “人” 再到 victim 裏找。

2.5、Put(x interface{})

Put 就是將緩存對象放入當前 P 對應的數據桶中,它也有漸進的邏輯:優先將變量放入 private 區中,對應的在 Get 時會先取這個位置上的對象;private 區如果已經被佔用了,就放入 shared 雙向鏈表中。

func (p *Pool) Put(x interface{}) {
   if x == nil {
      return
   }
   if race.Enabled {
      if fastrand()%4 == 0 { // Randomly drop x on floor.
         return
      }
      race.ReleaseMerge(poolRaceAddr(x))
      race.Disable()
   }
  
   // 初始化;禁用搶佔; 獲取對應的 poolLocal(poolLocalInternal) 對象
   l, _ := p.pin()
  
   // 嘗試放到最快的位置,這個位置也跟 Get 請求的順序是一一對應的;
   if l.private == nil {
      l.private = x
      x = nil
   }
   if x != nil {  // 複用 x 變量來控制邏輯
      l.shared.pushHead(x)  // 放到 shared 雙向鏈表中
   }
   
   runtime_procUnpin()   // G-M 鎖定解除,pin中有加鎖
   
   if race.Enabled {
      race.Enable()
   }
}
複製代碼

注意的是,Put 操作也會調用 Pool.pin() ,所以上文的 “第一次執行 Get() 操作時初始化 Pool.local 數組” 是不嚴謹的(Pool.local 也可能會在這裏創建)。

2.6、清理動作在 GC 時

1、三個全局變量

var (
   allPoolsMu Mutex  // 全局鎖
   allPools []*Pool  // 存放程序運行期間所有的 Pool 實例地址
   oldPools []*Pool  // 配合 victim 機制用的
)
複製代碼

2、在 GC 開始的時候,gcStart() 函數中會調用 clearpools() -> poolCleanup() 函數。也就是說,每一輪 GC 都是對所有的 Pool 做一次清理

func init() {
   runtime_registerPoolCleanup(poolCleanup)
}

func clearpools() {
   // clear sync.Pools
   if poolcleanup != nil {
      poolcleanup()
   }
....
複製代碼

3、poolCleanup() 批量清理 allPools 裏的元素

func poolCleanup() {
   // 清理 oldPools 上的 victim 的元素
   for _, p := range oldPools {
      p.victim = nil
      p.victimSize = 0
   }

   // 把 local 數組裏的元素遷移到 victim 上
   for _, p := range allPools {
      p.victim = p.local
      p.victimSize = p.localSize
      p.local = nil
      p.localSize = 0
   }

   // 清理掉 allPools 和 oldPools 中所有的 Pool 實例
   oldPools, allPools = allPools, nil
}
複製代碼

victim 將刪除緩存的動作由一次操作變爲了兩次操作,每次清理的是上上次的緩存內容,本次只是將緩存內容轉移至 victim 中。對應的在 Get 操作中,也會去 victim 成員中嘗試獲取元素。
這樣的設計,不僅提高了緩存的時間、緩存對象的命中率,並且對 shared 區的 O(n) 複雜度的遍歷,變成 O(1) 操作,避免了性能波動。
注意的是,清除只是解除對象的引用,真正的清理由 GC 完成。

4、從清理操作來看,“第一次執行 Get() 操作會初始化 Pool.local 數組” 這句話還有另一個錯誤,正確的是在每次 GC 後 local 都會賦值爲 nil,都需要重新初始化。

2.7、思考 & 總結

1、Pool 是如何實現併發安全的?
某一時刻 P 只會調度一個 G,那麼對於生產者而言,當前 P 操作的都是本 P 的 poolLocal。相當於是通過隔離避免了數據競爭,即調用 pushHead 和 popHead 並不需要加鎖。

當消費者是其他 P(竊取時),會進行 popTail 操作,這時會和 pushHead/popHead 操作形成數據競爭。pushHead 的流程是先取 slot,再判斷是否可插入,最後修改 headTail;而 popTail 的流程是先修改 headTail,再取 slot,然後重置 slot。pushHead 修改 head 位置,popTail 修改 tail 位置,並且都是用的公共字段 headTail,這樣的話使用 CAS 原子操作就可以避免讀寫衝突。

2、pinSlow() 函數中爲什麼先執行了 runtime_procUnpin,隨後又執行了 runtime_procPin?
目的是避免獲取不到全局鎖 allPoolsMu 卻一直佔用當前 P 的情況。而 runtime_procUnpin 和 runtime_procPin 之間,本 G 可能已經切換到其他 P 上,而 []poolLocal 也可能已經被初始化。

3、爲什麼需要將 poolDequeue 串成鏈表?
因爲 poolDequeue 的實現是固定大小的。

4、爲什麼要禁止 copy sync.Pool 實例?
因爲 copy 後,對於同一個 Pool 實例中的 cache 對象,就有了兩個指向來源。原 Pool 清空之後,copy 的 Pool 沒有清理掉,那麼裏面的對象就全都泄露了。並且 Pool 的無鎖設計的基礎是多個 Goroutine 不會操作到同一個數據結構,Pool 拷貝之後則不能保證這點(因爲存儲的成員都是指針)。

四、版本迭代

當前 sync.Pool 的實現,是在 1.13 版本中進行大改動後的結果,其通過無鎖機制和兩輪迴收機制大大提高了緩存對象的使用效率。優秀的實現都是在不斷的迭代中慢慢進步的,而在 version1.12 及之前 sync.Pool 存在如下一些問題:

爲此,1.13 版本在併發和緩存時間上進行了優化。
相關代碼(version1.12)如下,簡單的看過就可以發現導致上述各個問題的原因:

1、數據結構

type Pool struct {
   noCopy noCopy
   // local fixed-size per-P pool, actual type is [P]poolLocal
   local     unsafe.Pointer 
   localSize uintptr      // size of the local array
   New func() interface{}
}
type poolLocal struct {
   poolLocalInternal
   pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
   // vip位置,即緩存的第一層
   private interface{}   
   // 數據桶 ,每個 P 都對應一個。因爲別的 P 上的 G 可以進行竊取,所以要加鎖。
   shared  []interface{}
   Mutex                
}
複製代碼

同樣的,也是每個 P 對應了一個數據桶,local 成員就是 [P]poolLocal 數組,用 Processor.ID 區分各自的桶。數據桶裏有 private 區和 shared 區(這個結構比 version1.13 的簡單很多)。

private 區只能存放一個對象,作爲緩存的第一層,每次優先 Get/Put private 區。因爲每個 P 在任意時刻只運行一個 G,並且每個 P 只會操作自己的 private 區,所以在 private 區上寫入和取出對象是不用加鎖的。

shared 區是一個 slice,可以存放多個對象。進 shared 區就是 append 操作,出 shared 區就截取 slice[:last-1] 操作。shared 區上寫入和取出對象要加鎖,因爲別的 PG 可能來竊取。

2、加鎖 Get,並有輪詢竊取的可能
查詢緩存順序爲:先看當前 P 的 private 區是否有數據;再加鎖,嘗試從當前 P 的 shared 區獲取數據;循環遍歷其他 P 的 share 區並加鎖,嘗試獲取數據;如果都獲取不到數據就 New 一個

func (p *Pool) Get() interface{} {
   // 獲取本 P 對應的數據桶
   l := p.pin()
   
   // 先看當前 P 的 private 區是否有數據
   x := l.private
   l.private = nil
   runtime_procUnpin()
   if x == nil {
      // 再加鎖,嘗試從當前 P 的 shared 區獲取數據
      l.Lock()
      last := len(l.shared) - 1
      if last >= 0 {
         x = l.shared[last]
         l.shared = l.shared[:last]
      }
      l.Unlock()
      
      // 循環遍歷其他 P 的 share 區並加鎖,嘗試獲取數據
      if x == nil {
         x = p.getSlow()
      }
   }
.......
}

func (p *Pool) getSlow() (x interface{}) {
   // See the comment in pin regarding ordering of the loads.
   size := atomic.LoadUintptr(&p.localSize) // load-acquire
   local := p.local                         // load-consume
   pid := runtime_procPin()
   runtime_procUnpin()
  
   // 遍歷一次其他 P 的 share 區,嘗試進行竊取
   for i := 0; i < int(size); i++ {
      l := indexLocal(local, (pid+i+1)%int(size))  // 定位到某個P上的shared區
      l.Lock()
      last := len(l.shared) - 1
      // 如果有緩存對象,就返回,並解鎖
      if last >= 0 {
         x = l.shared[last]
         l.shared = l.shared[:last]
         l.Unlock()
         break
      }
      l.Unlock()  // 沒有緩存對象,解鎖,繼續遍歷下一個P
   }
   return x
}
複製代碼

由於這裏的各種加鎖邏輯(其實 Put 操作裏也有鎖),就會導致併發效率問題。
爲此,version1.13 引入了單生產者多消費者的雙端無鎖環形隊列。

3、清理緩存
同樣的,sync.Pool 在 init() 中向 runtime 註冊了一個 cleanup 方法。它在 STW1 階段被調用的,如果執行時間過久,就會硬生生延長 STW1 階段耗時。
同樣的,cleanup 就是遍歷清空當前註冊的全部 Pool 實例,很容易造成 Pool 池中沒有數據。

func poolCleanup() {
   for i, p := range allPools {
      allPools[i] = nil
      for i := 0; i < int(p.localSize); i++ {
         l := indexLocal(p.local, i)
         l.private = nil
         for j := range l.shared {
            l.shared[j] = nil
         }
         l.shared = nil
      }
      p.local = nil
      p.localSize = 0
   }
   allPools = []*Pool{}
}
複製代碼

爲了解決相應問題,version1.13 就是引入了 victim 成員,實現了兩輪迴收機制,將實際回收的時間線拉長,單位時間內 GC 的開銷減小。

參考

深入淺出 sync.Pool ,圍觀最全的使用姿勢,理解最深刻的原理
sync.Pool 源碼級原理剖析
sync.Pool 的缺點和優化之路
深入剖析 sync.Pool(文中有各種引申的計算機原理、GMP 調度、GC 知識,值得好好學習)
深度解密 Go 語言之 sync.pool

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://juejin.cn/post/6989798306440282148