sync-Pool 高性能設計之集大成者

概述

sync.Pool 是 Go 語言標準庫中的一個併發安全的對象池,可以用來緩存那些需要重複創建和銷燬的對象,從而避免頻繁地進行內存分配和回收,降低內存和 GC 壓力。

需要注意的是: 任何存儲在對象池中的元素可能會被隨時刪除,如果元素是一個資源類的引用,並且該資源僅在對象池中被引用 (沒有其他地方引用了),那麼當該元素被對象池刪除時,其指向的資源同時也會被釋放。

內部實現

sync.Pool 的使用方法相信讀者已經熟練掌握,本文主要來探究一下底層源代碼實現,文件路徑爲 $GOROOT/src/sync/pool.go,筆者的 Go 版本爲 go1.19 linux/amd64

💡 sync.Pool 的源代碼中細節非常之多,爲了閱讀體驗和效率,筆者幾乎沒有刪減代碼,而且也基本對每行代碼都做了對應的註解和上下文聯繫,這是本文的特色,請讀者留意。

數據結構

全局變量

var (
 // 鎖
 allPoolsMu Mutex

 // 全局的所有緩存池
 allPools []*Pool

 // victim cache 緩存池
 oldPools []*Pool
)

數據結構圖

這裏假設 runtime.GOMAXPROCS() = 4, 處理器 P 的數量爲 4 個,讀者在閱讀下面的源代碼探究過程時,可以對照着結構圖進行分析。

sync.Pool 數據結構

緩存池對象

sync.Pool 包的核心對象,所有的操作都是基於該對象進行的。

// Pool 一旦使用後,便不能再複製

// 在 Go 內存模型術語中,調用 Put(x) 方法在調用 Get 方法之前同步
// 在 Go 內存模型術語中,調用 New 方法在調用 Get 方法之前同步
type Pool struct {
 // noCopy 可以添加到 struct 中,實現 "首次使用之後,無法被複制" 的功能,主要服務於 `go vet`
 // 假設一個緩存池對象 A 被對象 B 拷貝了,接着 A 被清空了,B 裏面的緩存對象指針指向的對象將會不可控
 noCopy noCopy

 // 指向固定長度的數組,數組長度爲處理器 P 的個數,轉換後其實就是 [P]poolLocal 數組
 //    實際的底層數據結構是切片,不過下文中統一用數組描述,讀者不必在意這個細節
 // 訪問時根據處理器 P 的 ID (作爲索引) 去訪問
 // 優化點: 多個 goroutine 使用同一個緩存池時,可以減少競爭,提高性能
 //        類似於分段鎖中降低鎖粒度的設計理念
 local     unsafe.Pointer
 // local 數組的長度
 localSize uintptr

 // 上一輪的 local, 內容語義和 local 一致
 // 新一輪 GC 到來時,更新爲當前 local 的值
 victim     unsafe.Pointer

 // 上一輪的 localSize, 內容語義和 localSize 一致
 // 新一輪 GC 到來時,更新爲當前 localSize 的值
 victimSize uintptr

 // 創建對象的函數
 New func() any
}

這裏引用下維基百科關於 victim cache 的描述:

所謂受害者緩存(Victim Cache),是 CPU 硬件處理緩存的一種技術,是一個與直接匹配或低相聯緩存並用的、容量很小的全相聯緩存。 當一個數據塊被逐出緩存時,並不直接丟棄,而是暫先進入受害者緩存。如果受害者緩存已滿,就替換掉其中一項。當進行緩存標籤匹配時, 在與索引指向標籤匹配的同時,並行查看受害者緩存,如果在受害者緩存發現匹配,就將其此數據塊與緩存中的不匹配數據塊做交換,同時返回給處理器。

簡單通俗地來說,就是已經失效的緩存先不清除,保留一段時間,如果保留時間內該緩存又被用到了,就重新啓用,如果保留時間內一直沒有被用到,就清除。

poolLocal 對象

每個處理器 P 都有一個 poolLocal 對象,GetPut 方法會優先操作當前處理器的對象池。

