Go 中線程安全 map 方案選型

概述

Go 語言標準庫中的 map 數據類型並不是線程安全的,多個 goroutine 可以併發讀取同一個 map, 但是不能併發寫入同一個 map, 否則會引發 panic。

爲了解決這個問題,實際開發中通常會使用下面的三種方案中的一個或多個:

  1. 通過 map 數據類型 + 鎖 (互斥鎖, 讀寫鎖)

  2. 標準庫內置的 sync.Map 對象 (支持併發讀寫)

  3. 分段鎖

作爲補充,本文會順帶對比一下自旋鎖和標準庫中的互斥鎖的性能差異,對於 map 數據類型及其操作原語來說,兩者實現的功能保證是一致的, 而且自旋鎖更多的應用場景在無鎖編程,所以文章末尾的基準測試不包含自旋鎖 (當然,感興趣的讀者可以在本文基礎上進行修改,自行對比測試結果)。

💡 本文代碼較多,對測試過程不感興趣的讀者可以直接跳轉到文章末尾看結論。

讀寫鎖和互斥鎖

標準庫中的讀寫鎖和互斥鎖的性能差異及使用場景,在之前的 這篇文章中 [1] 已經有基礎的說明,本文不再贅述。

自旋鎖

基礎概念在 死鎖、活鎖、飢餓、自旋鎖 一文中 [2] 已經介紹過,這裏不再贅述,其中自旋鎖操作獲取鎖的核心代碼如下:

// 獲取自旋鎖
func (sl *spinLock) lock() {
 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
  // 獲取到自旋鎖
 }
}

上面的代碼直觀上很符合自旋鎖的語義,只要沒有獲取到鎖,就一直空轉 CPU 嘗試獲取鎖,但是這會帶來一個問題: CPU 空轉帶來了很大的資源浪費, 是否可以降低甚至避免這種資源浪費嗎?

一個顯而易見的方法是在每兩次獲取鎖的操作之間休眠一下,但是這樣做會帶來兩個新的問題:

  1. 延遲增加,本來也許下次獲取鎖就可以成功,但是現在必須等休眠結束才能繼續獲取鎖

  2. 引起上下文切換,因爲當前 goroutine 休眠,根據 GMP 調取器的管理規則,處理器 P 會切換到其他可以運行的 goroutine, 如果當前 P 的 goroutine 隊列已經是空的, 那麼會給當前 M 關聯一個新的處理器,不管是哪種情況發生,都會引起上下文切換

優化版

看起來似乎沒有完美的解決方案,筆者前兩兩天在閱讀 ants[3] 的源代碼時,看到作者是這樣處理自旋鎖的:

const maxBackoff = 16

func (sl *spinLock) Lock() {
 backoff := 1
 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
  for i := 0; i < backoff; i++ {
   runtime.Gosched()
  }

  if backoff < maxBackoff {
   backoff <<= 1
  }
 }
}

作者借鑑了 TCP 流量控制中的指數退避理念,每兩次獲取鎖的間隔時間呈指數級別增長,並且在間隔時間內執行 N 次 GMP 調取,當然這是根據該組件的場景特性決定的 (goroutine pool), 在實際項目中實現和使用自旋鎖時,也可以根據具體的業務場景來自定義間隔時間內的操作,比如可以執行一個 CPU 密集型的任務,最終的目的只有一個: 儘可能榨乾 CPU 資源

基準測試

首先來對標準庫中的互斥鎖、普通自旋鎖、優化版自旋鎖做一個簡單的基準測試。

// 普通自旋鎖實現 --------------------------------------------
type originSpinLock uint32

func (sl *originSpinLock) Lock() {
 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
  runtime.Gosched()
 }
}

func (sl *originSpinLock) Unlock() {
 atomic.StoreUint32((*uint32)(sl), 0)
}

func NewOriginSpinLock() sync.Locker {
 return new(originSpinLock)
}

// 優化自旋鎖實現 --------------------------------------------
type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
 backoff := 1
 for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
  for i := 0; i < backoff; i++ {
   runtime.Gosched()
  }
  if backoff < maxBackoff {
   backoff <<= 1
  }
 }
}

func (sl *spinLock) Unlock() {
 atomic.StoreUint32((*uint32)(sl), 0)
}

