sync-Map Code Review

概述

sync.Map 提供了併發安全的 map 操作,數據結構語義類似 map[any]any,多個 goroutine 併發操作時無需加鎖。

官方的建議是大多數情況下,應該使用普通 map 類型,並完成對應的鎖和併發控制以保證安全性,這樣可以使類型擁有更好的安全性和可維護性。

sync.Map 類型是針對兩種特殊的場景進行優化的

  1. 當指定的 key 只被寫入一次,但是被讀取多次,例如不斷增長的緩存類應用

  2. 當多個 goroutine 同時讀和寫,但是每個 goroutine 覆蓋到的是不同的 key (類似分段鎖的概念)

在這兩種場景下,與使用 map + 鎖 相比,使用 sync.Map 可以大大降低鎖的競爭。

本文來探究一下 sync.Map 的內部實現,文件路徑爲 $GOROOT/src/sync/map.go,筆者的 Go 版本爲 go1.19 linux/amd64

數據結構

Map 對象

// Map 零值是空的並且可以使用
// Map 一旦使用後,便不能再複製
// Loads, stores, and deletes 通過延遲操作來均攤成本,從而達到常數運行時間
// 在 Go 內存模型術語中,Map 確保寫操作在任何觀察其變動的讀操作之前同步,其中讀和寫的定義如下
//     Load, LoadAndDelete 是讀操作
//     Delete, LoadAndDelete, Store 是寫操作
//     當調用 LoadOrStore 方法返回值 loaded 爲 false 時,LoadOrStore 是寫操作,否則是讀操作
type Map struct {
 // 當寫 read map 或者讀寫 dirty map 時,需要加互斥鎖
 mu Mutex

 // read 字段包含了一部分 map 內容,這部分內容併發訪問安全(無論是否持有互斥鎖)
 // read 字段本身對 load 而言永遠是安全的,但執行 store 操作時,必須持有互斥鎖
 // read 是隻讀的數據結構,訪問無需加鎖,sync.Map 優先訪問 read 字段
 // read 中存儲的數據值可以在不持有互斥鎖的情況下併發更新
 //     但更新一個之前被標記爲 expunged 的元素需要持有互斥鎖,將該元素拷貝到 dirty map 並標記 unexpunged (expunged 逆操作)
 read atomic.Value // readOnly

 // dirty map 包含了訪問時需要持有互斥鎖的元素
 // 爲了確保 dirty map 中的元素能夠快速地移動到 read map
 // 它也包含了那些 read map 中標記爲 (non-expunged) 的元素
 // 新添加的元素會優先放入 dirty map

 // 被標記爲 expunged 的元素不會存儲在 dirty map 中
 // 被標記爲 expunged 的元素如果要存儲新的值,需要先執行 unexpunged (expunged 逆操作) 添加到 dirty map, 然後再更新值

 // 如果 dirty map == nil, 下一個對 map 的寫入將通過淺拷貝一個空 map 來初始化它,忽略過期的元素
 dirty map[any]*entry

 // 一旦發生足夠多的 misses 次數,超過拷貝 dirty map 的成本,dirty map 會被合併進 read map(unamended 狀態下)
 // 並且下一次的存儲操作會生成一個新的 dirty map
 misses int
}

readOnly 對象

readOnly 對象表示 Map.read 字段中的只讀數據 (注意: 這裏的只讀指的是邏輯只讀,底層的數據還是可以修改的), 其中 m 字段用來表示具體的只讀數據,如果某個 key 存在於 dirty 字段中,卻不存在於 m 字段中,amended 字段等於 true

type readOnly struct {
 m       map[any]*entry
 amended bool
}

expunged 類型

expunged 類型表示一個任意類型數據的指針,主要作用是作爲刪除標誌,標記從 dirty map 中刪除的元素

var expunged = unsafe.Pointer(new(any))

entry 對象

entry 對象表示任意 key 對應的數據,這裏同樣使用指針來表示。

