map 設計與實現(上篇)- 數據結構

概述

在計算機科學中,關聯數組 (Associative Array),又稱映射 (Map)、字典 (Dictionary) 是一個抽象的數據結構,它包含着類似於(鍵,值)的有序對。

從應用的角度來看,Go 語言的 map 數據結構主要有以下常見問題:

本文通過分析標準庫中 map 的內部實現,嘗試解答上面的問題。

內部實現

map 的源代碼文件路徑爲 $GOROOT/src/runtime/map.go,筆者的 Go 版本爲 go1.19 linux/amd64

相關常量

const (
 bucketCntBits = 3
  // 8: 一個 bucket 存放元素的數量上限
 bucketCnt     = 1 << bucketCntBits

 // 觸發擴容的負載因子: 6.5
 // 官方的解釋是這個值是根據測試結果統計出來的
 // 負載量超過負載因子時,發生擴容
 // 負載量 = 元素數量 / bucket 桶數量
 loadFactorNum = 13
 loadFactorDen = 2

 // 保持內聯的鍵值對大小上限, 超過這個大小,鍵和值會被轉換爲指針
 // 必須在 uint8 所表示的範圍內 [0 - 255]
 maxKeySize  = 128
 maxElemSize = 128

 // 數據偏移量根據 bmap 結構體的大小進行對齊
 dataOffset = unsafe.Offsetof(struct {
  b bmap
  v int64
 }{}.v)

  // tophash 除了放正常的高 8 位的 hash 值
  // 還會在空閒、遷移時存儲一些自定義的狀態值

 emptyRest      = 0 // 槽未使用,且在較高的索引或溢出處不再有非空單元
 emptyOne       = 1 // 槽未使用
 evacuatedX     = 2 // 對應的鍵值存在,並且已經遷移到擴容後的 map 的前半部分
 evacuatedY     = 3 // 對應的鍵值存在,並且已經遷移到擴容後的 map 的後半部分
 evacuatedEmpty = 4 // 槽未使用, 桶遷移完成
 minTopHash     = 5 // 正常填充槽的最小 TopHash 值

 iterator     = 1 // 可能有一個使用 bucket 的迭代器
 oldIterator  = 2 // 可能有一個使用 oldbuckets 的迭代器
 hashWriting  = 4 // 一個等待寫入 map 的 goroutine
 sameSizeGrow = 8 // 等量擴容: 當前的 map 增長是一個相同大小的新 map

 // 用於迭代器檢查的哨兵 bucket ID
 noCheck = 1<<(8*goarch.PtrSize) - 1
)

我們暫時只需要關注兩個常量數值:

  1. 單個 bucket 最多可以放 8 個元素

  2. map 負載因子上限爲 6.5

map 負載量的計算方式爲:

負載量 = 元素數量 / bucket 桶數量

數據結構

bmap 對象

bmap 對象表示 map 中用於存儲數據的 bucket, 也就是數據桶。

type bmap struct {
 // tophash 通常包含此 bucket 中每個 key 的 hash 值高位字節
 // 如果 tophash[0] < minTopHash, 那麼其表示 bucket 的遷移狀態
 tophash [bucketCnt]uint8
}

編譯器會將 bmap 結構體轉換爲下面的結構體:

type bmap struct {
    tophash  [bucketCnt]uint8
    keys     [bucketCnt]keytype
    values   [bucketCnt]valuetype
   // 溢出 bucket 的指針
    overflow uintptr
}

需要注意的是: key + value 並不是配對存儲的,而是分別存儲在不同的數組中

把所有的 key 放在一起,然後把所有的 value 放在一起,可以消除爲了內存對齊而導致的內存填充 (主要是爲了節省內存),如 map[int64]int8 這種較爲極端的情況

g12NEI

bmap 對象

mapextra 對象

如果 map 的鍵和值都不包含指針,並且可以被內聯 (<=128 bytes), 那麼標記 bucket 桶不含指針,這樣可以避免 GC 掃描。 但是 bmap.overflow 字段是一個指針,爲了保證溢出的 bucket 桶存活 (不被 GC),在 hmap.extra.overflowhmap.extra.oldoverflow 中存儲了指向所有溢出桶的指針。