func NewSpinLock() sync.Locker {
 return new(spinLock)
}

// 標準庫的互斥鎖
func BenchmarkMutex(b *testing.B) {
 m := sync.Mutex{}
 b.RunParallel(func(pb *testing.PB) {
  for pb.Next() {
   m.Lock()
   m.Unlock()
  }
 })
}

// 普通自旋鎖
func BenchmarkSpinLock(b *testing.B) {
 spin := NewOriginSpinLock()
 b.RunParallel(func(pb *testing.PB) {
  for pb.Next() {
   spin.Lock()
   spin.Unlock()
  }
 })
}

// 優化版自旋鎖
func BenchmarkBackOffSpinLock(b *testing.B) {
 spin := NewSpinLock()
 b.RunParallel(func(pb *testing.PB) {
  for pb.Next() {
   spin.Lock()
   spin.Unlock()
  }
 })
}

從測試結果可以看到,優化後的自旋鎖相比普通自旋鎖和互斥鎖,性能有了很大的提高。

// goos: linux
// goarch: amd64
// cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
// BenchmarkMutex
// BenchmarkMutex-8                21886387                55.83 ns/op
// BenchmarkSpinLock
// BenchmarkSpinLock-8             46848830                25.81 ns/op
// BenchmarkBackOffSpinLock
// BenchmarkBackOffSpinLock-8      55894545                21.16 ns/op

分段鎖

分段鎖 (Segmented Locking) 是一種併發控制的技術,它將共享資源劃分成多個不重疊的片段,並對每個片段進行獨立的加鎖,通過這種方法可以減小鎖的粒度,提高系統的併發性能。

使用分段鎖時,每個線程只需要獲取 key 所在的範圍片段的鎖,而不必像互斥鎖那樣鎖住並獨佔整個共享資源,有效避免了多個線程因爲競爭全局鎖而導致的等待和延遲,提高系統的併發性能。

雖然分段鎖可以提高系統的併發性能,但同時也會增加鎖衝突的概率,並且需要付出額外的開銷來維護鎖的狀態 (互斥鎖只需要一把全局鎖即可,分段鎖每個區間範圍都需要一把鎖)。

讀寫鎖和分段鎖差異

代碼實現

筆者在項目中用到的分段鎖組件是開源的 concurrent-map[4], 下面就以該組件的源代碼爲基礎,來分析如何實現一個分段鎖,本文選用實現了泛型的 v2 版本。

Map 對象

ConcurrentMap 對象表示實現了分段鎖的 Map 對象,內部有兩個字段:

  1. 表示區域元素對象的 shares 字段

  2. 表示哈希函數的 sharding 字段

type ConcurrentMap[K comparable, V any] struct {
 shards   []*ConcurrentMapShared[K, V]
 sharding func(key K) uint32
}

GetShard 方法用於計算給定的參數 key 對應的區間元素集合對象並返回。

func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
 // 優化版: m % n = m & ( n - 1 )
    return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

區間元素集合對象

ConcurrentMapShared 對象表示 Map 中某個區間元素集合對象,內部有兩個字段:

  1. 用於存儲具體元素的 map, 數據結構就是標準庫中的 map 類型

  2. 內嵌一個讀寫鎖,用於管理對 map 結構的讀寫併發控制

type ConcurrentMapShared[K comparable, V any] struct {
 items        map[K]V
 sync.RWMutex
}

操作原語

下面來看一下常用的幾個操作原語的代碼實現。

1. SET

方法的內部執行分爲 4 步:

  1. 通過 key 獲取區間元素集合對象

  2. 獲取寫鎖

  3. 寫入 key 對應的數據

  4. 釋放寫鎖

func (m ConcurrentMap[K, V]) Set(key K, value V) {
 shard := m.GetShard(key)
 shard.Lock()
 shard.items[key] = value
 shard.Unlock()
}

2. GET

方法的內部執行分爲 4 步:

  1. 通過 key 獲取區間元素集合對象

  2. 獲取讀鎖

  3. 寫入 key 對應的數據

  4. 釋放讀鎖

func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
 shard := m.GetShard(key)
 shard.RLock()
 val, ok := shard.items[key]
 shard.RUnlock()
 return val, ok
}

3. Has

出了返回值之外,內部實現和 GET 方法實現一致,這裏不在贅述。