type entry struct {
 // p 指向 entry 對應的數據

 // 如果 p == nil, 說明對應的 entry 已經被刪除
 //     此時,m.dirty == nil 或 m.dirty[k] 指向該 entry
 // 如果 p == expunged, 說明對應的 entry 已經被刪除
 //     此時,m.dirty != nil 且 m.dirty 中不存在 entry
 // 其他情況代表 entry 是合法的值並且存在於 m.read.m[key]
 //     如果 m.dirty != nil, entry 同時也會存在於 m.dirty[key] 中

 // 刪除 entry 時執行 CAS 操作替換爲 nil (並不進行實際刪除)
 // 當 m.dirty 後續創建時 (dirty 提升後第一次新建 entry)
 // 會對 entry 執行 CAS 操作,由 nil 替換爲 expunged, 且不設置 m.dirty[key] 的值

 // 一個 entry 對應的值可以用 CAS 操作來更新,前提是 p != expunged
 // 如果 p == expunged, entry 對應的值只能在首次賦值 m.dirty[key] = e 之後更新
 // 這樣查找操作可以通過 dirty map 來找到這個 entry
 p unsafe.Pointer // *interface{}
}

func newEntry(i any) *entry {
    return &entry{p: unsafe.Pointer(&i)}
}

// 從 entry 中原子性加載實際的數據值
func (e *entry) load() (value any, ok bool) {
 p := atomic.LoadPointer(&e.p)
 if p == nil || p == expunged {
  return nil, false
 }
 return *(*any)(p)true
}

獲取操作

Load 方法

Load 方法會優先讀取 read map (無需加鎖),如果沒有找到對應的元素,會加鎖嘗試從 dirty map 中讀取。

// 獲取 map 中 key 對應的值,如果不存在返回 nil, 第二個返回值表示 key 值是否存在於 map 中
func (m *Map) Load(key any) (value any, ok bool) {
 // 優先從 read map 中讀取 (無鎖操作)
 read, _ := m.read.Load().(readOnly)
 e, ok := read.m[key]

 // 如果 read 沒有對應的 key,並且 amended 字段標識 dirty map 存在 read map 中不存在的 key
 // 則加鎖嘗試從 dirty.map 中獲取
 if !ok && read.amended {
  m.mu.Lock()
  // double check 是避免在加鎖期間,dirty map 提升爲 read map
  // 如果已經發生合併,可以避免報告一個不必要的 miss
  // 因爲足夠多的 miss 數會將 dirty map 提升爲 read map
  read, _ = m.read.Load().(readOnly)
  e, ok = read.m[key]

  if !ok && read.amended {
   e, ok = m.dirty[key]
   // 無論元素是否存在 dirty map 中,都記錄 1 次 miss
   // 在 dirty map 被提升到 read map 之前,這個 key 對應的值會一直從 dirty map 中獲取

   // 方法內部可能將 dirty map 提升到 read map
   m.missLocked()
  }
  m.mu.Unlock()
 }
 if !ok {
  return nil, false
 }

 // 如果 key 存在,加載對應的值
 return e.load()
}

設置操作

Store 方法

Store 方法用於設置 key 和對應的值。

func (m *Map) Store(key, value any) {
 // 如果 key 對應的值存在於 read map 中,嘗試直接修改
 read, _ := m.read.Load().(readOnly)
 if e, ok := read.m[key]; ok && e.tryStore(&value) {
  return
 }

 m.mu.Lock()
 // double check 是避免在加鎖期間,dirty map 提升爲 read map
 read, _ = m.read.Load().(readOnly)
 if e, ok := read.m[key]; ok {
  if e.unexpungeLocked() {
   // 如果 key 對應的值存在於 read map 中,但 p == expunged, 說明 dirty != nil 並且 key 對應的值不存在於 dirty map 中
   //    先將 p 的狀態由 expunged 改爲 nil
   //    dirty map 新建 key
   //    更新 entry.p = value (read map 和 dirty map 指向同一個entry)
   m.dirty[key] = e
  }
  // 如果 key 對應的值存在於 read map 中,但 p != expunged, 直接更新 entry
  // 此時 m.dirty == nil 或 m.dirty[key] == e
  e.storeLocked(&value)
 } else if e, ok := m.dirty[key]; ok {
  // 如果 key 對應的值不存在於 read map 中,但存在於 dirty map 中,直接寫入更新 entry (read map 中依然不存在)
  e.storeLocked(&value)
 } else {
  if !read.amended {
   // 如果 read map 和 dirty map 都不存在該值
   //     如果 dirty map == nil, 創建 dirty map, 並從 read map 中拷貝未刪除的元素
   //     更新 amended 字段,標識 dirty map 中存在 read map 中不存在的 key
   //     將 K => V 寫入 dirty map, read map 不變
   m.dirtyLocked()
   m.read.Store(readOnly{m: read.m, amended: true})
  }
  m.dirty[key] = newEntry(value)
 }
    // Tips: hot path 不使用 defer 釋放鎖
 m.mu.Unlock()
}