type poolLocal struct {
 poolLocalInternal

 // CPU Cache 是距離 CPU 最近的 Cache,如果能充分利用,會極大提升程序性能
 // 防止僞共享,湊齊 128 bytes 的整數倍 (這個小技巧非常值得學習)

 // 什麼是CPU 僞共享?
 //   CPU CacheLine 通常是以 64 byte 或 128 byte 爲單位
 //   在緩存池場景中,各個 P 的 poolLocal 以數組形式存儲在一起
 //   假設 CPU CacheLine 爲 128 byte,而 poolLocal 不足 128 byte 時
 //   CacheLine 將會帶上其他 P 的 poolLocal 的內存數據,以湊齊一個整塊的 CacheLine
 //   如果這時兩個相鄰的 P 同時在兩個不同的 CPU 核上運行,將會同時去覆蓋刷新 CacheLine
 //   造成 CacheLine 的反覆失效,那 CPU Cache 就失去了作用

 //   例如 兩個相鄰但是不同的處理器 P (PA, PB) 被分配在同一個 CacheLine
 //   此時 PA 要修改, PB 也要修改 (兩者去競爭 同一個 CacheLine)
 //   當 PA 被修改時,緩存系統強制 PB 所在 CPU 核的 CacheLine 失效
 //   當 PB 被修改時,緩存系統強制 PA 所在 CPU 核的 CacheLine 失效
 //   最終導致 PA 和 PB 所在 CPU 核的 CacheLine 失效,降低性能

 // 如何避免 CPU 僞共享?
 //   將需要獨立訪問的變量放在不同的 CacheLine 中
 //   保證和 CacheLine 內存對齊

 // Linux 查看 CacheLine 單位大小
 // $ cat /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
 pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocalInternal 對象

poolLocalInternal 對象表示每個處理器 P 的本地對象池。

type poolLocalInternal struct {
 // 私有變量,只能由當前處理器操作
 private any
 // 共享變量,當前處理器可以執行 pushHead/popHead 操作,其他處理器只能執行 popTail 操作
 shared  poolChain
}

Go 1.13 版本開始,shared 字段的數據結構修改爲 單個生產者/多個消費者 雙端無鎖環形隊列,當前處理器 P 可以執行 pushHead/popHead 操作, 其他處理器 P 只能執行 popTail 操作。

單個生產者:當前處理器 P 上面運行的 goroutine 執行 Put 方法時,將對象放入隊列,並且只能放在隊列頭部,但是其他處理器 P 上運行的 goroutine 不能放入。 由於每個處理器 P 在任意時刻只有一個 goroutine 運行,所以無需加鎖。

多個消費者分兩種角色

  1. 在當前處理器 P 上運行的 goroutine,執行 Get 方法時,從隊列頭部取對象,由於每個處理器 P 在任意時刻只有一個 goroutine 運行,所以無需加鎖

  2. 在其他處理器 P 上運行的 goroutine,執行 Get 方法時,如果該處理器 P 沒有緩存對象,就到別的處理器 P 的隊列上竊取。 此時竊取者 goroutine 只能從隊列尾部取對象,因爲同時可能有多個竊取者 goroutine 竊取同一個處理器 P 的隊列, 所以用 CAS 來實現無鎖隊列功能

按照這種設計,poolDequeue.pushHeadpoolDequeue.popTail 存在競爭 (可能同時有多個 goroutine 同時操作), 而 poolDequeue.pushHeadpoolDequeue.popHead 不存在競爭 (只能有一個 goroutine 操作)。

poolChain 對象

poolChain 對象表示 poolDequeue 數據類型的雙端環形隊列鏈表,每個節點表示的隊列長度是後驅節點隊列長度的兩倍, 如果當前所有的節點隊列滿了,就創建一個新的隊列 (長度是當前頭節點隊列長度的 2 倍),然後掛載到頭節點。

// 隊列節點示意圖
// --------------------------------------------------------------------------
// | 節點 1, size: 64 | 節點 2, size: 32 | 節點 3, size: 16 | 節點 4, size: 8 |
// --------------------------------------------------------------------------
type poolChain struct {
 // head 表示頭節點隊列,只能由生產者操作,不存在競爭
 head *poolChainElt

 // tail 表示尾節點隊列,由多個消費者操作,存在競爭
 tail *poolChainElt
}

type poolChainElt struct {
 poolDequeue

 // next 由生產者原子性寫入,由消費者原子性讀取
 // 值只會從 nil 轉換爲非 nil

 // prev 由消費者原子性寫入,由生產者原子性讀取
 // 值只會從非 nil 轉換爲 nil
 next, prev *poolChainElt
}

爲什麼 poolChain 的數據結構是鏈表 + ring buffer (環形隊列) 呢?

因爲使用 ring buffer 數據結構的優點非常適用於 sync.Pool 對象池的使用場景。

  1. 預先分配好內存並且分配的元素內存可複用,避免了數據遷移

  2. 作爲底層數據結構的數組是連續內存結構,非常利於 CPU Cache, 在訪問 poolDequeue 隊列中的某個元素時,其附近的元素可能被加載到同一個 Cache Line 中,訪問速度更快

  3. 更高效的出隊和入隊操作,因爲環形隊列是首尾相連的,避免了普通隊列中隊首和隊尾頻繁變動的問題

poolDequeue 對象

poolDequeue 對象是一個由 單個生產者/多個消費者 模式組成的固定大小的無鎖隊列。單個生產者可以從隊列頭部執行 pushpop 操作, 多個消費者只能從隊列尾部執行 pop 操作。

type poolDequeue struct {
 // 經典的字段合併使用方法
    // 高 32 位 是 head, 指向下一個存放對象的索引
    // 低 32 位 是 tail, 指向隊列中最早 (下一個讀取) 的對象索引

 // 索引區間 tail <= i < head, 表示消費者可以操作的索引區域
 // 消費者可以在該區間不斷獲取對象,直至獲取到的對象爲 nil
 headTail uint64

 // vals 表示隊列元素容器,大小必須爲 2 的 N 次冪
 // 容器會在初始化時指定容量,實現數據元素內存預初始化
 vals []eface
}

爲什麼要將 headtail 合併到一個變量裏面?

因爲這樣可以進行原子操作,完成兩個字段的 lock free (無鎖編程) 優化。

例如:當隊列中僅剩一個對象時,如果多個處理器 P 同時訪問隊列,如果沒有進行併發限制,兩個處理器 P 都可能獲取到對象,這顯然是不符合預期的。 那麼在不引入互斥鎖的前提下,sync.Pool 是如何實現臨界區數據控制的呢?sync.Pool 利用了 atomic 包的提供的 CAS 操作,併發情況下兩個處理器 P 都可能獲取到對象,但是最終只會有一個處理器 P CAS 操作成功, 另外一個處理器操作失敗,在更新 headtail 兩個字段的時候,也是通過 CAS + 位運算 進行操作的。

小結

通過對源代碼中的數據結構進行分析,我們可以看到內部隱藏了非常多的設計技巧和對應的基礎理論知識,接下來開始閱讀構建於數據結構之上的具體算法。

這裏再放一張數據結構圖,方便讀者結合算法代碼進行分析。

sync.Pool 數據結構

對象歸還

我們首先來看下對象歸還流程,也就是如何把一個對象放入緩存池的某個隊列中,從 Pool.Put 方法開始追蹤代碼。

func (p *Pool) Put(x any) {
 ...

 l, _ := p.pin()
 if l.private == nil {
  // 優先設置私有變量
  l.private = x
 } else {
  // 其次設置共享變量
  l.shared.pushHead(x)
 }

 ...
}

func (c *poolChain) pushHead(val any) {
 d := c.head
 if d == nil {
  // 初始化頭節點
  // 對象池元素數量從 8 個開始,必須爲 2 的 N 次冪
  const initSize = 8
  d = new(poolChainElt)
  d.vals = make([]eface, initSize)
  c.head = d
  storePoolChainElt(&c.tail, d)
 }

 if d.pushHead(val) {
  // 如果對象成功加入隊列,直接返回
  return
 }

 // 如果當前隊列已滿,分配一個新的隊列 (長度是當前隊列的 2 倍)
 newSize := len(d.vals) * 2
 if newSize >= dequeueLimit {
  // 隊列長度最大爲 1073741824
  newSize = dequeueLimit
 }

 // 初始化新的隊列
 d2 := &poolChainElt{prev: d}
 d2.vals = make([]eface, newSize)
 // 將頭節點指向到新的隊列
 c.head = d2
 // 將新的隊列添加到鏈表中
 storePoolChainElt(&d.next, d2)
 // 將對象添加到新的隊列
 d2.pushHead(val)
}

poolDequeue.pushHead 方法將一個對象加入到隊列中,如果隊列已滿,返回 false,該方法必須由 單個生產者 操作。

func (d *poolDequeue) pushHead(val any) bool {
 ptrs := atomic.LoadUint64(&d.headTail)
 head, tail := d.unpack(ptrs)
 if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
  // 說明隊列已滿 (tail 索引 + 當前隊列元素個數) == head 索引
  return false
 }
 slot := &d.vals[head&uint32(len(d.vals)-1)]

 // 檢測索引位置的對象是否和 popTail 方法操作衝突
 typ := atomic.LoadPointer(&slot.typ)
 if typ != nil {
  // 有其他 goroutine 正在調用 popTail 方法操作當前位置的對象
  // 所以隊列實際上已滿
  return false
 }

 // 執行到這裏,typ == nil
 // 說明即使存在 popTail 方法操作當前位置的對象,操作也已經結束了,衝突解除
 if val == nil {
  val = dequeueNil(nil)
 }
 // 使用歸還的對象填充索引位置
 *(*any)(unsafe.Pointer(slot)) = val

 // head 索引 + 1
 atomic.AddUint64(&d.headTail, 1<<dequeueBits)
 return true
}

