Go 底層探索 -四-: 哈希表 Map[上篇]

@注: 以下內容來自【Go 語言底層原理剖析、Go 語言設計與實現】兩書中的摘要信息,一起讀書、一起學習。

  1. 介紹

Go語言中的map又被稱爲哈希表,是使用頻率極高的一種數據結構。哈希表的原理是將多個鍵/值(key/value)對, 分散存儲在buckets(桶)中。

通常map的時間複雜度爲O(1), 通過一個鍵快速尋找其唯一對應的值value。在許多情況下,哈希表的查找速度明顯快於一些搜索樹形式的數據結構,被廣泛用於關聯數組、緩存、數據庫緩存等場景中。

如果想要實現一個性能優異的哈希表,需要注意兩個關鍵點 —— 哈希函數和衝突解決方法。

  1. 哈希函數與碰撞

實現哈希表的關鍵點在於哈希函數的選擇,哈希函數的選擇在很大程度上能夠決定哈希表的讀寫性能。在理想情況下,哈希函數應該能夠將不同鍵映射到不同的索引上,但是在實際使用時,這個理想的效果是不可能實現的。

下面展示兩個圖,分別示意完美哈希函數和不均勻哈希函數計算的結果圖:


完美哈希函數計算值

不均勻哈希函數計算值

不同的鍵通過哈希函數產生相同的哈希值, 則就是我們常聽到的哈希碰撞 (哈希衝突)。

如果將2450個鍵隨機分配到一百萬個桶中,則根據概率計算,至少有兩個鍵被分配到同一個桶中的可能性有驚人的95%

哈希碰撞導致同一個桶中可能存在多個元素,有多種方式可以避免哈希碰撞,一般有兩種主要的策略:拉鍊法和開放尋址法。

  1. 開放尋址法

3.1 設計原理

概念: 開放尋址法是一種在哈希表中解決哈希碰撞的方法,這種方法的核心思想是: 依次探測和比較數組中的元素以判斷目標鍵值對是否存在於哈希表中。

優點: 由於存儲的地址連續,可以更好的利用CPU高速緩存。

缺點: 當散列表中插入的數據越來越多時,散列衝突發生的可能性就會越來越大,空閒位置會越來越少,線性探測的時間就會越來越久。極端情況下,需要從頭到尾探測整個散列表,所以最壞情況下的時間複雜度爲 O(n)

@注: Go 語言中的哈希表採用的是開放尋址法中的線性探測(Linear Probing)策略

開放尋址法中對性能影響最大的是裝載因子,它是數組中元素的數量與數組大小的比值。隨着裝載因子的增加,線性探測的平均用時就會逐漸增加,這會影響哈希表的讀寫性能。當裝載率超過 70% 之後,哈希表的性能就會急劇下降,而一旦裝載率達到 100%,整個哈希表就會完全失效,這時查找和插入任意元素的時間複雜度都是 𝑂(𝑛) ,這時需要遍歷數組中的全部元素,所以在實現哈希表時一定要關注裝載因子的變化。

裝載因子:

在一個性能比較好的哈希表中,每一個桶中都應該有 0~1 個元素,有時會有 2~3 個,很少會超過這個數量。計算哈希、定位桶和遍歷鏈表三個過程是哈希表讀寫操作的主要開銷,裝載因子公式:

裝載因子元素數量桶數量

3.2 寫入動畫

寫入流程動畫演示 - 先行探測

以上圖爲例,散列表的大小爲8 ,黃色區域表示空閒位置,橙色區域表示已經存儲了數據。目前散列表中已經存儲了 4 個元素。此時元素 7777777  經過 Hash 算法之後,被散列到位置下標爲 7 的位置,但是這個位置已經有數據了,所以就產生了衝突。於是按順序地往後一個一個找,看有沒有空閒的位置,此時,運氣很好正巧在下一個位置就有空閒位置,將其插入,完成了數據存儲。

  1. 拉鍊法

4.1 設計原理

概念: 將同一個桶中的多個元素通過鏈表的形式進行鏈接,這是一種最簡單、最常用的策略。

優點: 隨着桶中元素的增加,可以不斷鏈接新的元素,同時不用預先爲元素分配內存。