tryStore 方法

tryStore 方法嘗試直接更新 entry, 如果 entry == expunged, 返回 false。

func (e *entry) tryStore(i *any) bool {
 for {
  p := atomic.LoadPointer(&e.p)
  if p == expunged {
   return false
  }
  if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
   return true
  }
 }
}

unexpungeLocked 方法

unexpungeLocked 方法用於取消 entry 的 expunged 標記。

func (e *entry) unexpungeLocked() (wasExpunged bool) {
 return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

storeLocked 方法

storeLocked 方法用於直接將值存入 entry。

func (e *entry) storeLocked(i *any) {
 atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

LoadOrStore 方法

LoadOrStore 方法用於獲取 key 對應的值,如果 key 對應的值存在,直接返回 (此時第二個返回值爲 true),否則就將 key 設置爲參數值然後返回 (此時第二個返回值爲 false)。

func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
 // 如果命中就直接返回,避免鎖操作
 read, _ := m.read.Load().(readOnly)
 if e, ok := read.m[key]; ok {
  actual, loaded, ok := e.tryLoadOrStore(value)
  if ok {
   return actual, loaded
  }
 }

 m.mu.Lock()
 // double check 是避免在加鎖期間,dirty map 提升爲 read map
 read, _ = m.read.Load().(readOnly)
 if e, ok := read.m[key]; ok {
  if e.unexpungeLocked() {
  // 如果 key 對應的值存在於 read map 中,但 p == expunged,  說明 dirty != nil 並且 key 對應的值不存在於 dirty map 中
  //    先將 p 的狀態由 expunged 改爲 nil
  //    dirty map 新建 key
  //    更新 entry.p = value (read map 和 dirty map 指向同一個entry)
   m.dirty[key] = e
  }
  actual, loaded, _ = e.tryLoadOrStore(value)
 } else if e, ok := m.dirty[key]; ok {
  // 如果 key 對應的值不存在於 read map 中,但存在於 dirty map 中,直接寫入更新 entry (read map 中依然不存在)
  actual, loaded, _ = e.tryLoadOrStore(value)
  // 記錄 1 次 miss
  // 方法內部可能將 dirty map 提升到 read map
  m.missLocked()
 } else {
  if !read.amended {
   // 如果 read map 和 dirty map 都不存在該值
   //     如果 dirty map == nil, 創建 dirty map, 並從 read map 中拷貝未刪除的元素
   //     更新 amended 字段,標識 dirty map 中存在 read map 中不存在的 key
   //     將 K => V 寫入 dirty map, read map 不變
   m.dirtyLocked()
   m.read.Store(readOnly{m: read.m, amended: true})
  }
  m.dirty[key] = newEntry(value)
  actual, loaded = value, false
 }
 m.mu.Unlock()

 return actual, loaded
}

tryLoadOrStore 方法

tryLoadOrStore 方法用於原子性地加載或存儲一個 entry 對應的值,前提是 entry 未被標記爲 expunged, 如果 entry 被標記爲 expunged, 函數不做任何修改,第二個返回值爲 false。

func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
 p := atomic.LoadPointer(&e.p)
 if p == expunged {
  return nil, false, false
 }
 if p != nil {
  return *(*any)(p), true, true
 }

 ic := i
 for {
  if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
   return i, false, true
  }
  p = atomic.LoadPointer(&e.p)
  if p == expunged {
   return nil, false, false
  }
  if p != nil {
   return *(*any)(p), true, true
  }
 }
}

刪除操作

Delete 方法

// 刪除 key 對應的值
func (m *Map) Delete(key any) {
 m.LoadAndDelete(key)
}