獲取對象

接下來探究從緩存池中獲取對象的流程,從 Pool.Get 方法開始追蹤代碼。

func (p *Pool) Get() any {
 l, pid := p.pin()
 // 首先嚐試從當前處理器的私有變量獲取對象
 x := l.private
 // 從私有變量獲取後,及時將私有變量置爲 nil
 l.private = nil

 if x == nil {
  // 私有變量沒有獲取到對象,嘗試從共享變量獲取
  x, _ = l.shared.popHead()
  if x == nil {
   // 當前處理器 P 沒有對象,嘗試從其他處理器竊取
   x = p.getSlow(pid)
  }
 }

 // 如果從所有處理器的緩存池都沒有獲取到對象,並且 New 方法不爲 nil
 // 那就調用 New 方法創建一個對象返回
 if x == nil && p.New != nil {
  x = p.New()
 }
 return x
}

func (c *poolChain) popHead() (any, bool) {
 d := c.head
 for d != nil {
  // 從隊列頭部開始獲取對象
  if val, ok := d.popHead(); ok {
   return val, ok
  }
  // 將當前隊列前驅節點作爲接下來要遍歷的隊列
  d = loadPoolChainElt(&d.prev)
 }
 return nil, false
}

poolDequeue.popHead 方法從隊列頭部刪除一個對象並返回,如果隊列爲空,返回 false, 該方法必須由 單個生產者 操作。

