Go 底層探索 -五-: 哈希表 Map - 擴容 [下篇]

  1. 介紹

隨着哈希表中元素的逐漸增加,哈希的性能會逐漸惡化,所以我們需要更多的桶和更大的內存保證哈希的讀寫性能。

  1. 怎麼觸發

在每次對哈希表賦值時,都會調用runtime.mapassign 函數,該函數每次都會判斷是否需要擴容,主要有兩個函數:overLoadFactorytooManyOverflowBuckets:

// hash[k]=x表達式,會在編譯期間轉換成runtime.mapassign 函數的調用 
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
 ...
 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
    // 擴容入口
  hashGrow(t, h)
  goto again
 }
 ...
}

因爲Go 語言哈希的擴容不是一個原子的過程,所以 runtime.mapassign 還需要判斷當前哈希是否已經處於擴容狀態,避免二次擴容造成混亂。

  1. 擴容方式

根據觸發的條件不同擴容的方式分成兩種:

3.1 等量擴容

當我們對map不斷進行新增和刪除時,桶中可能會出現很多斷斷續續的空位,這些空位會導致連接的bmap溢出桶很長,對應的掃描時間也會變長,查詢性能就會下降。這種擴容實際上是一種整理,把後置位的數據整理到前面。

3.2 雙倍重建

兩倍重建是爲了讓map存儲更多的數據, 在雙倍重建時,我們還需要解決舊桶中的數據要轉移到某一個新桶中的問題。其中有一個非常重要的原則:如果數據的hash&bucketMask[當前新桶所在的位置] 小於或等於舊桶的大小,則此數據必須轉移到和舊桶位置完全對應的新桶中去,理由是當前key所在新桶的序號與舊桶是完全相同的。

  1. 擴容流程

4.1 擴容核心函數

擴容需要處理的問題是,擴容後,map中原本的數據重新放到擴容後的map中,即數據遷移問題,golangmap擴容時核心函數有如下幾個

4.2 hashGrow

重建時需要調用hashGrow函數,如果負載因子超載,則會進行雙倍重建。當溢出桶的數量過多時,會進行等量重建。新桶會存儲到buckets字段,舊桶會存儲到oldbuckets字段。mapextra字段的溢出桶也進行同樣的轉移。

func hashGrow(t *maptype, h *hmap) {
 bigger := uint8(1)
 if !overLoadFactor(h.count+1, h.B) {
  bigger = 0
  h.flags |= sameSizeGrow
 }
  // 舊數據存到舊桶
 oldbuckets := h.buckets
  // 創建一組新桶和溢出桶
 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

 h.B += bigger
 h.flags = flags
  // 舊數據存到舊桶上
 h.oldbuckets = oldbuckets
 h.buckets = newbuckets
 h.nevacuate = 0
 h.noverflow = 0
  // 原有的溢出桶,存到舊溢出桶
 h.extra.oldoverflow = h.extra.overflow
 h.extra.overflow = nil
 h.extra.nextOverflow = nextOverflow
}

@注意:這裏並沒有實際執行將舊桶中的數據轉移到新桶的過程。數據轉移遵循寫時複製(copy on write)的規則,只有在真正賦值時,纔會選擇是否需要進行數據轉移,其核心邏輯位於 growWork 和 evacuate 函數中。

擴容後的 map 的各個字段

4.3 growWork

growWork函數並不會真正進行數據遷移,它會調用evacuate函數來完成遷移工作,growWork函數每次會遷移至多兩個桶的數據,一個是目前需要使用的桶,一個是h.nevacuate桶(這裏很重要,在後面判斷是否遷移過程中有很大的作用),h.nevacuate記錄的是目前至少已經遷移的桶的個數。

func growWork(t *maptype, h *hmap, bucket uintptr) {
 // make sure we evacuate the oldbucket corresponding
 // to the bucket we're about to use
 evacuate(t, h, bucket&h.oldbucketmask())

 // evacuate one more oldbucket to make progress on growing
 if h.growing() {
  evacuate(t, h, h.nevacuate)
 }
}

4.4 evacuate

evacuate是真正進行數據遷移的函數,它每次會遷移一個bmap中的數據,簡單說,就是遍歷舊有bucketsbmap中的數據,將其放到新bmap的對應位置;

在學習evacuate函數前,先記bmap.tophash的幾個特殊值,在擴容過程中會使用到:

emptyRest      = 0 // 表明該位置及其以後的位置都沒有數據
emptyOne       = 1 // 表明該位置沒有數據
evacuatedX     = 2 // key/elem是有效的,它已經在擴容過程中被遷移到了更大表的前半部分
evacuatedY     = 3 // key/elem是有效的,它已經在擴容過程中被遷移到了更大表的後半部分
evacuatedEmpty = 4 // 該位置沒有數據,且已被擴容
minTopHash     = 5 // 一個被正常填充的tophash的最小值

4.4.1 判斷桶的遷移狀態

首先會判斷這個桶是否已經被遷移過了,或者正在遷移中,如果沒有被遷移,纔會進行遷移工作。該判斷是通過evacuated函數完成的,該函數很簡單,只需要判斷tophash[0]是否是evacuatedX,evacuatedY,evacuateEmpty即可.

func evacuated(b *bmap) bool {
 h := b.tophash[0]
 return h > emptyOne && h < minTopHash
}