缺點: 需要存儲額外的指針用於鏈接元素,這增加了整個哈希表的大小。同時由於鏈表存儲的地址不連續,所以無法高效利用 CPU 高速緩存。

4.2 寫入動畫

已上圖爲例,還是插入元素7777777, 過 Hash 算法之後,被散列到位置下標爲 7 的位置, 但是這個位置已經有數據了,所以就產生了衝突,然後通過鏈表的形式,掛載在後面。

  1. 併發衝突

和其他語言不同的是,map並不支持併發的讀寫,map併發讀寫是初級開發者經常會犯的錯誤。下面的示例由於協程併發讀寫會報錯爲fatal error:concurrent map read and map write

5.1 錯誤示例

func TestRun(t *testing.T) {
 mp := map[string]int{
  "a": 10,
  "b": 5,
 }
 // 開啓讀協程
 go func() {
  for true {
   fmt.Println("a = ", mp["a"])
  }
 }()
 // 開啓寫協程
 go func() {
  for true {
   mp["b"] = 10
  }
 }()
 time.Sleep(time.Second * 3)
 fmt.Println("ok")
}

5.2 不支持併發讀寫原因

Go語言爲什麼不支持併發的讀寫,是一個頻繁被提起的問題。我們可以在Go官方文檔中找到問題的答案。

官方文檔的解釋是:"map 不需要從多個 Goroutine 安全訪問,在實際情況下,map 可能是某些已經同步的較大數據結構或計算的一部分。因此,要求所有 map 操作都互斥將減慢大多數程序的速度,而只會增加少數程序的安全性。" 即Go語言只支持併發讀寫的原因是保證大多數場景下的查找效率。

  1. 數據結構

6.1 核心結構體

Go 語言運行時同時使用了多個數據結構組合表示哈希表,其中 runtime.hmap 是最核心的結構體,我們先來了解一下該結構體的內部字段:

type hmap struct {
 count     int
 flags     uint8
 B         uint8
 noverflow uint16
 hash0     uint32
 buckets    unsafe.Pointer
 oldbuckets unsafe.Pointer
 nevacuate  uintptr
 extra *mapextra
}

type mapextra struct {
 overflow    *[]*bmap
 oldoverflow *[]*bmap
 nextOverflow *bmap
}

6.2 結構示意圖

如上圖所示哈希表 runtime.hmap 的桶是 runtime.bmap。每一個 runtime.bmap 都能存儲 8 個鍵值對,當哈希表中存儲的數據過多,單個桶已經裝滿時就會使用 extra.nextOverflow 中桶存儲溢出的數據。

上述兩種不同的桶在內存中是連續存儲的,我們在這裏將它們分別稱爲正常桶和溢出桶,上圖中黃色的 runtime.bmap 就是正常桶,綠色的 runtime.bmap 是溢出桶,溢出桶是在 Go 語言還使用 C 語言實現時使用的設計,由於它能夠減少擴容的頻率所以一直使用至今。

6.3 桶結構 (bmap)

代表桶的bmap結構在運行時只列出了首個字段,即一個固定長度爲8的數組。此字段順序存儲key的哈希值的前 8 位。

type bmap struct {
 tophash [bucketCnt]uint8
}

1. 桶中存儲的 key 和 value 到哪裏去了?

在運行期間,runtime.bmap 結構體其實不止包含 tophash 字段,因爲哈希表中可能存儲不同類型的鍵值對,而且 Go 語言也不支持泛型 (當時不支持),所以鍵值對佔據的內存空間大小隻能在編譯時進行推導。runtime.bmap 中的其他字段在運行時也都是通過計算內存地址的方式訪問的,所以它的定義中就不包含這些字段,不過我們能根據編譯期間的 cmd/compile/internal/gc.bmap 函數重建它的結構:

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

隨着哈希表存儲的數據逐漸增多,我們會擴容哈希表或者使用額外的桶存儲溢出的數據,不會讓單個桶中的數據超過 8 個,不過溢出桶只是臨時的解決方案,創建過多的溢出桶最終也會導致哈希的擴容。

  1. 初始化

7.1 字面量初始化

目前的現代編程語言基本都支持使用字面量的方式初始化哈希,一般都會使用 key: value 的語法來表示鍵值對,Go 語言中也不例外:

hash := map[string]int{
 "1": 2,
 "3": 4,
 "5": 6,
}