LoadAndDelete 方法

LoadAndDelete 方法是 Delete 方法的具體實現,和 Load 方法的機制一樣,優先從 read map 中刪除 (無需加鎖),如果沒有找到對應的元素, 會加鎖嘗試從 dirty map 中刪除,第二個返回值表示 key 對應的值是否存在。

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
 read, _ := m.read.Load().(readOnly)
 e, ok := read.m[key]

 if !ok && read.amended {
  m.mu.Lock()
  // double check 是避免在加鎖期間,dirty map 提升爲 read map
  read, _ = m.read.Load().(readOnly)
  e, ok = read.m[key]
  if !ok && read.amended {
  // 如果 key 對應的值不存在於 read map 中,但存在於 dirty map 中,直接刪除
   e, ok = m.dirty[key]
   delete(m.dirty, key)

   // 記錄 1 次 miss
   // 在 dirty map 被提升到 read map 之前,這個 key 對應的值會一直從 dirty map 中獲取
   m.missLocked()
  }
  m.mu.Unlock()
 }
 if ok {
  // key 對應的值存在於 read map 中,直接刪除
  return e.delete()
 }
 return nil, false
}

delete 方法

delete 方法負責 entry 的具體刪除。

func (e *entry) delete() (value any, ok bool) {
 for {
  p := atomic.LoadPointer(&e.p)
  // 如果 p == nil, 說明對應的 entry 已經被刪除
  // 如果 p == expunged, 說明對應的 entry 已經被刪除
  if p == nil || p == expunged {
   return nil, false
  }
  // 這裏是數據真正被刪除的代碼
  if atomic.CompareAndSwapPointer(&e.p, p, nil) {
   return *(*any)(p)true
  }
 }
}

遍歷操作

Range 方法

如果全部的數據都在 read map 中,無需加鎖,直接讀取即可,否則會先加鎖,將 dirty map 中的數據全部加載到 read map, 然後重置 dirty mapmisses 計數器,這樣可以避免遍歷過程中多次訪問 dirty map 時導致的加鎖和性能影響。

// Range 方法按照每個 key 和 value 在 map 中出現的順序依次調用參數函數 f
// 如果函數 f 返回 false, range 停止遍歷

// Range 方法是無序遍歷,但是保證每個 key 只會被訪問一次
// 如果某個 key 對應的 value 被併發地更新或者刪除了,Range 可能返回修改前或修改後的值
// Range 方法不阻塞接收者的其他方法,甚至回調函數 f 本身也可以調用 Map 的任何方法

// 不論 Map 中有多少元素,Range 時間複雜度都是 O(N)
// 即使函數 f 在一定的調用次數之後返回 false 也一樣
func (m *Map) Range(f func(key, value any) bool) {
 // 在調用 Range 時,能夠遍歷 map 裏面所有的 key
 // 如果 read.amended == false
 //  說明 dirty map 中沒有元素,直接遍歷 read map 就可以
 read, _ := m.read.Load().(readOnly)

 if read.amended {
  // m.dirty 包含 read.m 中不存在的 key, 不過影響不大,畢竟 Range 時間複雜度爲 O(N)
  // 假設調用方不會中斷遍歷過程,對於 Range 的調用必然會分階段完整地拷貝整個 map
  // 這時候可以將 dirty map 提升到 read map 就可以提升性能
  m.mu.Lock()
  // double check 是避免在加鎖期間,dirty map 提升爲 read map
  read, _ = m.read.Load().(readOnly)
  if read.amended {
   read = readOnly{m: m.dirty}
   m.read.Store(read)
   m.dirty = nil
   m.misses = 0
  }
  // Tips: hot path 不使用 defer 釋放鎖
  m.mu.Unlock()
 }

 // 此時 dirty map 和 read map 已經合併
 // 普通 map 遍歷
 for k, e := range read.m {
  v, ok := e.load()
  if !ok {
   continue
  }
  if !f(k, v) {
   break
  }
 }
}

miss 計數

missLocked 方法

missLocked 方法用於增加 Map 操作過程中的 misses 計數,一旦發生足夠多的 misses 次數,超過拷貝 dirty map 的成本,直接將 dirty map 賦值給 read map。

