map 設計與實現(上篇)- 數據結構
概述
在計算機科學中,關聯數組 (
Associative Array
),又稱映射 (Map
)、字典 (Dictionary
) 是一個抽象的數據結構,它包含着類似於(鍵,值)的有序對。
從應用的角度來看,Go 語言的 map
數據結構主要有以下常見問題:
-
未初始化寫入時報錯
-
遍歷時無序
-
非併發安全
本文通過分析標準庫中 map
的內部實現,嘗試解答上面的問題。
內部實現
map
的源代碼文件路徑爲 $GOROOT/src/runtime/map.go
,筆者的 Go 版本爲 go1.19 linux/amd64
。
相關常量
const (
bucketCntBits = 3
// 8: 一個 bucket 存放元素的數量上限
bucketCnt = 1 << bucketCntBits
// 觸發擴容的負載因子: 6.5
// 官方的解釋是這個值是根據測試結果統計出來的
// 負載量超過負載因子時,發生擴容
// 負載量 = 元素數量 / bucket 桶數量
loadFactorNum = 13
loadFactorDen = 2
// 保持內聯的鍵值對大小上限, 超過這個大小,鍵和值會被轉換爲指針
// 必須在 uint8 所表示的範圍內 [0 - 255]
maxKeySize = 128
maxElemSize = 128
// 數據偏移量根據 bmap 結構體的大小進行對齊
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
// tophash 除了放正常的高 8 位的 hash 值
// 還會在空閒、遷移時存儲一些自定義的狀態值
emptyRest = 0 // 槽未使用,且在較高的索引或溢出處不再有非空單元
emptyOne = 1 // 槽未使用
evacuatedX = 2 // 對應的鍵值存在,並且已經遷移到擴容後的 map 的前半部分
evacuatedY = 3 // 對應的鍵值存在,並且已經遷移到擴容後的 map 的後半部分
evacuatedEmpty = 4 // 槽未使用, 桶遷移完成
minTopHash = 5 // 正常填充槽的最小 TopHash 值
iterator = 1 // 可能有一個使用 bucket 的迭代器
oldIterator = 2 // 可能有一個使用 oldbuckets 的迭代器
hashWriting = 4 // 一個等待寫入 map 的 goroutine
sameSizeGrow = 8 // 等量擴容: 當前的 map 增長是一個相同大小的新 map
// 用於迭代器檢查的哨兵 bucket ID
noCheck = 1<<(8*goarch.PtrSize) - 1
)
我們暫時只需要關注兩個常量數值:
-
單個
bucket
最多可以放 8 個元素 -
map
負載因子上限爲 6.5
map
負載量的計算方式爲:
負載量 = 元素數量 / bucket 桶數量
數據結構
bmap 對象
bmap
對象表示 map
中用於存儲數據的 bucket
, 也就是數據桶。
type bmap struct {
// tophash 通常包含此 bucket 中每個 key 的 hash 值高位字節
// 如果 tophash[0] < minTopHash, 那麼其表示 bucket 的遷移狀態
tophash [bucketCnt]uint8
}
編譯器會將 bmap
結構體轉換爲下面的結構體:
type bmap struct {
tophash [bucketCnt]uint8
keys [bucketCnt]keytype
values [bucketCnt]valuetype
// 溢出 bucket 的指針
overflow uintptr
}
需要注意的是: key + value
並不是配對存儲的,而是分別存儲在不同的數組中。
把所有的 key
放在一起,然後把所有的 value
放在一起,可以消除爲了內存對齊而導致的內存填充 (主要是爲了節省內存),如 map[int64]int8
這種較爲極端的情況。
-
在配對存儲時,需要內存對齊,所以單個
key/value
大小爲 16 字節,8 個元素一共 128 字節 -
在獨立存儲時,不需要內存對齊,8 個鍵 64 字節,8 個元素 8 字節,一共 72 字節
bmap 對象
mapextra 對象
如果 map
的鍵和值都不包含指針,並且可以被內聯 (<=128 bytes), 那麼標記 bucket
桶不含指針,這樣可以避免 GC
掃描。 但是 bmap.overflow
字段是一個指針,爲了保證溢出的 bucket
桶存活 (不被 GC),在 hmap.extra.overflow
和 hmap.extra.oldoverflow
中存儲了指向所有溢出桶的指針。
type mapextra struct {
// overflow 和 oldoverflow 僅在 key 和 elem 都不包含指針時使用
// overflow 包含 hmap.buckets 的溢出桶
// oldoverflow 包含 hmap.oldbuckets 的溢出桶
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow 指向空閒的溢出桶
nextOverflow *bmap
}
hmap 對象
hmap
對象表示 map
數據類型的主體結構。
type hmap struct {
// map 中的元素個數
// 必須放在 struct 的第一個位置
// 因爲內置的 len 函數會從這裏讀取 (減少指令,提升性能,和 sync.Once 的 done 字段作用一樣)
count int
flags uint8
// bucket 桶數量對應的指數
// bucket 桶數量 = 2^B
// 超過 6.5 * 2^B 個元素,就需要擴容
B uint8
// 溢出 bucket 桶的數量
noverflow uint16
// 哈希種子, 爲哈希函數的結果引入隨機性
hash0 uint32
// bucket 桶存放數據的數組
// 如果 count 字段等於 0,則爲 nil
buckets unsafe.Pointer
// 只有在擴容期間不爲 nil
// 擴容期間指向 buckets
oldbuckets unsafe.Pointer
// 遷移進度計數器
// 小於計數器的 bucket 桶表示已經遷移完成
nevacuate uintptr
// 當鍵和值都可以內聯的時,就會使用這個字段
extra *mapextra
}
hmap 對象
方法
makemap_small 方法
makemap_small
方法實現了兩種 map
創建方式:
-
零長度
map
, 例如 make(map[k]v) -
長度較小的
map
, 例如 make(map[k]v, hint), 前提是在編譯時可以推斷出參數hint
最多爲 8 並且分配到堆上
func makemap_small() *hmap {
h := new(hmap)
// 生成哈希種子
h.hash0 = fastrand()
return h
}
makemap 方法
如果創建的 map
不符合 makemap_small
方法的條件, 會調用 makemap
方法來創建。
// 如果 h != nil, map 可以在 h 中創建
// 如果 h.buckets != nil, 可以複用 h.buckets 數組
func makemap(t *maptype, hint int, h *hmap) *hmap {
if h == nil {
h = new(hmap)
}
// 生成哈希種子
h.hash0 = fastrand()
// 計算需要的 bucket 數量
B := uint8(0)
for overLoadFactor(hint, B) {
B++
}
h.B = B
// 創建 bucket 桶用於保存數據的數組
// 如果 B == 0, buckets 字段懶加載
// 如果 map 長度較大,那麼這部分內存初始化會花較長時間
if h.B != 0 {
var nextOverflow *bmap
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
mapaccess2 方法
mapaccess1
,mapaccess2
,mapaccessK
幾個方法差不多,都可以用來訪問 map
元素,這裏以 mapaccess2
作爲示例說明。
mapaccess2
不僅可以返回參數 key 對應的元素,還可以返回第二個參數用來表示 key
對應的元素是否存在。
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
if h == nil || h.count == 0 {
// map == nil 或元素數量爲 0, 直接返回未找到
return unsafe.Pointer(&zeroVal[0]), false
}
if h.flags&hashWriting != 0 {
// 併發讀寫 map 錯誤
throw("concurrent map read and map write")
}
// 不同類型的 key,所用的 hash 算法是不一樣的
hash := t.hasher(key, uintptr(h.hash0))
// 找到對應的 bucket 桶
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 如果 map 正在擴容
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
// 如果當前擴容機制是翻倍擴容
// 說明之前的 buckets 只有現在的一半
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
// 如果舊桶中的數據還未遷移完成
// 說明 key 對應的值可能存在於舊桶中
b = oldb
}
}
// 取高 8 位的值
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
// 一個桶在滿 8 個元素後,會創建新的桶
// 然後掛在原來的桶的 overflow 指針上
for i := uintptr(0); i < bucketCnt; i++ {
// 循環對比桶中各個元素的哈希值
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
// 如果當前元素後面的元素都是空的,說明沒有更多的元素了
// 直接跳轉到下一個溢出桶
break bucketloop
}
continue
}
// 如果找到了相等的哈希值,那說明 key 對應的值可能存在於當前桶中
// 根據偏移量取出對應的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.key.equal(key, k) {
// 僅噹噹前 key 和參數 key 完全相同時
// 才說明對應的元素找到了
return e, true
}
}
}
// 沒有找到 key 對應的值,返回零值和 false
return unsafe.Pointer(&zeroVal[0]), false
}
mapassign 方法
mapassign
方法用於設置 map
的值 ,和 mapaccess
方法不同的地方在於: 如果 key
不存在於 map
中,則爲它分配對應的槽,然後設置值。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
// map 未初始化時不能賦值
panic(plainError("assignment to entry in nil map"))
}
if h.flags&hashWriting != 0 {
// 併發寫錯誤
throw("concurrent map writes")
}
// 調用對應類型的 hash 算法
hash := t.hasher(key, uintptr(h.hash0))
// 調用 hash 算法後設置 hashWriting 狀態標識
// 因爲 t.hasher 可能發生 panic, 此時實際的設置操作還未完成
h.flags ^= hashWriting
// 初始化第一個桶
if h.buckets == nil {
h.buckets = newobject(t.bucket)
}
again:
// 計算低 8 位
bucket := hash & bucketMask(h.B)
if h.growing() {
// 如果 map 當前正在擴容
// 順便調用一次增量擴容
growWork(t, h, bucket)
}
// 計算出存儲的桶地址, 轉換爲 bucket 對象
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 計算高 8 位
top := tophash(hash)
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
// 循環對比桶中各個元素的哈希值
if b.tophash[i] != top {
// 在 b.tophash[i] != top 的情況下
// 理論上有可能會是一個空槽位
// 一般情況下 map 的槽位分佈是這樣的,e 表示 empty:
// [h1][h2][h3][h4][h5][e][e][e]
// 但在執行過 delete 操作時,可能會變成這樣:
// [h1][h2][e][e][h5][e][e][e]
// 所以如果再插入的話,會盡量往前面的位置插入
// [h1][h2][e][e][h5][e][e][e]
// ^
// ^
// 這個位置
// 所以在循環的時候還要順便把前面的空位置先記下來
if isEmpty(b.tophash[i]) && inserti == nil {
// 如果這個槽位沒有被佔,說明可以往這裏放入 key 和 value
...
}
if b.tophash[i] == emptyRest {
// 如果當前元素後面的元素都是空的,說明沒有更多的元素了
// 直接跳轉到下一個溢出桶
break bucketloop
}
continue
}
// 如果找到了相等的哈希值,那說明 key 對應的值可能存在於當前桶中
// 根據偏移量取出對應的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 如果兩個 key 的首 8 位、最後 8 位 hash 值一樣,會進行比較,和處理 hash 碰撞一樣
// 如果相同的哈希位置的 key 和要插入的 key 不相等
// 直接跳過
if !t.key.equal(key, k) {
continue
}
// 如果當前 key 和參數 key 完全相同
// 但是對應的位置已經有值了,直接更新
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 設置完成,直接跳轉到 done 條件標籤
goto done
}
// 當前桶沒有可用的位置,從溢出桶繼續查找
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// 沒有找到 key, 分配新的內存空間
// 如果觸發了負載因子上限或者已經有太多溢出 bucket,並且當前沒有在進行擴容,那麼開始擴容操作
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
// 擴容的時候會把當前的 bucket 桶放入 oldbucket
// 如果還沒有分配新的 bucket 桶, 則跳轉到 again 重試一次
// 重試的時候在 growWork 方法裏會把這個 key 對應的 bucket 桶提前分配好
goto again
}
if inserti == nil {
// 當前的桶和它掛載的溢出桶都滿了,分配一個新的桶
...
}
// 將鍵值存儲到對應的位置
...
// map 元素數量 + 1
h.count++
done:
if h.flags&hashWriting == 0 {
// 併發寫錯誤
throw("concurrent map writes")
}
return elem
}
mapdelete 方法
mapdelete
方法用於刪除 map
中指定的 key
, 其內部邏輯和 mapassign
賦值方法大多數都是相同的。
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// 如果 map 還未初始化或元素數量爲 0, 直接返回
if h == nil || h.count == 0 {
return
}
if h.flags&hashWriting != 0 {
// 併發寫錯誤
throw("concurrent map writes")
}
// 調用對應類型的 hash 算法
hash := t.hasher(key, uintptr(h.hash0))
// 調用 hash 算法後設置 hashWriting 狀態標識
// 因爲 t.hasher 可能發生 panic, 此時實際的刪除操作還未完成
h.flags ^= hashWriting
// 計算低 8 位
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
// 計算出存儲的桶地址, 轉換爲 bmap 結構
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
// 計算高 8 位
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
// 循環對比桶中各個元素的哈希值
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
// 如果當前元素後面的元素都是空的,說明沒有更多的元素了
// 直接跳轉到下一個溢出桶
break search
}
continue
}
// 根據偏移量取出對應的 key
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 對應的槽標記爲未使用
b.tophash[i] = emptyOne
// map 元素數量 + 1
h.count--
}
}
if h.flags&hashWriting == 0 {
// 併發寫錯誤
throw("concurrent map writes")
}
}
makeBucketArray 方法
makeBucketArray
方法用於初始化 bucket
桶對應的底層數組。
-
參數
b
表示 bucket 桶的數量對應的指數 (例如傳入 3, 表示桶的數量爲 2 ^ 3 = 8) -
參數
dirtyalloc
表示是否複用其他的數組,如果爲nil
會分配一個新數組,否則會清空參數dirtyalloc
,然後重用這部分內存作爲新數組
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
base := bucketShift(b)
nbuckets := base
// 當 bucket 數量小於 2^4 時
// 由於數據較少、使用溢出桶的可能性較低,會省略創建的過程以減少額外開銷
// 當 bucket 桶的數量大於等於 2^4 時
// 額外創建 2 ^ (b - 4) 個溢出桶
if b >= 4 {
// 加上溢出桶的數量
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
if dirtyalloc == nil {
// 分配新的數組內存
buckets = newarray(t.bucket, int(nbuckets))
} else {
// 複用參數數組
buckets = dirtyalloc
size := t.bucket.size * nbuckets
if t.bucket.ptrdata != 0 {
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}
if base != nbuckets {
// 預分配一些溢出桶
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
小結
從應用層面來說,map
在使用前一定要進行初始化,如果對於存儲數據的數量有大概的瞭解,可以在初始化時 預分配
一定的容量,可以有效提高性能。
從內部實現來說,map
通過編譯器和運行時實現了常規操作,通過 拉鍊法
來解決哈希碰撞問題 (每個 bmap
對象都有一個 overflow
指針), 此外,通過將每個 bucket
中元素的前 8 位哈希值存入 tophash
, 以空間換時間,可以加速遍歷 bucket
中元素。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/EGM-ZXnuuWOdearV_xtdcQ