Go 底層探索 -五-: 哈希表 Map - 擴容 [下篇]
- 介紹
隨着哈希表中元素的逐漸增加,哈希的性能會逐漸惡化,所以我們需要更多的桶和更大的內存保證哈希的讀寫性能。
- 怎麼觸發
在每次對哈希表賦值時,都會調用runtime.mapassign
函數,該函數每次都會判斷是否需要擴容,主要有兩個函數:overLoadFactory
和tooManyOverflowBuckets
:
// hash[k]=x表達式,會在編譯期間轉換成runtime.mapassign 函數的調用
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
...
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
// 擴容入口
hashGrow(t, h)
goto again
}
...
}
-
overLoadFactory
: 主要判斷裝載因子是否超過6.5
-
tooManyOverflowBuckets
: 用來判斷是否使用了太多溢出桶; -
h.growing()
: 用來判斷是否已經處於擴容狀態;
因爲
Go
語言哈希的擴容不是一個原子的過程,所以runtime.mapassign
還需要判斷當前哈希是否已經處於擴容狀態,避免二次擴容造成混亂。
- 擴容方式
根據觸發的條件不同擴容的方式分成兩種:
-
第一種: 裝載因子超過
6.5
,則會進行雙倍重建; -
第二種: 當溢出桶的數量過多時,會進行等量重建;
3.1 等量擴容
當我們對map
不斷進行新增和刪除時,桶中可能會出現很多斷斷續續的空位,這些空位會導致連接的bmap
溢出桶很長,對應的掃描時間也會變長,查詢性能就會下降。這種擴容實際上是一種整理,把後置位的數據整理到前面。
3.2 雙倍重建
兩倍重建是爲了讓map
存儲更多的數據, 在雙倍重建時,我們還需要解決舊桶中的數據要轉移到某一個新桶中的問題。其中有一個非常重要的原則:如果數據的hash&bucketMask
[當前新桶所在的位置] 小於或等於舊桶的大小,則此數據必須轉移到和舊桶位置完全對應的新桶中去,理由是當前key
所在新桶的序號與舊桶是完全相同的。
- 擴容流程
4.1 擴容核心函數
擴容需要處理的問題是,擴容後,map
中原本的數據重新放到擴容後的map
中,即數據遷移問題,golang
中map
擴容時核心函數有如下幾個
-
hashGrow
:決定擴容方式,負責初始化新的桶,以及設置擴容後的map
的各個字段 -
growWork
:每調用一次growWork
函數,都至多會遷移兩個桶的數據 -
evacuate
:真正負責遷移數據的函數,會負責遷移指定桶中的數據 -
advanceEvacuationMark
:收尾工作,增加nevacuate
,如果所有的oldbuckets
都遷移完成了,會摘除oldbuckets
4.2 hashGrow
重建時需要調用hashGrow
函數,如果負載因子超載,則會進行雙倍重建。當溢出桶的數量過多時,會進行等量重建。新桶會存儲到buckets
字段,舊桶會存儲到oldbuckets
字段。map
中extra
字段的溢出桶也進行同樣的轉移。
func hashGrow(t *maptype, h *hmap) {
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
// 舊數據存到舊桶
oldbuckets := h.buckets
// 創建一組新桶和溢出桶
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
h.B += bigger
h.flags = flags
// 舊數據存到舊桶上
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
// 原有的溢出桶,存到舊溢出桶
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
h.extra.nextOverflow = nextOverflow
}
@注意:這裏並沒有實際執行將舊桶中的數據轉移到新桶的過程。數據轉移遵循寫時複製(copy on write)的規則,只有在真正賦值時,纔會選擇是否需要進行數據轉移,其核心邏輯位於 growWork 和 evacuate 函數中。
4.3 growWork
growWork
函數並不會真正進行數據遷移,它會調用evacuate
函數來完成遷移工作,growWork
函數每次會遷移至多兩個桶的數據,一個是目前需要使用的桶,一個是h.nevacuate
桶(這裏很重要,在後面判斷是否遷移過程中有很大的作用),h.nevacuate
記錄的是目前至少已經遷移的桶的個數。
func growWork(t *maptype, h *hmap, bucket uintptr) {
// make sure we evacuate the oldbucket corresponding
// to the bucket we're about to use
evacuate(t, h, bucket&h.oldbucketmask())
// evacuate one more oldbucket to make progress on growing
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
4.4 evacuate
evacuate
是真正進行數據遷移的函數,它每次會遷移一個bmap
中的數據,簡單說,就是遍歷舊有buckets
中bmap
中的數據,將其放到新bmap
的對應位置;
在學習evacuate
函數前,先記bmap.tophash
的幾個特殊值,在擴容過程中會使用到:
emptyRest = 0 // 表明該位置及其以後的位置都沒有數據
emptyOne = 1 // 表明該位置沒有數據
evacuatedX = 2 // key/elem是有效的,它已經在擴容過程中被遷移到了更大表的前半部分
evacuatedY = 3 // key/elem是有效的,它已經在擴容過程中被遷移到了更大表的後半部分
evacuatedEmpty = 4 // 該位置沒有數據,且已被擴容
minTopHash = 5 // 一個被正常填充的tophash的最小值
4.4.1 判斷桶的遷移狀態
首先會判斷這個桶是否已經被遷移過了,或者正在遷移中,如果沒有被遷移,纔會進行遷移工作。該判斷是通過evacuated
函數完成的,該函數很簡單,只需要判斷tophash[0]
是否是evacuatedX,evacuatedY,evacuateEmpty
即可.
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}
4.4.2 初始化 evacDst 結構
初始化evacDst
結構,如果是等量擴容,則只會初始化一個,如果是普通擴容,則會初始化兩個。runtime.evacuate
會將一箇舊桶中的數據分流到兩個新桶,所以它會創建兩個用於保存分配上下文的 runtime.evacDst
結構體,這兩個結構體分別指向了一個新桶:
evacDst
結構體如下所示:
type evacDst struct {
b *bmap // 目標桶
i int // 每個桶有8個位置可以塞數據,這個i可以理解爲當前塞了多少個數據
k unsafe.Pointer // 目標桶中key的位置
e unsafe.Pointer // 目標桶中elem的位置
}
4.4.3 舊桶元素分流
如果這是等量擴容,那麼舊桶與新桶之間是一對一的關係,所以兩個 runtime.evacDst
只會初始化一個。而當哈希表的容量翻倍時,每個舊桶的元素會都分流到新創建的兩個桶中,這裏仔細分析一下分流元素的邏輯:
...
// 遍歷 oldbuckets 對應的 bucket 以及 oveflow
for ; b != nil; b = b.overflow(t) {
//獲取當前 bucket 的 key 的起始位置
k := add(unsafe.Pointer(b), dataOffset)
//獲取當前 bucket 的 elem 的起始位置
e := add(k, bucketCnt*uintptr(t.keysize))
//遍歷當前 bucket 中 8 個 key,elem
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
top := b.tophash[i]
// 如果爲空,則跳過
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
//如果小於 minTopHash ,則表示其已經被轉移走了,則 throw
if top < minTopHash {
throw("bad map state")
}
k2 := k
//如果存的是對應 key 的指針,則要獲取 key 的地址
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
// 如果是非等量遷移(雙倍重建)
if !h.sameSizeGrow() {
//算出當前 key 的 hash 值
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
useY = top & 1 //讓這個 key 50% 概率去 Y 半區
top = tophash(hash)
} else {
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // 移動目標
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
if t.indirectkey() {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() {
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
typedmemmove(t.elem, dst.e, e)
}
dst.i++
// These updates might push these pointers past the end of the
// key or elem arrays. That's ok, as we have the overflow pointer
// at the end of the bucket to protect against pointing past the
// end of the bucket.
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
}
}
...
只使用哈希函數是不能定位到具體某一個桶的,哈希函數只會返回很長的哈希,例如:b72bfae3f3285244c4732ce457cca823bc189e0b
,我們還需一些方法將哈希映射到具體的桶上。我們一般都會使用取模或者位操作來獲取桶的編號,假如當前哈希中包含 4
個桶,那麼它的桶掩碼就是 0b11(3)
,使用位操作就會得到 3
, 我們就會在 3
號桶中存儲該數據:
0xb72bfae3f3285244c4732ce457cca823bc189e0b & 0b11 #=> 0
如果新的哈希表有 8
個桶,在大多數情況下,原來經過桶掩碼 0b11
結果爲 3
的數據會因爲桶掩碼增加了一位變成 0b111
而分流到新的 3
號和 7
號桶,所有數據也都會被 runtime.typedmemmove
拷貝到目標桶中:
4.5 advanceEvacuationMark
runtime.evacuate
最後會調用 runtime.advanceEvacuationMark
增加哈希的 nevacuate
計數器並在所有的舊桶都被分流後清空哈希的 oldbuckets
和 oldoverflow
:
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
h.nevacuate++
// Experiments suggest that 1024 is overkill by at least an order of magnitude.
// Put it in there as a safeguard anyway, to ensure O(1) behavior.
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
if h.nevacuate == newbit { // newbit == # of oldbuckets
// 大小增長全部結束。釋放老的 bucket array
h.oldbuckets = nil
// 同樣可以丟棄老的 overflow buckets
// 如果它們還被迭代器所引用的話
// 迭代器會持有一份指向 slice 的指針
if h.extra != nil {
h.extra.oldoverflow = nil
}
h.flags &^= sameSizeGrow
}
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/NcTJ8zKL19pQmNjfx3WbZQ