func (d *poolDequeue) popHead() (any, bool) {
 var slot *eface
 for {
  ptrs := atomic.LoadUint64(&d.headTail)
  // 高 32 bit 是 head
  // 低 32 bit 是 tail
  head, tail := d.unpack(ptrs)
  if tail == head {
   // 頭尾相等,說明隊列爲空
   return nil, false
  }

  // head 索引指向下一個新對象的索引位置,所以使用前先減 1
  head--
  ptrs2 := d.pack(head, tail)
  if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
   // CAS 操作成功,移除頭部的對象,此時 head 指向 head - 1
   // vals 切片的長度是 2 的 N 次冪,因此 len(d.vals)-1 之後,低的 N 位全是 1
   // 和 head 進行與運算之後,可以獲取到對象的索引下標
   // 例如: 切片長度 = 32, len(d.vals)-1 = 31
   //      head = 5, 索引下標 = 5 & 31 = 5
   //      head = 25, 索引下標 = 25 & 31 = 25
   slot = &d.vals[head&uint32(len(d.vals)-1)]
   break
  }
 }

 val := *(*any)(unsafe.Pointer(slot))
 if val == dequeueNil(nil) {
  // 獲取到的對象是 nil
  val = nil
 }
 // 重置索引位置的元素
 *slot = eface{}
 // 返回獲取到的對象
 return val, true
}

Pool.getSlow 方法用於當前處理器 P 沒有對象時,嘗試從其他處理器 P 竊取對象。