func (m ConcurrentMap[K, V]) Has(key K) bool {
 shard := m.GetShard(key)
 shard.RLock()
 _, ok := shard.items[key]
 shard.RUnlock()
 return ok
}

4. Remove

方法的內部執行分爲 4 步:

  1. 通過 key 獲取區間元素集合對象

  2. 獲取寫鎖

  3. 寫入 key 對應的數據

  4. 釋放寫鎖

func (m ConcurrentMap[K, V]) Remove(key K) {
 shard := m.GetShard(key)
 shard.Lock()
 delete(shard.items, key)
 shard.Unlock()
}

哈希算法

FNV32 是一種快速哈希函數,採用 32 位哈希值,算法實現非常簡單並且具有很高的性能和較低的哈希碰撞率。

值得注意的是,concurrent-map 並沒有使用標準庫的 "hash/fnv" 方法作爲求內部哈希函數實現,而是在組件內部重新實現了一個 fnv32 函數, 但是算法用到的算子常數 FNV_PRIMEFNV_OFFSET_BASIS, concurrent-map 和標準庫是保持一致的,感興趣的讀者可以對比一下實現差異。

func strfnv32[K fmt.Stringer](key K "K fmt.Stringer") uint32 {
    return fnv32(key.String())
}

func fnv32(key string) uint32 {
    ...
}

除此之外,也可以通過 NewWithCustomShardingFunction 函數在創建 Map 時來指定哈希函數:

func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K "K comparable, V any") uint32) ConcurrentMap[K, V] {
 return create[K, V](sharding "K, V")
}

sync.Map

標準庫中的 sync.Map 的底層實現和應用場景在之前的 這篇文章中 [5] 已經有基礎的說明,本文不再贅述。

基準測試

最後,我們對文章開頭提到的三種方案進行基準測試,根據測試結果來總結不同方案的各自適用場景。

下面的測試代碼分別對 讀多寫少、讀少寫多、讀寫均等這三種常見的負載場景,進行了性能基準測試:

package maps

import (
 "strconv"
 "sync"
 "testing"

 cmap "github.com/orcaman/concurrent-map/v2"
)

// 線程安全 Map 接口
type ThreadSafeMap interface {
 Get(key string) any
 Set(key string, val any)
}

// -------------------------------------------------------------------
// map 數據類型 + 讀寫鎖實現線程安全的 map
type MutexMap struct {
 sync.RWMutex
 m map[string]any
}

func (m *MutexMap) Get(key string) any {
 m.RLock()
 v, ok := m.m[key]
 m.RUnlock()
 if ok {
  return v
 }
 return nil
}

func (m *MutexMap) Set(key string, val any) {
 m.Lock()
 m.m[key] = val
 m.Unlock()
}

// -------------------------------------------------------------------
// sync.Map 實現線程安全的 map
type SyncMap struct {
 m sync.Map
}

func (s *SyncMap) Get(key string) any {
 v, _ := s.m.Load(key)
 return v
}

func (s *SyncMap) Set(key string, val any) {
 s.m.Store(key, val)
}

// -------------------------------------------------------------------
// 分段鎖實現線程安全的 map
type ConcurMap struct {
 m cmap.ConcurrentMap[string, any]
}

func (c *ConcurMap) Get(key string) any {
 v, _ := c.m.Get(key)
 return v
}

func (c *ConcurMap) Set(key string, val any) {
 c.m.Set(key, val)
}

// 基準測試
func benchmark(b *testing.B, m ThreadSafeMap, read, write int) {
 for i := 0; i < b.N; i++ {
  var wg sync.WaitGroup

  // 注意: 這裏的讀寫操作有一部分 key 是重合的

  // 讀操作
  for k := 0; k < read*100; k++ {
   wg.Add(1)
   go func(key int) {
    m.Get(strconv.Itoa(i * key))
    wg.Done()
   }(k)
  }

  // 寫操作
  for k := 0; k < write*100; k++ {
   wg.Add(1)
   go func(key int) {
    m.Set(strconv.Itoa(i*key), key)
    wg.Done()
   }(k)
  }

  wg.Wait()
 }
}

