sync-Map Code Review
概述
sync.Map 提供了併發安全的 map
操作,數據結構語義類似 map[any]any
,多個 goroutine
併發操作時無需加鎖。
官方的建議是大多數情況下,應該使用普通 map
類型,並完成對應的鎖和併發控制以保證安全性,這樣可以使類型擁有更好的安全性和可維護性。
sync.Map
類型是針對兩種特殊的場景進行優化的:
-
當指定的
key
只被寫入一次,但是被讀取多次,例如不斷增長的緩存類應用 -
當多個
goroutine
同時讀和寫,但是每個goroutine
覆蓋到的是不同的key
(類似分段鎖的概念)
在這兩種場景下,與使用 map + 鎖
相比,使用 sync.Map
可以大大降低鎖的競爭。
本文來探究一下 sync.Map
的內部實現,文件路徑爲 $GOROOT/src/sync/map.go
,筆者的 Go 版本爲 go1.19 linux/amd64
。
數據結構
Map 對象
// Map 零值是空的並且可以使用
// Map 一旦使用後,便不能再複製
// Loads, stores, and deletes 通過延遲操作來均攤成本,從而達到常數運行時間
// 在 Go 內存模型術語中,Map 確保寫操作在任何觀察其變動的讀操作之前同步,其中讀和寫的定義如下
// Load, LoadAndDelete 是讀操作
// Delete, LoadAndDelete, Store 是寫操作
// 當調用 LoadOrStore 方法返回值 loaded 爲 false 時,LoadOrStore 是寫操作,否則是讀操作
type Map struct {
// 當寫 read map 或者讀寫 dirty map 時,需要加互斥鎖
mu Mutex
// read 字段包含了一部分 map 內容,這部分內容併發訪問安全(無論是否持有互斥鎖)
// read 字段本身對 load 而言永遠是安全的,但執行 store 操作時,必須持有互斥鎖
// read 是隻讀的數據結構,訪問無需加鎖,sync.Map 優先訪問 read 字段
// read 中存儲的數據值可以在不持有互斥鎖的情況下併發更新
// 但更新一個之前被標記爲 expunged 的元素需要持有互斥鎖,將該元素拷貝到 dirty map 並標記 unexpunged (expunged 逆操作)
read atomic.Value // readOnly
// dirty map 包含了訪問時需要持有互斥鎖的元素
// 爲了確保 dirty map 中的元素能夠快速地移動到 read map
// 它也包含了那些 read map 中標記爲 (non-expunged) 的元素
// 新添加的元素會優先放入 dirty map
// 被標記爲 expunged 的元素不會存儲在 dirty map 中
// 被標記爲 expunged 的元素如果要存儲新的值,需要先執行 unexpunged (expunged 逆操作) 添加到 dirty map, 然後再更新值
// 如果 dirty map == nil, 下一個對 map 的寫入將通過淺拷貝一個空 map 來初始化它,忽略過期的元素
dirty map[any]*entry
// 一旦發生足夠多的 misses 次數,超過拷貝 dirty map 的成本,dirty map 會被合併進 read map(unamended 狀態下)
// 並且下一次的存儲操作會生成一個新的 dirty map
misses int
}
readOnly 對象
readOnly
對象表示 Map.read 字段中的只讀數據 (注意: 這裏的只讀指的是邏輯只讀,底層的數據還是可以修改的), 其中 m 字段用來表示具體的只讀數據,如果某個 key 存在於 dirty 字段中,卻不存在於 m 字段中,amended 字段等於 true。
type readOnly struct {
m map[any]*entry
amended bool
}
expunged 類型
expunged
類型表示一個任意類型數據的指針,主要作用是作爲刪除標誌,標記從 dirty map 中刪除的元素。
var expunged = unsafe.Pointer(new(any))
entry 對象
entry
對象表示任意 key
對應的數據,這裏同樣使用指針來表示。
type entry struct {
// p 指向 entry 對應的數據
// 如果 p == nil, 說明對應的 entry 已經被刪除
// 此時,m.dirty == nil 或 m.dirty[k] 指向該 entry
// 如果 p == expunged, 說明對應的 entry 已經被刪除
// 此時,m.dirty != nil 且 m.dirty 中不存在 entry
// 其他情況代表 entry 是合法的值並且存在於 m.read.m[key]
// 如果 m.dirty != nil, entry 同時也會存在於 m.dirty[key] 中
// 刪除 entry 時執行 CAS 操作替換爲 nil (並不進行實際刪除)
// 當 m.dirty 後續創建時 (dirty 提升後第一次新建 entry)
// 會對 entry 執行 CAS 操作,由 nil 替換爲 expunged, 且不設置 m.dirty[key] 的值
// 一個 entry 對應的值可以用 CAS 操作來更新,前提是 p != expunged
// 如果 p == expunged, entry 對應的值只能在首次賦值 m.dirty[key] = e 之後更新
// 這樣查找操作可以通過 dirty map 來找到這個 entry
p unsafe.Pointer // *interface{}
}
func newEntry(i any) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// 從 entry 中原子性加載實際的數據值
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}
獲取操作
Load 方法
Load
方法會優先讀取 read map
(無需加鎖),如果沒有找到對應的元素,會加鎖嘗試從 dirty map
中讀取。
// 獲取 map 中 key 對應的值,如果不存在返回 nil, 第二個返回值表示 key 值是否存在於 map 中
func (m *Map) Load(key any) (value any, ok bool) {
// 優先從 read map 中讀取 (無鎖操作)
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果 read 沒有對應的 key,並且 amended 字段標識 dirty map 存在 read map 中不存在的 key
// 則加鎖嘗試從 dirty.map 中獲取
if !ok && read.amended {
m.mu.Lock()
// double check 是避免在加鎖期間,dirty map 提升爲 read map
// 如果已經發生合併,可以避免報告一個不必要的 miss
// 因爲足夠多的 miss 數會將 dirty map 提升爲 read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 無論元素是否存在 dirty map 中,都記錄 1 次 miss
// 在 dirty map 被提升到 read map 之前,這個 key 對應的值會一直從 dirty map 中獲取
// 方法內部可能將 dirty map 提升到 read map
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// 如果 key 存在,加載對應的值
return e.load()
}
設置操作
Store 方法
Store
方法用於設置 key 和對應的值。
func (m *Map) Store(key, value any) {
// 如果 key 對應的值存在於 read map 中,嘗試直接修改
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
// double check 是避免在加鎖期間,dirty map 提升爲 read map
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 key 對應的值存在於 read map 中,但 p == expunged, 說明 dirty != nil 並且 key 對應的值不存在於 dirty map 中
// 先將 p 的狀態由 expunged 改爲 nil
// dirty map 新建 key
// 更新 entry.p = value (read map 和 dirty map 指向同一個entry)
m.dirty[key] = e
}
// 如果 key 對應的值存在於 read map 中,但 p != expunged, 直接更新 entry
// 此時 m.dirty == nil 或 m.dirty[key] == e
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 key 對應的值不存在於 read map 中,但存在於 dirty map 中,直接寫入更新 entry (read map 中依然不存在)
e.storeLocked(&value)
} else {
if !read.amended {
// 如果 read map 和 dirty map 都不存在該值
// 如果 dirty map == nil, 創建 dirty map, 並從 read map 中拷貝未刪除的元素
// 更新 amended 字段,標識 dirty map 中存在 read map 中不存在的 key
// 將 K => V 寫入 dirty map, read map 不變
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
// Tips: hot path 不使用 defer 釋放鎖
m.mu.Unlock()
}
tryStore 方法
tryStore
方法嘗試直接更新 entry, 如果 entry == expunged, 返回 false。
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
unexpungeLocked 方法
unexpungeLocked
方法用於取消 entry 的 expunged 標記。
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
storeLocked 方法
storeLocked
方法用於直接將值存入 entry。
func (e *entry) storeLocked(i *any) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
LoadOrStore 方法
LoadOrStore
方法用於獲取 key 對應的值,如果 key 對應的值存在,直接返回 (此時第二個返回值爲 true),否則就將 key 設置爲參數值然後返回 (此時第二個返回值爲 false)。
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// 如果命中就直接返回,避免鎖操作
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
// double check 是避免在加鎖期間,dirty map 提升爲 read map
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 key 對應的值存在於 read map 中,但 p == expunged, 說明 dirty != nil 並且 key 對應的值不存在於 dirty map 中
// 先將 p 的狀態由 expunged 改爲 nil
// dirty map 新建 key
// 更新 entry.p = value (read map 和 dirty map 指向同一個entry)
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// 如果 key 對應的值不存在於 read map 中,但存在於 dirty map 中,直接寫入更新 entry (read map 中依然不存在)
actual, loaded, _ = e.tryLoadOrStore(value)
// 記錄 1 次 miss
// 方法內部可能將 dirty map 提升到 read map
m.missLocked()
} else {
if !read.amended {
// 如果 read map 和 dirty map 都不存在該值
// 如果 dirty map == nil, 創建 dirty map, 並從 read map 中拷貝未刪除的元素
// 更新 amended 字段,標識 dirty map 中存在 read map 中不存在的 key
// 將 K => V 寫入 dirty map, read map 不變
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
tryLoadOrStore 方法
tryLoadOrStore
方法用於原子性地加載或存儲一個 entry 對應的值,前提是 entry 未被標記爲 expunged, 如果 entry 被標記爲 expunged, 函數不做任何修改,第二個返回值爲 false。
func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
}
}
刪除操作
Delete 方法
// 刪除 key 對應的值
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}
LoadAndDelete 方法
LoadAndDelete
方法是 Delete
方法的具體實現,和 Load
方法的機制一樣,優先從 read map
中刪除 (無需加鎖),如果沒有找到對應的元素, 會加鎖嘗試從 dirty map
中刪除,第二個返回值表示 key 對應的值是否存在。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// double check 是避免在加鎖期間,dirty map 提升爲 read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 如果 key 對應的值不存在於 read map 中,但存在於 dirty map 中,直接刪除
e, ok = m.dirty[key]
delete(m.dirty, key)
// 記錄 1 次 miss
// 在 dirty map 被提升到 read map 之前,這個 key 對應的值會一直從 dirty map 中獲取
m.missLocked()
}
m.mu.Unlock()
}
if ok {
// key 對應的值存在於 read map 中,直接刪除
return e.delete()
}
return nil, false
}
delete 方法
delete
方法負責 entry
的具體刪除。
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
// 如果 p == nil, 說明對應的 entry 已經被刪除
// 如果 p == expunged, 說明對應的 entry 已經被刪除
if p == nil || p == expunged {
return nil, false
}
// 這裏是數據真正被刪除的代碼
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}
遍歷操作
Range 方法
如果全部的數據都在 read map
中,無需加鎖,直接讀取即可,否則會先加鎖,將 dirty map
中的數據全部加載到 read map
, 然後重置 dirty map
和 misses
計數器,這樣可以避免遍歷過程中多次訪問 dirty map
時導致的加鎖和性能影響。
// Range 方法按照每個 key 和 value 在 map 中出現的順序依次調用參數函數 f
// 如果函數 f 返回 false, range 停止遍歷
// Range 方法是無序遍歷,但是保證每個 key 只會被訪問一次
// 如果某個 key 對應的 value 被併發地更新或者刪除了,Range 可能返回修改前或修改後的值
// Range 方法不阻塞接收者的其他方法,甚至回調函數 f 本身也可以調用 Map 的任何方法
// 不論 Map 中有多少元素,Range 時間複雜度都是 O(N)
// 即使函數 f 在一定的調用次數之後返回 false 也一樣
func (m *Map) Range(f func(key, value any) bool) {
// 在調用 Range 時,能夠遍歷 map 裏面所有的 key
// 如果 read.amended == false
// 說明 dirty map 中沒有元素,直接遍歷 read map 就可以
read, _ := m.read.Load().(readOnly)
if read.amended {
// m.dirty 包含 read.m 中不存在的 key, 不過影響不大,畢竟 Range 時間複雜度爲 O(N)
// 假設調用方不會中斷遍歷過程,對於 Range 的調用必然會分階段完整地拷貝整個 map
// 這時候可以將 dirty map 提升到 read map 就可以提升性能
m.mu.Lock()
// double check 是避免在加鎖期間,dirty map 提升爲 read map
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
// Tips: hot path 不使用 defer 釋放鎖
m.mu.Unlock()
}
// 此時 dirty map 和 read map 已經合併
// 普通 map 遍歷
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
miss 計數
missLocked 方法
missLocked
方法用於增加 Map
操作過程中的 misses 計數,一旦發生足夠多的 misses 次數,超過拷貝 dirty map 的成本,直接將 dirty map 賦值給 read map。
func (m *Map) missLocked() {
m.misses++
// 成本計算: miss 次數小於 dirty map 長度
if m.misses < len(m.dirty) {
return
}
// 提升過程很簡單,直接將 dirty map 賦值給 read map
// 提升完成之後,amended == false, m.dirty == nil
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
數據拷貝
dirtyLocked 方法
dirtyLocked
方法用於當 dirty map 等於 nil 時,將 read map 元素拷貝到 dirty map。
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
// 1. 將所有爲 nil 的元素標記爲 expunged
// 2. 只拷貝沒有標記爲 expunged 的元素
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
tryExpungeLocked 方法
tryExpungeLocked
方法用於將等於 nil 的 entry 標記爲 expunged。
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
相關問題
expunged 的使用場景
調用 Store
, LoadOrStore
方法存入新的元素時,如果 read map
和 dirty map
中都不存在該元素,會調用 dirtyLocked
方法將 read map
中所有未標記爲 expunged
的元素複製到 dirty map
中。
expunged 標記可以去除嗎?
不行,expunged
作爲一個 過渡狀態標識
是必要的。
-
如果沒有
expunged
, 刪除元素時就沒有辦法實現軟刪除
,這就導致必須對read map
加鎖,性能直接退化到了普通 map + 互斥鎖
的方案 -
實現不了
軟刪除
, 只能將元素設置爲nil
, 這就導致即使dirty map
提升爲read map
, 但是裏面被刪除的垃圾數據依然會保留 (這裏的保留指的是key
相關數據,元素刪除時就被置爲nil
了),最終導致內存佔用過多 -
調用
Store
方法存入新的元素時,沒有辦法區分元素是不存在還是已經被刪除 (因爲兩者都是nil
),造成read map
和dirty map
數據不同步 (read map
裏面有dirty map
不存在的數據), 那麼在dirty map
提升爲read map
時,就會導致數據丟失
爲什麼需要雙重鎖檢測?
在 Map
操作相關方法中,可以看到下面這行代碼往往會出現兩次:
read, _ := m.read.Load().(readOnly)
這是爲了避免在加鎖期間,dirty map
提升爲 read map
,所以獲取鎖操作後需要再次檢查 key
是否存在於 read map
中。
爲什麼沒有實現 Len
方法?
-
實現
Len
方法需要統計readOnly
和dirty
兩個字段的數據,這樣就需要引入鎖機制,導致性能下降,還會額外增加代碼實現複雜度 -
針對
sync.Map
的併發操作會導致其數據變化很快,Len
方法難以保證時效性
爲什麼適用的場景是讀多寫少?
參考 Load
方法和 Store
方法的註釋。
小結
從應用層面來說,針對讀多寫少的場景,sync.Map
提供了併發安全的 map
操作,包對外提供的方法非常簡潔,同時可以避免 普通 map + 互斥鎖
實現方案中, 獲取鎖和釋放鎖等繁瑣和容易出錯的地方,有兩個小的細節需要注意:
-
包對外提供的方法參數類型都是
any
, 在使用具體的數據類型時,需要轉換 -
包沒有提供
Len
方法,如果需要獲取元素個數,需要調用Range
方法計算
從內部底層實現來說,sync.Map
通過 entry
類型將 map
的變化和元素的變化進行隔離的同時, 還保證了底層數據的複用 (read map
和 dirty map
指向同一個 entry
),read
, dirty
2 個字段加原子操作的緊密配合,非常簡潔優雅地實現了 sync.Map
根據讀寫比例動態變化和轉換內部字段數據結構, 尤其是 read
和 dirty
兩個字段保持部分內存結構佈局一致並且通過指針直接轉換,這些都是值得我們學習的編碼技巧。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Q1E0Bdt0GH6b4OFsgk1Fcg