type mapextra struct {
 // overflow 和 oldoverflow 僅在 key 和 elem 都不包含指針時使用
 // overflow 包含 hmap.buckets 的溢出桶
 // oldoverflow 包含 hmap.oldbuckets 的溢出桶

 overflow    *[]*bmap
 oldoverflow *[]*bmap

 // nextOverflow 指向空閒的溢出桶
 nextOverflow *bmap
}

hmap 對象

hmap 對象表示 map 數據類型的主體結構。

type hmap struct {
  // map 中的元素個數
 // 必須放在 struct 的第一個位置
 // 因爲內置的 len 函數會從這裏讀取 (減少指令,提升性能,和 sync.Once 的 done 字段作用一樣)
 count     int
 flags     uint8

 // bucket 桶數量對應的指數
 // bucket 桶數量 = 2^B
 // 超過 6.5 * 2^B 個元素,就需要擴容
 B         uint8
 // 溢出 bucket 桶的數量
 noverflow uint16
  // 哈希種子, 爲哈希函數的結果引入隨機性
 hash0     uint32

  // bucket 桶存放數據的數組
 // 如果 count 字段等於 0,則爲 nil
 buckets    unsafe.Pointer
 // 只有在擴容期間不爲 nil
 // 擴容期間指向 buckets
 oldbuckets unsafe.Pointer
 // 遷移進度計數器
 // 小於計數器的 bucket 桶表示已經遷移完成
 nevacuate  uintptr

 // 當鍵和值都可以內聯的時,就會使用這個字段
 extra *mapextra
}

hmap 對象

方法

makemap_small 方法

makemap_small 方法實現了兩種 map 創建方式:

func makemap_small() *hmap {
 h := new(hmap)
 // 生成哈希種子
 h.hash0 = fastrand()
 return h
}

makemap 方法

如果創建的 map 不符合 makemap_small 方法的條件, 會調用 makemap 方法來創建。

// 如果 h != nil, map 可以在 h 中創建
// 如果 h.buckets != nil, 可以複用 h.buckets 數組
func makemap(t *maptype, hint int, h *hmap) *hmap {
 if h == nil {
  h = new(hmap)
 }
 // 生成哈希種子
 h.hash0 = fastrand()

 // 計算需要的 bucket 數量
 B := uint8(0)
 for overLoadFactor(hint, B) {
  B++
 }
 h.B = B

 // 創建 bucket 桶用於保存數據的數組
 // 如果 B == 0, buckets 字段懶加載
 // 如果 map 長度較大,那麼這部分內存初始化會花較長時間
 if h.B != 0 {
  var nextOverflow *bmap
  h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
  if nextOverflow != nil {
   h.extra = new(mapextra)
   h.extra.nextOverflow = nextOverflow
  }
 }

 return h
}

mapaccess2 方法

mapaccess1mapaccess2mapaccessK 幾個方法差不多,都可以用來訪問 map 元素,這裏以 mapaccess2 作爲示例說明。

mapaccess2 不僅可以返回參數 key 對應的元素,還可以返回第二個參數用來表示 key 對應的元素是否存在。

func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
 if h == nil || h.count == 0 {
  // map == nil 或元素數量爲 0, 直接返回未找到
  return unsafe.Pointer(&zeroVal[0]), false
 }
 if h.flags&hashWriting != 0 {
    // 併發讀寫 map 錯誤
  throw("concurrent map read and map write")
 }

  // 不同類型的 key,所用的 hash 算法是不一樣的
 hash := t.hasher(key, uintptr(h.hash0))
  // 找到對應的 bucket 桶
 m := bucketMask(h.B)
 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))

  // 如果 map 正在擴容
 if c := h.oldbuckets; c != nil {
  if !h.sameSizeGrow() {
   // 如果當前擴容機制是翻倍擴容
   // 說明之前的 buckets 只有現在的一半
   m >>= 1
  }
  oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
  if !evacuated(oldb) {
   // 如果舊桶中的數據還未遷移完成
   // 說明 key 對應的值可能存在於舊桶中
   b = oldb
  }
 }
  // 取高 8 位的值
 top := tophash(hash)