// 讀寫比例 9:1
func BenchmarkReadMoreRWMutex(b *testing.B)   { benchmark(b, &MutexMap{m: make(map[string]any)}, 9, 1) }
func BenchmarkReadMoreSyncMap(b *testing.B)   { benchmark(b, &SyncMap{m: sync.Map{}}, 9, 1) }
func BenchmarkReadMoreConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]( "any")}, 9, 1) }

// 讀寫比例 1:9
func BenchmarkWriteMoreRWMutex(b *testing.B)   { benchmark(b, &MutexMap{m: make(map[string]any)}, 1, 9) }
func BenchmarkWriteMoreSyncMap(b *testing.B)   { benchmark(b, &SyncMap{m: sync.Map{}}, 1, 9) }
func BenchmarkWriteMoreConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]( "any")}, 1, 9) }

// 讀寫比例 5:5
func BenchmarkEqualRWMutex(b *testing.B)   { benchmark(b, &MutexMap{m: make(map[string]any)}, 5, 5) }
func BenchmarkEqualSyncMap(b *testing.B)   { benchmark(b, &SyncMap{m: sync.Map{}}, 5, 5) }
func BenchmarkEqualConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]( "any")}, 5, 5) }

運行基準測試:

$ go test -count=1 -run='^$' -bench=. -benchtime=3s  -benchmem

輸出結果如下:

goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
BenchmarkReadMoreRWMutex-8                  9107            362827 ns/op           84094 B/op       3001 allocs/op
BenchmarkReadMoreSyncMap-8                  7740            765258 ns/op          128974 B/op       3284 allocs/op
BenchmarkReadMoreConcurMap-8               10000            345271 ns/op           83985 B/op       3000 allocs/op
BenchmarkWriteMoreRWMutex-8                 6212            825778 ns/op          137330 B/op       3656 allocs/op
BenchmarkWriteMoreSyncMap-8                 3352           1155236 ns/op          157766 B/op       6041 allocs/op
BenchmarkWriteMoreConcurMap-8               9480            370214 ns/op          119970 B/op       3653 allocs/op
BenchmarkEqualRWMutex-8                     7108            529450 ns/op          104626 B/op       3249 allocs/op
BenchmarkEqualSyncMap-8                     5360            735393 ns/op          133008 B/op       4601 allocs/op
BenchmarkEqualConcurMap-8                   9548            347809 ns/op          104303 B/op       3250 allocs/op
PASS

從基準測試的輸出結果來看,不論是哪種應用場景,結合運行速度還是內存分配,三者的排序都是一致的: 分段鎖優於讀寫鎖 + map, 後者優於 sync.Map 。 筆者沒有遇到過 100% 的只讀或只寫操作的應用場景,所以沒有做對應的基準測試,不過這裏可以猜測一下:

  1. 100% 只讀場景下 sync.Map 的性能最好 (不過既然都 100% 只讀了, 直接使用 map 類型即可,因爲無需加鎖,所以性能肯定最高)

  2. 100% 只寫場景下 分段鎖 的性能最好

感興趣的讀者可以通過調整基準測試代碼的百分比參數來驗證一下,這裏是 concurrent-map 官方的基準測試代碼 [6]。

小結

本文主要介紹了在 Go 語言中如何實現線程安全的 map 的三種方法,並通過三種常見的業務場景對方法進行了性能基準測試,最後,我們來簡單總結下三種方法的特點。

TuOCHl

Reference

擴展閱讀

鏈接

[1] 這篇文章中: https://golang.dbwu.tech/performance/mutex/

[2] 死鎖、活鎖、飢餓、自旋鎖 一文中: https://dbwu.tech/posts/os_lock/

[3] ants: https://github.com/panjf2000/ants

[4] concurrent-map: https://github.com/orcaman/concurrent-map

[5] 這篇文章中: https://dbwu.tech/golang_sync_map/

[6] concurrent-map 官方的基準測試代碼: https://github.com/orcaman/concurrent-map/blob/master/concurrent_map_bench_test.go

[7] concurrent-map: https://github.com/orcaman/concurrent-map

[8] Go 無鎖編程: https://dbwu.tech/posts/golang_lockfree/

[9] Map 實現中的一些優化: https://halfrost.com/go_map_chapter_one/#toc-22

[10] 設計實現高性能本地內存緩存: https://blog.joway.io/posts/modern-memory-cache/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/kZH8wV2hy5ez0pJQAcxVUQ