func (p *Pool) getSlow(pid int) any {
 // 原子加載 localSize 字段
 size := runtime_LoadAcquintptr(&p.localSize)
 locals := p.local

 // 嘗試從其他處理器獲取對象
 for i := 0; i < int(size); i++ {
  // 注意這裏定位處理器 P 的索引計算方式
  // pid+i+1 是爲了忽略當前處理器 P
  l := indexLocal(locals, (pid+i+1)%int(size))
  // 從隊列尾部獲取,減少併發衝突
  if x, _ := l.shared.popTail(); x != nil {
   return x
  }
 }

 // 如果從其他處理器沒有獲取到對象,嘗試從 victim 緩存中獲取 (也就是上一輪對象池中的對象)
 // 這樣做可以儘可能地複用對象
 size = atomic.LoadUintptr(&p.victimSize)
 if uintptr(pid) >= size {
  // 當前處理器不存在於 victim 中,可能原因如下:
  // 1. victim 已經被標記爲空
  // 2. 當前處理器比 victim 的長度要大,屬於 "後來創建的"
  // 此時直接返回即可,否則會發生處理器 pid 索引越界錯誤
  return nil
 }

 // 下面開始嘗試從 victim 中獲取對象

 // 嘗試從尾部處理器的私有變量獲取對象
 locals = p.victim
 l := indexLocal(locals, pid)
 if x := l.private; x != nil {
  l.private = nil
  return x
 }

 // 嘗試從其他處理器獲取對象
 for i := 0; i < int(size); i++ {
  // 注意這裏定位處理器 P 的索引計算方式和剛纔的不同
  // 這裏不需要忽略任何處理器
  l := indexLocal(locals, (pid+i)%int(size))
  // 從隊列尾部獲取,減少併發衝突
  if x, _ := l.shared.popTail(); x != nil {
   return x
  }
 }

 // 將 victim 緩存標記爲空,後續請求直接返回,避免多餘的查詢
 atomic.StoreUintptr(&p.victimSize, 0)

 return nil
}

func (c *poolChain) popTail() (any, bool) {
 d := loadPoolChainElt(&c.tail)
 if d == nil {
  // 如果鏈表尾部節點爲 nil, 直接返回
  return nil, false
 }

 for {
  // 在隊列尾部節點出隊之前,提前加載 d.next 指針很重要 (刪除鏈表節點的邊界條件)
  // 因爲 d 節點的尾元素出隊之後,d 節點可能會變爲 nil, 這樣永遠無法找到 d.next 節點了
  d2 := loadPoolChainElt(&d.next)

  if val, ok := d.popTail(); ok {
   return val, ok
  }

  if d2 == nil {
   // 如果 next 節點都變成 nil 了,說明隊列已經空了
   return nil, false
  }

  // 代碼執行到這裏,說明尾部節點 (d) 爲 nil
  // 這時就可以將其刪除了,防止下次調用 popTail 時發生錯誤 (誤以爲隊列已空)
  if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
   // 刪除 d2 的前驅節點,也就是 d (因爲此時前驅節點已經沒有數據了)
   storePoolChainElt(&d2.prev, nil)
  }
  d = d2
 }
}

poolDequeue.popTail 方法從隊列尾部刪除一個對象並返回,如果隊列爲空,返回 false, 該方法必須由 多個生產者 操作。

func (d *poolDequeue) popTail() (any, bool) {
 var slot *eface
 for {
  ptrs := atomic.LoadUint64(&d.headTail)
  head, tail := d.unpack(ptrs)
  if tail == head {
   // 頭尾相等,說明隊列爲空
   return nil, false
  }

  ptrs2 := d.pack(head, tail+1)
  if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
   // CAS 操作成功,移除尾部的對象,此時 tail 指向 tail + 1
   slot = &d.vals[tail&uint32(len(d.vals)-1)]
   break
  }
 }

 val := *(*any)(unsafe.Pointer(slot))
 if val == dequeueNil(nil) {
  val = nil
 }

 // 通過重置元素爲 nil 的方式通知 pushHead 方法 (因爲這兩個方法存在併發操作同一位置元素的可能)
 // 當前位置的元素已經操作完成 (pushHead 方法操作當前元素時會檢測否和 popTail 方法衝突)
 // 先重置 val, 再重置 typ
 slot.val = nil
 atomic.StorePointer(&slot.typ, nil)

 return val, true
}

流程圖

注意當前 P 和 其他 P 的區別

輔助方法

pin

pin 方法綁定當前 goroutine 到處理器 P 並禁止搶佔,返回一個 poolLocal 對象指針和處理器 P 的 ID。