我們需要在初始化哈希時聲明鍵值對的類型,這種使用字面量初始化的方式最終都會通過 cmd/compile/internal/gc.maplit 初始化,我們來分析一下該函數初始化哈希的過程:

func maplit(n *Node, m *Node, init *Nodes) {
 a := nod(OMAKE, nil, nil)
 a.Esc = n.Esc
 a.List.Set2(typenod(n.Type), nodintconst(int64(n.List.Len())))
 litas(m, a, init)

 entries := n.List.Slice()
 if len(entries) > 25 {
  ...
  return
 }

 // Build list of var[c] = expr.
 // Use temporaries so that mapassign1 can have addressable key, elem.
 ...
}

1. 元素數量小於等於 25 時:

當哈希表中的元素數量 <=25 個時,編譯器會將字面量初始化的結構體轉換成以下的代碼,將所有的鍵值對一次加入到哈希表中:

hash := make(map[string]int, 3)
hash["1"] = 2
hash["3"] = 4
hash["5"] = 6

這種初始化的方式與的數組和切片幾乎完全相同,由此看來集合類型的初始化在 Go 語言中有着相同的處理邏輯。

2. 元素數量大於 25 時:

一旦哈希表中元素的數量超過了 25 個,編譯器會創建兩個數組分別存儲鍵和值,這些鍵值對會通過如下所示的 for 循環加入哈希:

hash := make(map[string]int, 26)
vstatk := []string{"1""2""3", ... , "26"}
vstatv := []int{1, 2, 3, ... , 26}
for i := 0; i < len(vstak); i++ {
    hash[vstatk[i]] = vstatv[i]
}

使用字面量初始化的過程都會使用 Go 語言中的關鍵字 make 來創建新的哈希並通過最原始的 [] 語法向哈希追加元素.

7.2 運行時

當創建的哈希被分配到棧上並且其容量小於 BUCKETSIZE = 8 時,Go 語言在編譯階段會使用如下方式快速初始化哈希,這也是編譯器對小容量的哈希做的優化:

var h *hmap
var hv hmap
var bv bmap
h := &hv
b := &bv
h.buckets = b
h.hash0 = fashtrand0()

除了上述特定的優化之外,無論 make 是從哪裏來的,只要我們使用 make 創建哈希,Go 語言編譯器都會在類型檢查期間將它們轉換成 runtime.makemap,使用字面量初始化哈希也只是語言提供的輔助工具,最後調用的都是 runtime.makemap

func makemap(t *maptype, hint int, h *hmap) *hmap {
 mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
 if overflow || mem > maxAlloc {
  hint = 0
 }

 if h == nil {
  h = new(hmap)
 }
 h.hash0 = fastrand()

 B := uint8(0)
 for overLoadFactor(hint, B) {
  B++
 }
 h.B = B

 if h.B != 0 {
  var nextOverflow *bmap
  h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
  if nextOverflow != nil {
   h.extra = new(mapextra)
   h.extra.nextOverflow = nextOverflow
  }
 }
 return h
}

這個函數會按照下面的步驟執行:

  1. 計算哈希佔用的內存是否溢出或者超出能分配的最大值;

  2. 調用 runtime.fastrand 獲取一個隨機的哈希種子;

  3. 根據傳入的 hint 計算出需要的最小需要的桶的數量;

  4. 使用 runtime.makeBucketArray 創建用於保存桶的數組;

  5. 讀寫原理


8.1 讀取

在編譯的類型檢查期間,hash[key] 以及類似的操作都會被轉換成哈希的 OINDEXMAP 操作,中間代碼生成階段會在 cmd/compile/internal/gc.walkexpr 函數中將這些 OINDEXMAP 操作轉換成如下的代碼:

v     := hash[key] // => v     := *mapaccess1(maptype, hash, &key)
v, ok := hash[key] // => v, ok := mapaccess2(maptype, hash, &key)

賦值語句左側接受參數的個數會決定使用的運行時方法:

runtime.mapaccess1 會先通過哈希表設置的哈希函數、種子獲取當前鍵對應的哈希,再通過 runtime.bucketMaskruntime.add 拿到該鍵值對所在的桶序號和哈希高位的 8 位數字,

