Go 中線程安全 map 方案選型
概述
Go 語言標準庫中的 map 數據類型並不是線程安全的,多個 goroutine
可以併發讀取同一個 map, 但是不能併發寫入同一個 map, 否則會引發 panic。
爲了解決這個問題,實際開發中通常會使用下面的三種方案中的一個或多個:
-
通過 map 數據類型 + 鎖 (互斥鎖, 讀寫鎖)
-
標準庫內置的
sync.Map
對象 (支持併發讀寫) -
分段鎖
作爲補充,本文會順帶對比一下自旋鎖和標準庫中的互斥鎖的性能差異,對於 map 數據類型及其操作原語來說,兩者實現的功能保證是一致的, 而且自旋鎖更多的應用場景在無鎖編程,所以文章末尾的基準測試不包含自旋鎖 (當然,感興趣的讀者可以在本文基礎上進行修改,自行對比測試結果)。
💡 本文代碼較多,對測試過程不感興趣的讀者可以直接跳轉到文章末尾看結論。
讀寫鎖和互斥鎖
標準庫中的讀寫鎖和互斥鎖的性能差異及使用場景,在之前的 這篇文章中 [1] 已經有基礎的說明,本文不再贅述。
自旋鎖
基礎概念在 死鎖、活鎖、飢餓、自旋鎖 一文中 [2] 已經介紹過,這裏不再贅述,其中自旋鎖操作獲取鎖的核心代碼如下:
// 獲取自旋鎖
func (sl *spinLock) lock() {
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// 獲取到自旋鎖
}
}
上面的代碼直觀上很符合自旋鎖的語義,只要沒有獲取到鎖,就一直空轉 CPU 嘗試獲取鎖,但是這會帶來一個問題: CPU 空轉帶來了很大的資源浪費, 是否可以降低甚至避免這種資源浪費嗎?
一個顯而易見的方法是在每兩次獲取鎖的操作之間休眠一下,但是這樣做會帶來兩個新的問題:
-
延遲增加,本來也許下次獲取鎖就可以成功,但是現在必須等休眠結束才能繼續獲取鎖
-
引起上下文切換,因爲當前 goroutine 休眠,根據 GMP 調取器的管理規則,處理器 P 會切換到其他可以運行的 goroutine, 如果當前 P 的 goroutine 隊列已經是空的, 那麼會給當前 M 關聯一個新的處理器,不管是哪種情況發生,都會引起上下文切換
優化版
看起來似乎沒有完美的解決方案,筆者前兩兩天在閱讀 ants[3] 的源代碼時,看到作者是這樣處理自旋鎖的:
const maxBackoff = 16
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1
}
}
}
作者借鑑了 TCP 流量控制中的指數退避理念,每兩次獲取鎖的間隔時間呈指數級別增長,並且在間隔時間內執行 N 次 GMP 調取,當然這是根據該組件的場景特性決定的 (goroutine pool), 在實際項目中實現和使用自旋鎖時,也可以根據具體的業務場景來自定義間隔時間內的操作,比如可以執行一個 CPU 密集型的任務,最終的目的只有一個: 儘可能榨乾 CPU 資源。
基準測試
首先來對標準庫中的互斥鎖、普通自旋鎖、優化版自旋鎖做一個簡單的基準測試。
// 普通自旋鎖實現 --------------------------------------------
type originSpinLock uint32
func (sl *originSpinLock) Lock() {
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
runtime.Gosched()
}
}
func (sl *originSpinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
func NewOriginSpinLock() sync.Locker {
return new(originSpinLock)
}
// 優化自旋鎖實現 --------------------------------------------
type spinLock uint32
const maxBackoff = 16
func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1
}
}
}
func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}
func NewSpinLock() sync.Locker {
return new(spinLock)
}
// 標準庫的互斥鎖
func BenchmarkMutex(b *testing.B) {
m := sync.Mutex{}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m.Lock()
m.Unlock()
}
})
}
// 普通自旋鎖
func BenchmarkSpinLock(b *testing.B) {
spin := NewOriginSpinLock()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
spin.Lock()
spin.Unlock()
}
})
}
// 優化版自旋鎖
func BenchmarkBackOffSpinLock(b *testing.B) {
spin := NewSpinLock()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
spin.Lock()
spin.Unlock()
}
})
}
從測試結果可以看到,優化後的自旋鎖相比普通自旋鎖和互斥鎖,性能有了很大的提高。
// goos: linux
// goarch: amd64
// cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
// BenchmarkMutex
// BenchmarkMutex-8 21886387 55.83 ns/op
// BenchmarkSpinLock
// BenchmarkSpinLock-8 46848830 25.81 ns/op
// BenchmarkBackOffSpinLock
// BenchmarkBackOffSpinLock-8 55894545 21.16 ns/op
分段鎖
分段鎖 (Segmented Locking) 是一種併發控制的技術,它將共享資源劃分成多個不重疊的片段,並對每個片段進行獨立的加鎖,通過這種方法可以減小鎖的粒度,提高系統的併發性能。
使用分段鎖時,每個線程只需要獲取 key 所在的範圍片段的鎖,而不必像互斥鎖那樣鎖住並獨佔整個共享資源,有效避免了多個線程因爲競爭全局鎖而導致的等待和延遲,提高系統的併發性能。
雖然分段鎖可以提高系統的併發性能,但同時也會增加鎖衝突的概率,並且需要付出額外的開銷來維護鎖的狀態 (互斥鎖只需要一把全局鎖即可,分段鎖每個區間範圍都需要一把鎖)。
讀寫鎖和分段鎖差異
代碼實現
筆者在項目中用到的分段鎖組件是開源的 concurrent-map[4], 下面就以該組件的源代碼爲基礎,來分析如何實現一個分段鎖,本文選用實現了泛型的 v2 版本。
Map 對象
ConcurrentMap
對象表示實現了分段鎖的 Map 對象,內部有兩個字段:
-
表示區域元素對象的 shares 字段
-
表示哈希函數的 sharding 字段
type ConcurrentMap[K comparable, V any] struct {
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
}
GetShard
方法用於計算給定的參數 key 對應的區間元素集合對象並返回。
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
// 優化版: m % n = m & ( n - 1 )
return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}
區間元素集合對象
ConcurrentMapShared
對象表示 Map 中某個區間元素集合對象,內部有兩個字段:
-
用於存儲具體元素的 map, 數據結構就是標準庫中的 map 類型
-
內嵌一個讀寫鎖,用於管理對 map 結構的讀寫併發控制
type ConcurrentMapShared[K comparable, V any] struct {
items map[K]V
sync.RWMutex
}
操作原語
下面來看一下常用的幾個操作原語的代碼實現。
1. SET
方法的內部執行分爲 4 步:
-
通過 key 獲取區間元素集合對象
-
獲取寫鎖
-
寫入 key 對應的數據
-
釋放寫鎖
func (m ConcurrentMap[K, V]) Set(key K, value V) {
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
2. GET
方法的內部執行分爲 4 步:
-
通過 key 獲取區間元素集合對象
-
獲取讀鎖
-
寫入 key 對應的數據
-
釋放讀鎖
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
shard := m.GetShard(key)
shard.RLock()
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
3. Has
出了返回值之外,內部實現和 GET
方法實現一致,這裏不在贅述。
func (m ConcurrentMap[K, V]) Has(key K) bool {
shard := m.GetShard(key)
shard.RLock()
_, ok := shard.items[key]
shard.RUnlock()
return ok
}
4. Remove
方法的內部執行分爲 4 步:
-
通過 key 獲取區間元素集合對象
-
獲取寫鎖
-
寫入 key 對應的數據
-
釋放寫鎖
func (m ConcurrentMap[K, V]) Remove(key K) {
shard := m.GetShard(key)
shard.Lock()
delete(shard.items, key)
shard.Unlock()
}
哈希算法
FNV32 是一種快速哈希函數,採用 32 位哈希值,算法實現非常簡單並且具有很高的性能和較低的哈希碰撞率。
值得注意的是,concurrent-map
並沒有使用標準庫的 "hash/fnv"
方法作爲求內部哈希函數實現,而是在組件內部重新實現了一個 fnv32
函數, 但是算法用到的算子常數 FNV_PRIME
和 FNV_OFFSET_BASIS
, concurrent-map
和標準庫是保持一致的,感興趣的讀者可以對比一下實現差異。
func strfnv32[K fmt.Stringer](key K "K fmt.Stringer") uint32 {
return fnv32(key.String())
}
func fnv32(key string) uint32 {
...
}
除此之外,也可以通過 NewWithCustomShardingFunction
函數在創建 Map 時來指定哈希函數:
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K "K comparable, V any") uint32) ConcurrentMap[K, V] {
return create[K, V](sharding "K, V")
}
sync.Map
標準庫中的 sync.Map
的底層實現和應用場景在之前的 這篇文章中 [5] 已經有基礎的說明,本文不再贅述。
基準測試
最後,我們對文章開頭提到的三種方案進行基準測試,根據測試結果來總結不同方案的各自適用場景。
下面的測試代碼分別對 讀多寫少、讀少寫多、讀寫均等這三種常見的負載場景,進行了性能基準測試:
package maps
import (
"strconv"
"sync"
"testing"
cmap "github.com/orcaman/concurrent-map/v2"
)
// 線程安全 Map 接口
type ThreadSafeMap interface {
Get(key string) any
Set(key string, val any)
}
// -------------------------------------------------------------------
// map 數據類型 + 讀寫鎖實現線程安全的 map
type MutexMap struct {
sync.RWMutex
m map[string]any
}
func (m *MutexMap) Get(key string) any {
m.RLock()
v, ok := m.m[key]
m.RUnlock()
if ok {
return v
}
return nil
}
func (m *MutexMap) Set(key string, val any) {
m.Lock()
m.m[key] = val
m.Unlock()
}
// -------------------------------------------------------------------
// sync.Map 實現線程安全的 map
type SyncMap struct {
m sync.Map
}
func (s *SyncMap) Get(key string) any {
v, _ := s.m.Load(key)
return v
}
func (s *SyncMap) Set(key string, val any) {
s.m.Store(key, val)
}
// -------------------------------------------------------------------
// 分段鎖實現線程安全的 map
type ConcurMap struct {
m cmap.ConcurrentMap[string, any]
}
func (c *ConcurMap) Get(key string) any {
v, _ := c.m.Get(key)
return v
}
func (c *ConcurMap) Set(key string, val any) {
c.m.Set(key, val)
}
// 基準測試
func benchmark(b *testing.B, m ThreadSafeMap, read, write int) {
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
// 注意: 這裏的讀寫操作有一部分 key 是重合的
// 讀操作
for k := 0; k < read*100; k++ {
wg.Add(1)
go func(key int) {
m.Get(strconv.Itoa(i * key))
wg.Done()
}(k)
}
// 寫操作
for k := 0; k < write*100; k++ {
wg.Add(1)
go func(key int) {
m.Set(strconv.Itoa(i*key), key)
wg.Done()
}(k)
}
wg.Wait()
}
}
// 讀寫比例 9:1
func BenchmarkReadMoreRWMutex(b *testing.B) { benchmark(b, &MutexMap{m: make(map[string]any)}, 9, 1) }
func BenchmarkReadMoreSyncMap(b *testing.B) { benchmark(b, &SyncMap{m: sync.Map{}}, 9, 1) }
func BenchmarkReadMoreConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]( "any")}, 9, 1) }
// 讀寫比例 1:9
func BenchmarkWriteMoreRWMutex(b *testing.B) { benchmark(b, &MutexMap{m: make(map[string]any)}, 1, 9) }
func BenchmarkWriteMoreSyncMap(b *testing.B) { benchmark(b, &SyncMap{m: sync.Map{}}, 1, 9) }
func BenchmarkWriteMoreConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]( "any")}, 1, 9) }
// 讀寫比例 5:5
func BenchmarkEqualRWMutex(b *testing.B) { benchmark(b, &MutexMap{m: make(map[string]any)}, 5, 5) }
func BenchmarkEqualSyncMap(b *testing.B) { benchmark(b, &SyncMap{m: sync.Map{}}, 5, 5) }
func BenchmarkEqualConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]( "any")}, 5, 5) }
運行基準測試:
$ go test -count=1 -run='^$' -bench=. -benchtime=3s -benchmem
輸出結果如下:
goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
BenchmarkReadMoreRWMutex-8 9107 362827 ns/op 84094 B/op 3001 allocs/op
BenchmarkReadMoreSyncMap-8 7740 765258 ns/op 128974 B/op 3284 allocs/op
BenchmarkReadMoreConcurMap-8 10000 345271 ns/op 83985 B/op 3000 allocs/op
BenchmarkWriteMoreRWMutex-8 6212 825778 ns/op 137330 B/op 3656 allocs/op
BenchmarkWriteMoreSyncMap-8 3352 1155236 ns/op 157766 B/op 6041 allocs/op
BenchmarkWriteMoreConcurMap-8 9480 370214 ns/op 119970 B/op 3653 allocs/op
BenchmarkEqualRWMutex-8 7108 529450 ns/op 104626 B/op 3249 allocs/op
BenchmarkEqualSyncMap-8 5360 735393 ns/op 133008 B/op 4601 allocs/op
BenchmarkEqualConcurMap-8 9548 347809 ns/op 104303 B/op 3250 allocs/op
PASS
從基準測試的輸出結果來看,不論是哪種應用場景,結合運行速度還是內存分配,三者的排序都是一致的: 分段鎖優於讀寫鎖 + map, 後者優於 sync.Map 。 筆者沒有遇到過 100% 的只讀或只寫操作的應用場景,所以沒有做對應的基準測試,不過這裏可以猜測一下:
-
100% 只讀場景下 sync.Map 的性能最好 (不過既然都 100% 只讀了, 直接使用 map 類型即可,因爲無需加鎖,所以性能肯定最高)
-
100% 只寫場景下 分段鎖 的性能最好
感興趣的讀者可以通過調整基準測試代碼的百分比參數來驗證一下,這裏是 concurrent-map 官方的基準測試代碼 [6]。
小結
本文主要介紹了在 Go 語言中如何實現線程安全的 map 的三種方法,並通過三種常見的業務場景對方法進行了性能基準測試,最後,我們來簡單總結下三種方法的特點。
Reference
- concurrent-map[7]
擴展閱讀
-
Go 無鎖編程 [8]
-
Map 實現中的一些優化 [9]
-
設計實現高性能本地內存緩存 [10]
鏈接
[1] 這篇文章中: https://golang.dbwu.tech/performance/mutex/
[2] 死鎖、活鎖、飢餓、自旋鎖 一文中: https://dbwu.tech/posts/os_lock/
[3] ants: https://github.com/panjf2000/ants
[4] concurrent-map: https://github.com/orcaman/concurrent-map
[5] 這篇文章中: https://dbwu.tech/golang_sync_map/
[6] concurrent-map 官方的基準測試代碼: https://github.com/orcaman/concurrent-map/blob/master/concurrent_map_bench_test.go
[7] concurrent-map: https://github.com/orcaman/concurrent-map
[8] Go 無鎖編程: https://dbwu.tech/posts/golang_lockfree/
[9] Map 實現中的一些優化: https://halfrost.com/go_map_chapter_one/#toc-22
[10] 設計實現高性能本地內存緩存: https://blog.joway.io/posts/modern-memory-cache/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/kZH8wV2hy5ez0pJQAcxVUQ