func (p *Pool) pin() (*poolLocal, int) {
 pid := runtime_procPin()
 // 原子加載 localSize 字段
 s := runtime_LoadAcquintptr(&p.localSize)
 l := p.local
 if uintptr(pid) < s {
  // 如果 pid 小於 local 數組的長度
  // 說明對應的 poolLocal 對象已經創建,直接返回即可
  return indexLocal(l, pid), pid
 }

 // 代碼執行到這裏,一般是因爲兩種原因:
 // 1. 緩存池還未創建
 // 2. 處理器 P 的數量被動態調整了
 return p.pinSlow()
}

pinSlow

pinSlow 方法創建一個新的 poolLocal 對象並返回。

func (p *Pool) pinSlow() (*poolLocal, int) {
 runtime_procUnpin()
 allPoolsMu.Lock()
 defer allPoolsMu.Unlock()
 pid := runtime_procPin()

 // 加鎖完成,就不需要原子性加載了
 s := p.localSize
 l := p.local
 if uintptr(pid) < s {
  // 雙重檢測
  // local 數組已經發生變化 (加鎖期間被其他線程修改)
  // 直接返回即可
  return indexLocal(l, pid), pid
 }
 if p.local == nil {
  allPools = append(allPools, p)
 }

 // 使用處理器 P 的數量作爲數組長度
 size := runtime.GOMAXPROCS(0)
 // 初始化新的 local 數組
 local := make([]poolLocal, size)
 // 原子更新 local 字段
 atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
 // 原子更新 localSize 字段
 runtime_StoreReluintptr(&p.localSize, uintptr(size))
 return &local[pid], pid
}

indexLocal

indexLocal 方法根據索引參數,返回 local 數組中對應的 poolLocal 對象。

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
 lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
 return (*poolLocal)(lp)
}

緩存池 GC 過程

sync.Pool 包文件中有一個 init 函數,內部註冊了 GC 執行方法。

func init() {
    runtime_registerPoolCleanup(poolCleanup)
}

poolCleanup 方法在緩存池的清理過程中,並不會直接釋放池對象,而是會將其放入 victim 中,等到下一輪清理時再釋放。這樣可以防止緩存池被直接釋放後,變爲冷啓動時面對突然暴漲的對象請求造成的性能抖動,通過將緩存池放入 victim 中,可以起到避免 GC 毛刺、平滑過渡的作用

func poolCleanup() {
 // 函數會在 GC 過程中的 STW 階段被調用

 // 清理 victim 緩存對象池
 for _, p := range oldPools {
  p.victim = nil
  p.victimSize = 0
 }

 // 將當前所有緩存池對象移動到 victim 緩存池對象
 for _, p := range allPools {
  p.victim = p.local
  p.victimSize = p.localSize
  p.local = nil
  p.localSize = 0
 }

 // 將全局緩存池移動到全局 victim 緩存池
 // 將全局 victim 重置爲 nil
 oldPools, allPools = allPools, nil
}

小結

通過學習 sync.Pool 的源代碼,我們可以深入理解和學習到的高性能編程設計理念和技巧:

Reference

鏈接

[1]

Go 高性能 - 無鎖編程: https://dbwu.tech/posts/golang_lockfree/

[2]

Go 高性能 - sync.Pool: https://golang.dbwu.tech/performance/sync_pool/

[3]

false sharing: https://en.wikipedia.org/wiki/False_sharing

[4]

victim cache: https://en.wikipedia.org/wiki/Victim_cache

[5]

多圖詳解 Go 的 sync.Pool 源碼: https://www.luozhiyun.com/archives/416

[6]

golang 的對象池 sync.pool 源碼解讀: https://zhuanlan.zhihu.com/p/99710992

[7]

深度分析 Golang sync.Pool 底層原理: https://www.cyhone.com/articles/think-in-sync-pool

[8]

Go 1.13 中 sync.Pool 是如何優化的?: https://colobu.com/2019/10/08/how-is-sync-Pool-improved-in-Go-1-13/

[9]

僞共享(false sharing),併發編程無聲的性能殺手: https://www.cnblogs.com/cyfonly/p/5800758.html

[10]

memory barrier: https://github.com/cch123/golang-notes/blob/master/memory_barrier.md

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/dLzWAqM9lCln83jhkvmtMw