runtime.mapaccess1源碼:

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
 alg := t.key.alg
  // 獲取當前鍵對應的哈希
 hash := alg.hash(key, uintptr(h.hash0))
 m := bucketMask(h.B)
  // hash&m計算出key應該位於哪一個桶中
 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
  // tophash(hash)計算出hash的前8位
 top := tophash(hash)
bucketloop:
 for ; b != nil; b = b.overflow(t) {
  for i := uintptr(0); i < bucketCnt; i++ {
      // 與存儲在桶中的tophash進行對比
   if b.tophash[i] != top {
    if b.tophash[i] == emptyRest {
     break bucketloop
    }
    continue
   }
   k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
   if alg.equal(key, k) {
    v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
    return v
   }
  }
 }
 return unsafe.Pointer(&zeroVal[0])
}

bucketloop 循環中,哈希會依次遍歷正常桶和溢出桶中的數據,它會先比較哈希的高 8 位和桶中存儲的 tophashGo語言採用了一種簡單的方式hash&m計算出key應該位於哪一個桶中。獲取桶的位置後,tophash(hash)計算出hash的前8位。接着此hash挨個與存儲在桶中的tophash進行對比。如果有hash值相同,則會找到此hash值對應的key值並判斷是否相同。如果key值也相同,則說明查找到了結果,返回value

看圖加深理解:

訪問哈希表中的數據

如上圖所示,每一個桶都是一整片的內存空間,當發現桶中的 tophash 與傳入鍵的 tophash 匹配之後,我們會通過指針和偏移量獲取哈希中存儲的鍵 keys[0] 並與 key 比較,如果兩者相同就會獲取目標值的指針 values[0] 並返回。

8.2 寫入

當形如 hash[k] 的表達式出現在賦值符號左側時,該表達式也會在編譯期間轉換成 runtime.mapassign 函數的調用,該函數與 runtime.mapaccess1 比較相似,我們將其分成幾個部分依次分析,首先是函數會根據傳入的鍵拿到對應的哈希和桶:

src/runtime/map.go:578

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
 alg := t.key.alg
  // 先計算key的hash值
 hash := alg.hash(key, uintptr(h.hash0))
  // 標記當前map是寫入狀態
 h.flags ^= hashWriting

again:
  // 計算對應的桶
 bucket := hash & bucketMask(h.B)
 b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
 top := tophash(hash)

然後通過遍歷比較桶中存儲的 tophash 和鍵的哈希,如果找到了相同結果就會返回目標位置的地址,獲得目標地址之後會通過算術計算尋址獲得鍵值對 kval

src/runtime/map.go:619

 //inserti 表示目標元素的在桶中的索引
  var inserti *uint8
  // insertk 和 val`分別表示鍵值對的地址
 var insertk unsafe.Pointer
 var val unsafe.Pointer
bucketloop:
 for {
  for i := uintptr(0); i < bucketCnt; i++ {
      // 判斷 tophash 是否相等
   if b.tophash[i] != top {
    if isEmpty(b.tophash[i]) && inserti == nil {
          // 插入數據inserti,insertk會記錄此空元素的位置
     inserti = &b.tophash[i]
     insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
     val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
    }
    if b.tophash[i] == emptyRest {
     break bucketloop
    }
    continue
   }
   k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
      // key 是否相等
   if !alg.equal(key, k) {
    continue
   }
   val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
   goto done
  }
  ovf := b.overflow(t)
  if ovf == nil {
   break
  }
    // 溢出桶變量賦值給b,開始遍歷溢出桶
  b = ovf
 }

桶滿後存到溢出桶:

如果當前桶已經滿了,哈希會調用 runtime.hmap.newoverflow 創建新桶或者使用 runtime.hmap 預先在 noverflow 中創建好的桶來保存數據,新創建的桶不僅會被追加到已有桶的末尾,還會增加哈希表的 noverflow 計數器。

src/runtime/map.go:662

 if inserti == nil {
  newb := h.newoverflow(t, b)
  inserti = &newb.tophash[0]
  insertk = add(unsafe.Pointer(newb), dataOffset)
  val = add(insertk, bucketCnt*uintptr(t.keysize))
 }

 typedmemmove(t.key, insertk, key)
 *inserti = top
 h.count++

done:
 return val
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bogsT5hBGxwV8-XtoxAqXQ