bucketloop:
 for ; b != nil; b = b.overflow(t) {
  // 一個桶在滿 8 個元素後,會創建新的桶
  // 然後掛在原來的桶的 overflow 指針上
  for i := uintptr(0); i < bucketCnt; i++ {
   // 循環對比桶中各個元素的哈希值
   if b.tophash[i] != top {
    if b.tophash[i] == emptyRest {
          // 如果當前元素後面的元素都是空的,說明沒有更多的元素了
     // 直接跳轉到下一個溢出桶
     break bucketloop
    }
    continue
   }

   // 如果找到了相等的哈希值,那說明 key 對應的值可能存在於當前桶中
   // 根據偏移量取出對應的 key
      k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

   if t.key.equal(key, k) {
        // 僅噹噹前 key 和參數 key 完全相同時
    // 才說明對應的元素找到了
    return e, true
   }
  }
 }

  // 沒有找到 key 對應的值,返回零值和 false
 return unsafe.Pointer(&zeroVal[0]), false
}

mapassign 方法

mapassign 方法用於設置 map 的值 ,和 mapaccess 方法不同的地方在於: 如果 key 不存在於 map 中,則爲它分配對應的槽,然後設置值。

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
 if h == nil {
  // map 未初始化時不能賦值
  panic(plainError("assignment to entry in nil map"))
 }

 if h.flags&hashWriting != 0 {
    // 併發寫錯誤
  throw("concurrent map writes")
 }
  // 調用對應類型的 hash 算法
 hash := t.hasher(key, uintptr(h.hash0))

 // 調用 hash 算法後設置 hashWriting 狀態標識
 // 因爲 t.hasher 可能發生 panic, 此時實際的設置操作還未完成
 h.flags ^= hashWriting

  // 初始化第一個桶
 if h.buckets == nil {
  h.buckets = newobject(t.bucket)
 }

again:
  // 計算低 8 位
 bucket := hash & bucketMask(h.B)
 if h.growing() {
  // 如果 map 當前正在擴容
  // 順便調用一次增量擴容
  growWork(t, h, bucket)
 }

 // 計算出存儲的桶地址, 轉換爲 bucket 對象
 b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
  // 計算高 8 位
 top := tophash(hash)

bucketloop:
 for {
  for i := uintptr(0); i < bucketCnt; i++ {
   // 循環對比桶中各個元素的哈希值
   if b.tophash[i] != top {
    // 在 b.tophash[i] != top 的情況下
    // 理論上有可能會是一個空槽位
    // 一般情況下 map 的槽位分佈是這樣的,e 表示 empty:
    // [h1][h2][h3][h4][h5][e][e][e]
    // 但在執行過 delete 操作時,可能會變成這樣:
    // [h1][h2][e][e][h5][e][e][e]
    // 所以如果再插入的話,會盡量往前面的位置插入
    // [h1][h2][e][e][h5][e][e][e]
    //          ^
    //          ^
    //       這個位置
    // 所以在循環的時候還要順便把前面的空位置先記下來
    if isEmpty(b.tophash[i]) && inserti == nil {
     // 如果這個槽位沒有被佔,說明可以往這裏放入 key 和 value
                    ...
    }
    if b.tophash[i] == emptyRest {
                    // 如果當前元素後面的元素都是空的,說明沒有更多的元素了
                    // 直接跳轉到下一個溢出桶
     break bucketloop
    }
    continue
   }

   // 如果找到了相等的哈希值,那說明 key 對應的值可能存在於當前桶中
   // 根據偏移量取出對應的 key
   k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

      // 如果兩個 key 的首 8 位、最後 8 位 hash 值一樣,會進行比較,和處理 hash 碰撞一樣
   // 如果相同的哈希位置的 key 和要插入的 key 不相等
   // 直接跳過
   if !t.key.equal(key, k) {
    continue
   }

   // 如果當前 key 和參數 key 完全相同
   // 但是對應的位置已經有值了,直接更新
   if t.needkeyupdate() {
    typedmemmove(t.key, k, key)
   }
   elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
   // 設置完成,直接跳轉到 done 條件標籤
   goto done
  }

    // 當前桶沒有可用的位置,從溢出桶繼續查找
  ovf := b.overflow(t)
  if ovf == nil {
   break
  }
  b = ovf
 }

 // 沒有找到 key, 分配新的內存空間

 // 如果觸發了負載因子上限或者已經有太多溢出 bucket,並且當前沒有在進行擴容,那麼開始擴容操作
 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
  hashGrow(t, h)
  // 擴容的時候會把當前的 bucket 桶放入 oldbucket
  // 如果還沒有分配新的 bucket 桶, 則跳轉到 again 重試一次
  // 重試的時候在 growWork 方法裏會把這個 key 對應的 bucket 桶提前分配好
  goto again
 }

 if inserti == nil {
  // 當前的桶和它掛載的溢出桶都滿了,分配一個新的桶
    ...
 }

 // 將鍵值存儲到對應的位置
  ...

  // map 元素數量 + 1
 h.count++