func (m *Map) missLocked() {
 m.misses++
 // 成本計算: miss 次數小於 dirty map 長度
 if m.misses < len(m.dirty) {
  return
 }
 // 提升過程很簡單,直接將 dirty map 賦值給 read map
 // 提升完成之後,amended == false, m.dirty == nil
 m.read.Store(readOnly{m: m.dirty})
 m.dirty = nil
 m.misses = 0
}

數據拷貝

dirtyLocked 方法

dirtyLocked 方法用於當 dirty map 等於 nil 時,將 read map 元素拷貝到 dirty map。

func (m *Map) dirtyLocked() {
 if m.dirty != nil {
  return
 }

 read, _ := m.read.Load().(readOnly)
 m.dirty = make(map[any]*entry, len(read.m))
 for k, e := range read.m {
  // 1. 將所有爲 nil 的元素標記爲 expunged
  // 2. 只拷貝沒有標記爲 expunged 的元素
  if !e.tryExpungeLocked() {
   m.dirty[k] = e
  }
 }
}

tryExpungeLocked 方法

tryExpungeLocked 方法用於將等於 nil 的 entry 標記爲 expunged。

func (e *entry) tryExpungeLocked() (isExpunged bool) {
 p := atomic.LoadPointer(&e.p)
 for p == nil {
  if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
   return true
  }
  p = atomic.LoadPointer(&e.p)
 }
 return p == expunged
}

相關問題

expunged 的使用場景

調用 Store, LoadOrStore 方法存入新的元素時,如果 read mapdirty map 中都不存在該元素,會調用 dirtyLocked 方法將 read map 中所有未標記爲 expunged 的元素複製到 dirty map 中。

expunged 標記可以去除嗎?

不行,expunged 作爲一個 過渡狀態標識 是必要的。

  1. 如果沒有 expunged, 刪除元素時就沒有辦法實現 軟刪除,這就導致必須對 read map 加鎖,性能直接退化到了 普通 map + 互斥鎖 的方案

  2. 實現不了 軟刪除, 只能將元素設置爲 nil, 這就導致即使 dirty map 提升爲 read map, 但是裏面被刪除的垃圾數據依然會保留 (這裏的保留指的是 key 相關數據,元素刪除時就被置爲 nil 了),最終導致內存佔用過多

  3. 調用 Store 方法存入新的元素時,沒有辦法區分元素是不存在還是已經被刪除 (因爲兩者都是 nil),造成 read mapdirty map 數據不同步 (read map 裏面有 dirty map 不存在的數據), 那麼在 dirty map 提升爲 read map 時,就會導致數據丟失

爲什麼需要雙重鎖檢測?

Map 操作相關方法中,可以看到下面這行代碼往往會出現兩次:

read, _ := m.read.Load().(readOnly)

這是爲了避免在加鎖期間,dirty map 提升爲 read map,所以獲取鎖操作後需要再次檢查 key 是否存在於 read map 中。

爲什麼沒有實現 Len 方法?

爲什麼適用的場景是讀多寫少?

參考 Load 方法和 Store 方法的註釋。

小結

從應用層面來說,針對讀多寫少的場景,sync.Map 提供了併發安全的 map 操作,包對外提供的方法非常簡潔,同時可以避免 普通 map + 互斥鎖 實現方案中, 獲取鎖和釋放鎖等繁瑣和容易出錯的地方,有兩個小的細節需要注意:

  1. 包對外提供的方法參數類型都是 any, 在使用具體的數據類型時,需要轉換

  2. 包沒有提供 Len 方法,如果需要獲取元素個數,需要調用 Range 方法計算

從內部底層實現來說,sync.Map 通過 entry 類型將 map 的變化和元素的變化進行隔離的同時, 還保證了底層數據的複用 (read mapdirty map 指向同一個 entry),read, dirty 2 個字段加原子操作的緊密配合,非常簡潔優雅地實現了 sync.Map 根據讀寫比例動態變化和轉換內部字段數據結構, 尤其是 readdirty 兩個字段保持部分內存結構佈局一致並且通過指針直接轉換,這些都是值得我們學習的編碼技巧。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Q1E0Bdt0GH6b4OFsgk1Fcg