4.4.2 初始化 evacDst 結構

初始化evacDst結構,如果是等量擴容,則只會初始化一個,如果是普通擴容,則會初始化兩個。runtime.evacuate 會將一箇舊桶中的數據分流到兩個新桶,所以它會創建兩個用於保存分配上下文的 runtime.evacDst 結構體,這兩個結構體分別指向了一個新桶:

hashmap-evacuate-destination

evacDst結構體如下所示:

type evacDst struct {
 b *bmap          // 目標桶
 i int            // 每個桶有8個位置可以塞數據,這個i可以理解爲當前塞了多少個數據
 k unsafe.Pointer // 目標桶中key的位置
 e unsafe.Pointer // 目標桶中elem的位置
}

4.4.3 舊桶元素分流

如果這是等量擴容,那麼舊桶與新桶之間是一對一的關係,所以兩個 runtime.evacDst 只會初始化一個。而當哈希表的容量翻倍時,每個舊桶的元素會都分流到新創建的兩個桶中,這裏仔細分析一下分流元素的邏輯:

...
    // 遍歷 oldbuckets 對應的 bucket 以及 oveflow
  for ; b != nil; b = b.overflow(t) {
      //獲取當前 bucket 的 key 的起始位置
   k := add(unsafe.Pointer(b), dataOffset)
      //獲取當前 bucket 的 elem 的起始位置
   e := add(k, bucketCnt*uintptr(t.keysize))
      //遍歷當前 bucket 中 8 個 key,elem
   for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
    top := b.tophash[i]
        // 如果爲空,則跳過
    if isEmpty(top) {
     b.tophash[i] = evacuatedEmpty
     continue
    }
        //如果小於 minTopHash ,則表示其已經被轉移走了,則 throw
    if top < minTopHash {
     throw("bad map state")
    }
    k2 := k
        //如果存的是對應 key 的指針,則要獲取 key 的地址
    if t.indirectkey() {
     k2 = *((*unsafe.Pointer)(k2))
    }
    var useY uint8
        // 如果是非等量遷移(雙倍重建)
    if !h.sameSizeGrow() {
          //算出當前 key 的 hash 值
     hash := t.hasher(k2, uintptr(h.hash0))
     if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
      useY = top & 1 //讓這個 key 50% 概率去 Y 半區
      top = tophash(hash)
     } else {
      if hash&newbit != 0 {
       useY = 1
      }
     }
    }

    if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
     throw("bad evacuatedN")
    }

    b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
    dst := &xy[useY]                 // 移動目標

    if dst.i == bucketCnt {
     dst.b = h.newoverflow(t, dst.b)
     dst.i = 0
     dst.k = add(unsafe.Pointer(dst.b), dataOffset)
     dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
    }
    dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
    if t.indirectkey() {
     *(*unsafe.Pointer)(dst.k) = k2 // copy pointer
    } else {
     typedmemmove(t.key, dst.k, k) // copy elem
    }
    if t.indirectelem() {
     *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
    } else {
     typedmemmove(t.elem, dst.e, e)
    }
    dst.i++
    // These updates might push these pointers past the end of the
    // key or elem arrays.  That's ok, as we have the overflow pointer
    // at the end of the bucket to protect against pointing past the
    // end of the bucket.
    dst.k = add(dst.k, uintptr(t.keysize))
    dst.e = add(dst.e, uintptr(t.elemsize))
   }
  }
...

只使用哈希函數是不能定位到具體某一個桶的,哈希函數只會返回很長的哈希,例如:b72bfae3f3285244c4732ce457cca823bc189e0b,我們還需一些方法將哈希映射到具體的桶上。我們一般都會使用取模或者位操作來獲取桶的編號,假如當前哈希中包含 4 個桶,那麼它的桶掩碼就是 0b11(3),使用位操作就會得到 3, 我們就會在 3 號桶中存儲該數據:

0xb72bfae3f3285244c4732ce457cca823bc189e0b & 0b11 #=> 0

如果新的哈希表有 8 個桶,在大多數情況下,原來經過桶掩碼 0b11 結果爲 3 的數據會因爲桶掩碼增加了一位變成 0b111 而分流到新的 3 號和 7 號桶,所有數據也都會被 runtime.typedmemmove 拷貝到目標桶中:

4.5 advanceEvacuationMark

runtime.evacuate 最後會調用 runtime.advanceEvacuationMark 增加哈希的 nevacuate 計數器並在所有的舊桶都被分流後清空哈希的 oldbucketsoldoverflow

func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
    h.nevacuate++
    // Experiments suggest that 1024 is overkill by at least an order of magnitude.
    // Put it in there as a safeguard anyway, to ensure O(1) behavior.
    stop := h.nevacuate + 1024
    if stop > newbit {
        stop = newbit
    }
    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
        h.nevacuate++
    }
    if h.nevacuate == newbit { // newbit == # of oldbuckets
        // 大小增長全部結束。釋放老的 bucket array
        h.oldbuckets = nil
        // 同樣可以丟棄老的 overflow buckets
        // 如果它們還被迭代器所引用的話
        // 迭代器會持有一份指向 slice 的指針
        if h.extra != nil {
            h.extra.oldoverflow = nil
        }
        h.flags &^= sameSizeGrow
    }
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NcTJ8zKL19pQmNjfx3WbZQ