done:
 if h.flags&hashWriting == 0 {
    // 併發寫錯誤
  throw("concurrent map writes")
 }

 return elem
}

mapdelete 方法

mapdelete 方法用於刪除 map 中指定的 key, 其內部邏輯和 mapassign 賦值方法大多數都是相同的。

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
  // 如果 map 還未初始化或元素數量爲 0, 直接返回
 if h == nil || h.count == 0 {
  return
 }
 if h.flags&hashWriting != 0 {
    // 併發寫錯誤
  throw("concurrent map writes")
 }

  // 調用對應類型的 hash 算法
 hash := t.hasher(key, uintptr(h.hash0))

  // 調用 hash 算法後設置 hashWriting 狀態標識
  // 因爲 t.hasher 可能發生 panic, 此時實際的刪除操作還未完成
 h.flags ^= hashWriting

  // 計算低 8 位
 bucket := hash & bucketMask(h.B)
 if h.growing() {
  growWork(t, h, bucket)
 }
  // 計算出存儲的桶地址, 轉換爲 bmap 結構
 b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
 bOrig := b
  // 計算高 8 位
 top := tophash(hash)

search:
 for ; b != nil; b = b.overflow(t) {
  // 循環對比桶中各個元素的哈希值
  for i := uintptr(0); i < bucketCnt; i++ {
   if b.tophash[i] != top {
    if b.tophash[i] == emptyRest {
     // 如果當前元素後面的元素都是空的,說明沒有更多的元素了
     // 直接跳轉到下一個溢出桶
     break search
    }
    continue
   }

      // 根據偏移量取出對應的 key
   k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
   // 對應的槽標記爲未使用
      b.tophash[i] = emptyOne
   // map 元素數量 + 1
   h.count--
  }
 }

 if h.flags&hashWriting == 0 {
    // 併發寫錯誤
  throw("concurrent map writes")
 }
}

makeBucketArray 方法

makeBucketArray 方法用於初始化 bucket 桶對應的底層數組。

func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
 base := bucketShift(b)
 nbuckets := base
 // 當 bucket 數量小於 2^4 時
 // 由於數據較少、使用溢出桶的可能性較低,會省略創建的過程以減少額外開銷

 // 當 bucket 桶的數量大於等於 2^4 時
 // 額外創建 2 ^ (b - 4) 個溢出桶

 if b >= 4 {
  // 加上溢出桶的數量
  nbuckets += bucketShift(b - 4)
  sz := t.bucket.size * nbuckets
  up := roundupsize(sz)
  if up != sz {
   nbuckets = up / t.bucket.size
  }
 }

 if dirtyalloc == nil {
  // 分配新的數組內存
  buckets = newarray(t.bucket, int(nbuckets))
 } else {
  // 複用參數數組
  buckets = dirtyalloc
  size := t.bucket.size * nbuckets
  if t.bucket.ptrdata != 0 {
   memclrHasPointers(buckets, size)
  } else {
   memclrNoHeapPointers(buckets, size)
  }
 }

 if base != nbuckets {
  // 預分配一些溢出桶
  nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
  last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
  last.setoverflow(t, (*bmap)(buckets))
 }
 return buckets, nextOverflow
}

小結

從應用層面來說,map 在使用前一定要進行初始化,如果對於存儲數據的數量有大概的瞭解,可以在初始化時 預分配 一定的容量,可以有效提高性能。

從內部實現來說,map 通過編譯器和運行時實現了常規操作,通過 拉鍊法 來解決哈希碰撞問題 (每個 bmap 對象都有一個 overflow 指針), 此外,通過將每個 bucket 中元素的前 8 位哈希值存入 tophash, 以空間換時間,可以加速遍歷 bucket 中元素。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/EGM-ZXnuuWOdearV_xtdcQ