2 萬字圖解 map

map 是什麼

in computer science, an associative array, map, symbol table, or dictionary is an abstract data type composed of a collection of (key, value) pairs, such that each possible key appears at most once in the collection.

上面引用的是維基百科對 map 的定義,意思是說,在計算機學科中,map 是一種抽象的數據結構,它由 key 和 value 組成組成鍵值對的集合,在集合中每個 key 最多出現一次。像關聯數組、符號表、字典數據結構都是 map 的一種具體實現 map 數據結構在實際的項目使用的非常頻繁,很多語言都提供了 mpa 數據結構,像 Java 語言的 HashMap,Go 語言中的 map 和 sync.Map 數據類型。map 基本操作包含添加 key 和 value 鍵值對,獲取 key 對應的 value, 刪除 key, 遍歷操作。

map 實現方法

map 的實現主要有兩種方法:hash table(哈希表) 和 search tree(搜索樹)

哈希表

哈希表背後的基本思想是通過索引訪問數組元素是一個簡單的、恆定時間的操作,對於給定的 key 通過哈希函數計算,得到一個數值,然後將該數值映射到數組下表,value 就存儲在這個位置。因此,哈希表操作的平均開銷只是計算鍵的哈希值,並結合訪問數組中的相應桶。因此,哈希表通常在 O(1) 時間內完成操作,並且在大多數情況下優於其他方案。哈希表的兩個關鍵要素是哈希函數和哈希衝突的處理,理解了這兩個元素,基本搞懂了哈希表。下面分別講講哈希函數和衝突處理。哈希函數是可以用於將任意大小的數據映射到固定大小值的函數,通俗解釋是對一串數據 data1 進行雜糅,輸出另一段固定長度的數據 data2,作爲這段數據的特徵, 常見的哈希函數有 MD5、SHA。

那這個 hash 函數有什麼特別的呢?是不是任意一個函數都可以作爲 hash 函數,顯然不是的。hash 函數選擇對於 Map 的性能啓動關鍵性的作用,一個好的 hash 函數,應該具有以下 5 性:

哈希衝突,根據前面哈希函數的定義,是將任意大小的輸入數據映射到固定大小的輸出,也就是說輸入是無限的,輸出是有限的,從無限到有限的集合映射很難找到一個一一映射,很可能存在着多個輸入對應到一個輸出的情況,即多個 key 映射到同一個 value,這就是哈希衝突,既然哈希衝突不能避免,我們需要處理哈希衝突的方法。常用的哈希衝突解決方法有鏈地址法和開放尋址法兩種。

下面通過一個實際的例子,說明鏈地址方法和開放尋址法的一些細節。需要裝載的數據爲一組整數數據 [21,2,1,56,6,40], 哈希函數 f(k)=k%7。通過鏈地址創建哈希的過程如下

通過線性探測法創建哈希的過程如下

上面介紹了哈希表中兩個關鍵點哈希函數和哈希衝突解決方法,還有一個概念需要我們理解,此概念是裝填因子。裝填因子是表示哈希表中元素的填滿程度。它的計算公式:裝載因子 = 填入哈希表中的元素個數 / 哈希表的長度。裝載因子越大,填入的元素越多,空間利用率就越高,但發生哈希衝突的幾率就變大。反之,裝載因子越小,填入的元素越少,衝突發生的幾率減小,但空間浪費也會變得更多,而且還會提高擴容操作的次數。裝載因子也是決定哈希表是否進行擴容的關鍵指標,在 Go 中,當裝填因子達到 6.5 時就會觸發擴容操作。

對比鏈地址法和開放尋址法,兩種方法各有優缺點。

ArGgly

實際使用的時候要根據具體的業務場景進行選擇,當數據量已知,裝填因子小的的時候,適合採用開放尋址法,當存儲的數據量很大,大小不明確時,採用鏈地址法更靈活,衝突處理效率高。

搜索樹

搜索樹也是一種 map 的實現方式,無論是用鏈地址法還是開放尋址法實現的 mpa,本質都是使用的線性數據結構實現的 map。用搜索樹實現 map 本質是用樹形結構實現 map. 根據採用的樹形結構不同,利用搜索樹實現 map 也有多種實現方法,用的比較多的有二叉搜索樹和平衡搜索二叉樹。像 Java 中提供的 TreeMap 結構,就是用搜索樹實現的 map.

下面以二叉搜索樹實現 map 爲例,採用圖形化方式解釋以二叉搜索樹實現的 mpa 是如何工作的。添加一組數據到 map 中 {<21,"hello">,<2,"mingyong">,<1,"smile">,<56,"cheap">,<40,"shop">}

Go 中 map 實現原理

Go 語言中 map 是使用哈希表方式實現的,解決哈希衝突採用的是鏈地址法。整個結構由桶和鏈表組成,下面是它的整體結構圖。先對 map 有個整體印象,後面會結合 map 的操作代碼實現進行說明。桶指的是下圖中 buckets 指向的 []bmap 數組。bmap 的 overflow 串聯成鏈表的結構

下面會先講 map 的結構定義,然後從創建、查找、賦值、遍歷、刪除和擴容 6 個方面分別詳細講述實現細節。注意下面分析 Go 源碼是 1.14 版本,涉及到的文件有 src/runtim/map.go, src/cmd/compile/internal/gc/reflect.go,

map 數據結構

hmap 是 map 的結構體頭,它維護了一個 map 對象的所有信息。申明一個 map 對象,var m map[int]int, 此時的 m 是一個 nil 指針。當通過 m=make(map[int]int) 初始化之後,m 是指向一個 hmap 對象的指針。也就是說此時的 m 類型是 * hmap.hmap 結構體中最重要的一個字段是 buckets, 它指向 bucket 數組,此外還有記錄 map 中已存放數據個數的 count, 描述桶的數量即 bucket 數組大小的 B.

// map的結構體頭信息
type hmap struct {
 // 記錄當前map中k-v鍵值對的數量,即map當前有多少元素
 count int 
 // 標記狀態map操作過程中的狀態用的,後面會有介紹
 flags uint8
 // B決定哈希桶的數量,桶的數量爲2^B個
 B uint8 
 // 溢出桶的大概數量
 noverflow uint16 
 // 哈希種子,計算key的哈希值時會用到
 hash0 uint32 
 // 指向桶的數組,桶的個數爲2^B個,每個桶中的元素是bmap,如果map中沒有數據,buckets可能爲nil
 buckets unsafe.Pointer 
 // 指向舊桶的指針,map在擴容的時候會用到
 oldbuckets unsafe.Pointer 
 // 標識擴容進度,小於此地址的buckets代表已搬遷完成,在後面的擴容講解中會有說明
 nevacuate uintptr 
 // extra是爲了降低GC掃描而設計的。當key和value均不包含指針,並且都可以inline時使用,桶的數據是存在extra.overflow中的
 extra *mapextra 
}


type mapextra struct {
 // 對於key和value中的值都不包含指針,並且可以內聯(不大於128字節)的情況下,用hmap的extra字段來存儲overflow buckets
 // 可以避免GC掃描整個map. 然而bmap中overflow字段是個指針,爲了保證overflow指向桶的生命週期,將溢出桶保存在extra.overflow
 // 切片中,爲了保證oldbucket中的桶溢出桶生命週期,將其保存在oldoverflow切片中
 overflow    *[]*bmap
 oldoverflow *[]*bmap
    
 // 指向預分配的溢出桶的位置
 nextOverflow *bmap
}

申明一個 map 對象,var m map[int]int, 此時的 m 是一個 nil 指針。當通過 m=make(map[int]int) 初始化之後,m 是指向一個 hmap 對象的指針。也就是說此時的 m 類型是 * hmap. hmap 結構體中最重要的一個字段是 buckets, 它指向 bucket 數組,此外還有記錄 map 中已存放數據個數的 count, 描述桶的數量即 bucket 數組大小的 B.

下面着重看 bucket(桶) 結構,定義在 bmap 結構中,包含的字段如下

type bmap struct {
 // tophash是一個[8]uint8數組,存儲該桶中每個鍵的哈希值的最高8位信息,因爲該數組大小爲8
 // 所以也就是說每個桶能夠存儲的鍵數量有8個,注意該結構在編譯期間會動態添加字段。添加字段存儲
 // 有8個key和8個value,還有一個*overflow字段。
 tophash [bucketCnt]uint8
}

需要注意的是,上面看到的 bmap 只是靜態結構,在編譯期間會動態添加存儲 8 個 key 和 8 個 value,以及一個 * overflow 信息。動態添加處理的代碼在 src/cmd/compile/internal/gc/reflect.go 中的 bmap,下面附上的是添加的關鍵代碼,省略了不其他內容。

// 動態創建bmap
func bmap(t *types.Type) *types.Type {
 ...
    
 field := make([]*types.Field, 0, 5)

 // The first field is: uint8 topbits[BUCKETSIZE].
 // 第一個字段是[8]uint8
 arr := types.NewArray(types.Types[TUINT8], BUCKETSIZE)
 field = append(field, makefield("topbits", arr))
    // 添加8個key
 arr = types.NewArray(keytype, BUCKETSIZE)
 arr.SetNoalg(true)
 keys := makefield("keys", arr)
 field = append(field, keys)
    // 添加8個value
 arr = types.NewArray(elemtype, BUCKETSIZE)
 arr.SetNoalg(true)
 elems := makefield("elems", arr)
 field = append(field, elems)
    ...
    return bucket
}

綜上,可以得到 bucket(桶)的內存結構,這裏畫了一張 bucket 的結構圖,方便讀者理解,圖中每個 tophash 稱之爲一個槽位,一共有 8 個槽位。每個槽位對應了一個存放 key 和 value 的位置,每個 tophash、key 和 value 佔用的內存空間都是固定的,所以根據 tophash 結合偏移量很容易得到 key 和 value 的位置。

如上圖,並不是按照一個 key 一個 value 形式存儲的,而是先存儲 8 個 key,然後在存儲 8 個 value,作者解釋了這樣存儲的好處是是能讓我們消除例如 map[int64]int 所需要的填充(padding)。最後一個字段 overflow 指針,該字段用於指向下一個桶的位置,因爲一個桶中最多隻能裝 8 個鍵值對,如果有多餘的鍵值對落到了當前桶,那麼就需要再構建一個溢出桶,通過 overflow 指針串聯起來。

map 創建

map 只能通過 make 進行初始化,在初始化的時候可以指定大小也可以不指定大小。

// 未指定初始化大小,實際上創建的大小爲0
m:=make(map[int]int)
// 指定創建存儲8個數據大小的map
m=make(map[int]int,8)

對於初始化小於等於 8 個數據的 map 時,會調用 makemap_small 函數。大於 8 個的時候,會調用 makemap 函數。

// makemap_small用於創建小數據量的map,當不指定初始化大小(即大小爲0)或者當前初始化大小
// 小於等於8(bucketCnt)的時候,會調用次函數,並直接在堆上分配內存
func makemap_small() *hmap {
 h := new(hmap)
 h.hash0 = fastrand()
 return h
}

創建 map 大小大於 8 時,會調用 makemap 函數,編譯器會自動分析要創建的 map 或 map 中的 bucket 是否能夠放在棧上。當桶的數量不小於 16 的時候,會產生 2^(b-4) 個溢出桶,採用了預分配的方法,提前多分配幾個桶,保存在 hmap.extra.nextOverflow 中。

// makemap是通過make(map[key]value,9)創建一個map的實現
// 編譯器會自動分析要創建的map或map中的bucket是否能夠放在棧上,
// 如果不能放在棧上,會創建在堆上,具體什麼情況下會放在棧上,下面
// 的解析中有說
// 在創建的時候,如果map結構頭h已經非空,直接使用不會新建,同樣
// 對於h中指向的bucket地址已存在,不會重新分配
func makemap(t *maptype, hint int, h *hmap) *hmap {
 // 計算創建map要分配的內存
 mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
 // 如果需要分配的內存超過最大可分配的內存大小或是內存過大數字溢出了,
 // 強行設置hint爲0,即初始化創建有0個元素的map
 if overflow || mem > maxAlloc {
  hint = 0
 }

 // 如果h爲空,新分配map結構頭h內存
 if h == nil {
  h = new(hmap)
 }
 // 產生一個隨機數據,作爲哈希計算時候的種子
 h.hash0 = fastrand()

 B := uint8(0)
 // 根據傳入的map的大小hint,找到一個合適的B值,這個B值是滿足裝填因子6.5*2^B不小於
 // hint的一個最小值
 // 如果創建map大小不大於8,則B值爲0
 for overLoadFactor(hint, B) {
  B++
 }
 h.B = B

 // B值不爲0,根據B值創建2^B個桶,也就是B值爲4,會創建16個桶
 // 對於B值爲0的情況,這裏不做處理,會延遲在mapassign即向map添加元素的時候分配
 // 如果hint是一個很大的值,即B也會很大,清理這片內存會花費比較長的時間
 if h.B != 0 {
  var nextOverflow *bmap
  // 創建桶,分配相應的內存
  h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
  if nextOverflow != nil {
   // 如果有溢出桶產生,將溢出桶的位置保存在h.extra.nextOverflow中
   h.extra = new(mapextra)
   h.extra.nextOverflow = nextOverflow
  }
 }

 return h
}

func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
 // 計算桶的數量=2^b
 base := bucketShift(b)
 nbuckets := base
 
 // 對於b很小的情況,桶的數量也會很少,使用到溢出桶的概率會很小,就不
 // 分配溢出桶,也就不用做一些產生溢出桶相關的計算
 if b >= 4 {
  // 當b大於等於4,即桶的數量不小於16的時候,會產生2^(b-4)個溢出桶
  nbuckets += bucketShift(b - 4)
  sz := t.bucket.size * nbuckets
  up := roundupsize(sz)
  if up != sz {
   nbuckets = up / t.bucket.size
  }
 }

 if dirtyalloc == nil {
  // 新創建nbuckets個桶
  buckets = newarray(t.bucket, int(nbuckets))
 } else {
  // dirtyalloc已經有值,之前已經有分配過bucket,將之前的桶中內容清理掉,
  // 繼續複用之前已經分配的桶
  buckets = dirtyalloc
  size := t.bucket.size * nbuckets
  if t.bucket.ptrdata != 0 {
   // 清理掉有指針的buckets的數據
   memclrHasPointers(buckets, size)
  } else {
   // 清理掉沒有堆上指針的buckets的數據
   memclrNoHeapPointers(buckets, size)
  }
 }

 if base != nbuckets {
  // 在b大於等於4的時候,會預先分配一些溢出桶
  // 爲了將跟蹤這些溢出桶的開銷保持在最低限度,使用了下面的約定:
  // 如果預先分配的桶的overflow字段值是nil的,通過碰撞指針可以獲得更多的桶
  // 對於最後一個預先分配的溢出桶last, 需要將其overflow指向一個非nil值,這裏實際
  // 指向的是第一個桶buckets的位置
  nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
  last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
  last.setoverflow(t, (*bmap)(buckets))
 }
 return buckets, nextOverflow
}

讀到這裏,讀者可能會有一個疑問,創建 map 的時候哈希函數是怎麼選擇的呢?其實哈希函數的選擇是在運行的時候初始化的,不用我們關心。具體初始的邏輯在 src/runtime/proc.go 文件的 schedinit 函數中。我們先來看下 schedinit 函數。schedinit 函數中的 alginit 函數就是處理哈希函數選取的。根據當前程序編譯的目標架構判斷是否支持 aes, 如果支持就選取 ase hash,具體實現是用匯編寫的,在 runtime 下的 asm_{386,amd64,arm64}.s 中。若不支持,採用啓發時產生,實現在 runtime 下的 hash32.go 和 hash64.go 中。

func schedinit() {
    ...
 stackinit()
 mallocinit()
 fastrandinit() // must run before mcommoninit
 mcommoninit(_g_.m)
 cpuinit()       // must run before alginit
    // 在使用map的時候,alginit函數必須已經被調用
 alginit()       // maps must not be used before this call
 modulesinit()   // provides activeModules
  ...
}


func alginit() {
 // Install AES hash algorithms if the instructions needed are present.
 if (GOARCH == "386" || GOARCH == "amd64") &&
  cpu.X86.HasAES && // AESENC
  cpu.X86.HasSSSE3 && // PSHUFB
  cpu.X86.HasSSE41 { // PINSR{D,Q}
  initAlgAES()
  return
 }
 if GOARCH == "arm64" && cpu.ARM64.HasAES {
  initAlgAES()
  return
 }
 getRandomData((*[len(hashkey) * sys.PtrSize]byte)(unsafe.Pointer(&hashkey))[:])
 hashkey[0] |= 1 // make sure these numbers are odd
 hashkey[1] |= 1
 hashkey[2] |= 1
 hashkey[3] |= 1
}

maptype(runtime 下的 type.go 文件中) 結構體定義的是 map 的 type 類型,裏面有個 hasher 字段,該字段會存儲上面選擇的哈希函數。hasher 函數接收 2 個入參,第一個參數就是指向 key 的地址,第二個參數是 h.hash0,h.hash0 是通過 fastrand() 得到的。調用 haser 函數之後,就得到了 key 對應的哈希值。

map 查找

map 查找就是獲取 map 中 key 對應的 value. 有兩種獲取方法。分別是

// 只有一個返回值,如果key在map中存在,就返回vlaue的值。如果key不在map中,返回的value是一個零值,是一個無效的值。哪怎麼知道value到底是有效的還是無效的,於是就有了下面的這種帶有2個返回值的調用
value:=m[key]

// 第二返回值表示key在map中是否存在,如果存在,返回true,否則返回false.
value,ok:=m[key]

對應到實現來說,就是 mapaccess1 和 mapaccess2 函數。在分析查找實現前,我們先來看一個 key 的定位問題,就是給一個 key 怎麼確定它在哪個桶的哪位槽位。理解了這個,下面的 mapaccess1 就很好理解了。假設 hmap 中的 B 爲 5,即 map 的桶有 2^5=32 個,對添加的 key 通過哈希函數計算後得到的值爲 64bit 數據。現在有一個鍵值對 {k,v} 被添加到 map 中,對 k 計算出來的哈希值爲 hash(k),現在如果是你設計一個方法將 hash(k)放在哪個桶,你會採用什麼方法?對 hash(k)取模就可以,非常棒。官方作者就是這麼實現的,落在桶的公式爲 hash(k)%32=hash(k)%(2^B), 就是將對 key 計算出來的哈希值模與桶的數量,得到的餘數肯定是在 [0,2^B) 中。實現的時候,作者並沒有採用 % 運算,而是更巧妙的對二進制進行 & 運算。bucketMask 計算的是通的掩碼,它的返回值是 2^B-1, 對 hash(k) &bucketMask 實際就是 hash(k)%(2^B). 那落在桶內的哪個槽位是怎麼確定的呢?因爲桶內的槽位有 8 個,取哈希值的最高 8bit 對應的值放在 tophash 中,8bit 的最大值爲 255,tophash 是 uint8 類型是完全可用放入的。具體放在哪個槽位,是根據從小到大順序。先放到槽位 0 即 tophash0 的位置,如果該位置已放有數據,就繼續看一下個位置 tophash1, 如果 tophahs0 到 tophash7 都有數據了,那說明當前桶已經滿了,繼續看它的溢出桶即 overflow 指向的桶。這裏注意一點,怎麼判斷一個槽位是否被佔用。採用的方法是判斷 tophash 值,tophash 值的 0 到 4 是狀態標誌。最小有效的值爲高 8bit 的值 + 5. 具體狀態標誌見下面的 tophash 值特殊含義。

上圖中 B 爲 5,一共有 34 個桶,其中 32 和 33 號是溢出桶,常規桶是 32 個(0 至 31 號),key 經過哈希函數計算之後得到的 64bit 數據爲 0010110000001010100000111010101101010011100001110101010101000111,根據上述規則,最後 5 個 bit 位 00111,對應的十進制爲 7,所以此 key 落在第 8 個桶(編號爲 7 的桶),然後在根據最高 8 位是 00101100,對應的十進制是 44,在桶 7 中從左向右找到一個未被佔用的槽位,找到了第 5 個槽位(綠色標註的)未被佔用,然後將 44 放在第 5 桶的 hashtop 中,最後將原本的 key 和 value 放到第 5key 槽位和第 5 個 value 槽位。

// bucketMask桶的掩碼,值爲2^b-1
func bucketMask(b uint8) uintptr {
 return bucketShift(b) - 1
}
// mapaccess1 返回map中key對應的value,如果key在map中不存在,不會返回nil值,而是返回
// value類型的零值,需要注意的是返回指針可能導致map長時間存活,所以不能長時間持有返回的指針
// 對應到應用層的使用方式是 value:=m[key]
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
 if raceenabled && h != nil {
  callerpc := getcallerpc()
  pc := funcPC(mapaccess1)
  racereadpc(unsafe.Pointer(h), callerpc, pc)
  raceReadObjectPC(t.key, key, callerpc, pc)
 }
 if msanenabled && h != nil {
  msanread(key, t.key.size)
 }
 // h爲nil表示map還沒初始化,h.count爲0說明map中還沒有元素
 // 這裏兩種情況下,直接返回value類型的零值,並不是返回nil
 if h == nil || h.count == 0 {
  if t.hashMightPanic() {
   t.hasher(key, 0) // see issue 23734
  }
  return unsafe.Pointer(&zeroVal[0])
 }
 // map正在擴容,即map已經有寫操作,現在又在進行讀操作,不被允許,panic
 if h.flags&hashWriting != 0 {
  throw("concurrent map read and map write")
 }
 // 計算key的哈希值
 hash := t.hasher(key, uintptr(h.hash0))
 // 計算桶的掩碼,假如B爲5,那桶的數量=2^B=2^5=32個,編號從0到31
 // bucketMask計算出來值爲32-1,即5個1
 m := bucketMask(h.B)
 // 所有的桶是一個數組,根據數組第一個元素即第一個桶h.buckets的位置
 // hash&m計算出了key所在桶數組的下標,桶的大小t.bucketsize
 // 根據第一個桶的位置,加上桶的偏移量(下標)*桶的大小,可以定位到key所在桶
 // 的地址
 b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
 if c := h.oldbuckets; c != nil {
  //  判斷map是否正常擴容,如果沒有正在擴容,將m擴大一倍
  if !h.sameSizeGrow() {
   
  // m變爲原來的一半,新桶backet是舊桶oldbuckets的兩倍大
   m >>= 1
  }
  // 獲取到key在舊桶中的地址
  oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
  if !evacuated(oldb) {
   b = oldb
  }
 }
 top := tophash(hash)
bucketloop:
 for ; b != nil; b = b.overflow(t) {
  for i := uintptr(0); i < bucketCnt; i++ {
   // key的哈希值高位不匹配
   if b.tophash[i] != top {
    // 當前索引位置已經沒有數據,所以不用繼續循環查找更大索引位置,也不用再overflow中繼續查找了
    if b.tophash[i] == emptyRest {
     break bucketloop
    }
    continue
   }
   // 根據基地址b和偏移量計算出key所在的位置
   k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
   // 如果key是間接引用,獲取它間接引用的值
   if t.indirectkey() {
    k = *((*unsafe.Pointer)(k))
   }
   // 比較用戶傳入的key和map存儲的k是否相同,如果相等,獲取value的值並返回
   if t.key.equal(key, k) {
    // 根據基地址的位置+8個key佔用的空間大小+當前value的偏移量計算出value所在的位置
    e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
    // 如果value元素是間接引用,獲取它間接引用的值
    if t.indirectelem() {
     e = *((*unsafe.Pointer)(e))
    }
    // 返回value
    return e
   }
  }
 }
 // key不在map中,返回value類型的零值,並不是返回nil
 return unsafe.Pointer(&zeroVal[0])
}

// mapaccess2 也是查詢key是否在map中,與mapaccess1不同的是有兩個返回值,第二個返回值
// 表示key在不在map中,對應到應用使用方式是  value,ok:=m[key]
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {
 ...
    // mapaccess2處理邏輯除多了一個bool類型的返回值,其他與mapaccess1一模一樣,這裏省略了
 return unsafe.Pointer(&zeroVal[0])false
}

tophash 值的特殊值含義

// emptyRest 當前的索引位置是空的,比當前位置還大的索引位置沒有數據了,在溢出桶overflows中的bucket也沒有數據了
 emptyRest      = 0 
 // 表示槽位是空的,即還沒有放數據
 emptyOne       = 1 
 // 表示槽位正在擴容,數據會被搬遷到新桶的前半部分,前後半部分是指新桶是舊的2倍大小,新桶的前一半爲前半部分
 evacuatedX     = 2 
 // 表示槽位正在擴容,數據會被搬遷到新桶的後半部分
 evacuatedY     = 3 
 // 表示該槽位已經搬遷完畢
 evacuatedEmpty = 4 
 // tophash的最小值
 minTopHash     = 5

上面這張圖描述了 mapaccess 的處理流程,結合前面 key 的定位過程,就非常容易理解,整個大體流程就是找到 key 所在的桶之後,在外層循環訪問桶,內層循環訪問桶中的槽位,先判斷 tophash 值是否相等,不相等肯定不是同一個 key,相等之後要進一步取出 key 進行完全比較,只有全相等纔算找到了。如果當前桶沒有找到,繼續在當前桶指向的溢出桶中繼續查找。

map 賦值

賦值操作是在 mapassign 中處理的,當添加的鍵值對中的 key 存在時,更新 value 的值,如果不存在,找到 key 落在的桶的合適槽位,將其 key 和 value 的值保存起來。

// mapassign 向map中添加元素
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
 // h爲nil,即map還未初始化,沒有執行make操作,向裏面添加數據,直接產生panic
 if h == nil {
  panic(plainError("assignment to entry in nil map"))
 }
 // 檢查data race
 if raceenabled {
  callerpc := getcallerpc()
  pc := funcPC(mapassign)
  racewritepc(unsafe.Pointer(h), callerpc, pc)
  raceReadObjectPC(t.key, key, callerpc, pc)
 }
 if msanenabled {
  msanread(key, t.key.size)
 }
 // map 正在擴容,拋出異常
 if h.flags&hashWriting != 0 {
  throw("concurrent map writes")
 }
 // 計算key的哈希值
 hash := t.hasher(key, uintptr(h.hash0))

 // 設置map正在寫入標記,從這裏開始,其他地方不能再對map進行讀取和寫入操作了
 // 設置標記這一步操作要放在計算hash之後,因爲t.hasher可能產生panic.但是
 // map還未開始真正寫入
 h.flags ^= hashWriting

 // 桶buckets還未初始化,調用newobject先初始化buckets,桶的大小爲1
 if h.buckets == nil {
  h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
 }

again:
 // 計算key落在哪個桶
 bucket := hash & bucketMask(h.B)
 // 檢查map是否正在擴容,判斷依據是oldbuckets是否不爲nil
 if h.growing() {
  growWork(t, h, bucket)
 }
 // 計算出bucket桶所在的內存地址,計算法方法=基地址(h.buckets)+落在哪個桶(bucket)*桶的大小(t.bucketsize)
 b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
 // 提取出hash最高位8個bit對應的值,如果值小於5會被設置爲基礎值5
 top := tophash(hash)

 var inserti *uint8
 var insertk unsafe.Pointer
 var elem unsafe.Pointer
bucketloop:
 for {
  // 在桶內按照槽位cell從0到7順序查找是否有key存在,每個桶裝有8個k-v數據
  for i := uintptr(0); i < bucketCnt; i++ {
   // hash出來的top值不相同,進一步判斷b.tophash[i]是不是空的
   if b.tophash[i] != top {
    // 判斷當前的第i個槽位是不是空的,如果是空的,很開心終於找到一個可以插入的位置
    // inserti、insertk和elem是一組賦值變量記錄插入元素的槽位地址、插入的key所在的地址、插入的value所在的地址
    if isEmpty(b.tophash[i]) && inserti == nil {
     inserti = &b.tophash[i]
     insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
     elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
    }
    // 槽位值0和1都表示是空操作,這裏進一步判斷槽位值是不是0,如果是0表示比當前還大的槽位k-v已經被重置了,
    // 就不用繼續循環檢查後面槽位是否存在相同的key了,因爲肯定是不存在的
    if b.tophash[i] == emptyRest {
     // 後面槽位中肯定不存在key值了,直接跳出循環
     break bucketloop
    }
    // 這裏continue,就是還要繼續走循環處理邏輯,爲啥還要繼續呢?因爲當前的槽位key不相同,並不代表後面槽位中
    // 不存在該key了。因爲正常來說向map中添加數據,肯定是從槽位0-7這樣的順序添加的,即
    // h[0],h[1],h[2],empty,empty,empty,empty,empty  (empty表示當前槽位還未有數據)
    // 但隨着對map的刪除,會出現空洞,即可能不像上面這樣,沒有槽位是連續的並集中在右邊,出現的是下面的情況
    // h[0],empty,h[2],empty,empty,empty,empty,empty
    continue
   }
   // 走到這裏,說明key計算出的哈希值值的高8bit值和b.tophash[i]相同
   k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
   // 如果直接存儲的不是key,進行一級尋址獲取裏面的內容
   if t.indirectkey() {
    k = *((*unsafe.Pointer)(k))
   }
   // 真正判斷key是否相同,前面只是判斷了高8bit相同,並不一定key就是相同的
   if !t.key.equal(key, k) {
    continue
   }
   // already have a mapping for key. Update it.
   // 走到這裏表示map中key值已經存在了,如果有必要更新,就更新key值
   if t.needkeyupdate() {
    typedmemmove(t.key, k, key)
   }
   // elem保存key對應value的地址
   elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
   // key已經存在了,不用繼續循環查找了,直接跳到done的地方執行後面的處理
   goto done
  }
  // 當前的桶中8個槽位沒有找到,繼續在溢出桶中查找
  ovf := b.overflow(t)
  // 如果沒有溢出桶,跳出循環,key在當前的map中不存在的
  if ovf == nil {
   break
  }
  b = ovf
 }

 // 走到這裏,說明在當前桶和它的溢出桶中都沒有找到合適的槽位來保存key和value
 // 檢查map是否需要擴容,擴容的觸發條件有overLoadFactor和tooManyOverflowBuckets
 // 滿足任何一個都會觸發,overLoadFactor檢查裝填因子是否大於6.5,大於6.5會觸發擴容
 // tooManyOverflowBuckets檢查map中溢出桶的數量是否過多,過多也會觸發擴容,
 // 詳細分析見map擴容
 if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
  hashGrow(t, h)
  goto again // Growing the table invalidates everything, so try again
 }

 // 走到這裏key肯定是不在map中的,如果key在map中,會直接跳到下面done的位置
 // inserti爲空,就是在當前桶的槽位中和當前桶的溢出桶的槽位中都沒有找到插入位置
 // 表明已經沒有插入位置,需要新添加一個溢出桶,將數據添加到裏面
 if inserti == nil {
  // all current buckets are full, allocate a new one.
  // 新建一個溢出桶,將數據插在0號槽位
  newb := h.newoverflow(t, b)
  inserti = &newb.tophash[0]
  insertk = add(unsafe.Pointer(newb), dataOffset)
  elem = add(insertk, bucketCnt*uintptr(t.keysize))
 }

 // 將key-value值添加到待添加到insertk和elme保存的地址中
 if t.indirectkey() {
  // new一個key類型的對象kmem,將insertk中的內容設置爲kmem地址
  kmem := newobject(t.key)
  *(*unsafe.Pointer)(insertk) = kmem
  insertk = kmem
 }
 if t.indirectelem() {
  // new一個elem類型的對象vmem,將elem中的內容設置爲vmem地址
  vmem := newobject(t.elem)
  *(*unsafe.Pointer)(elem) = vmem
 }
 // 將key內容拷貝到insertk的位置
 // 注意這裏只看到有將key的內容拷貝到insertk保存起來,但沒有看到將elem
 // 保存起來,是漏了嗎?是編譯器做了額外的處理,可以通過反彙編查看
 typedmemmove(t.key, insertk, key)
 *inserti = top
 // map中元素+1
 h.count++

done:
 if h.flags&hashWriting == 0 {
  throw("concurrent map writes")
 }
 // 清理掉map的真正寫操作標記
 h.flags &^= hashWriting
 if t.indirectelem() {
  elem = *((*unsafe.Pointer)(elem))
 }
 // 返回數據elem的地址
 return elem
}

func (h *hmap) newoverflow(t *maptype, b *bmap) *bmap {
 var ovf *bmap
 // makemap的時候有給h.extra和h.extra.nextOverflow賦值
 // h.extra.nextOverflow記錄的是溢出桶的位置,在make創建
 // 一個新map的時候,會預分配一些溢出桶,這裏在需要新的桶的時候
 // 先從預分配的溢出桶裏面獲取,如果沒有獲取到,纔會new一個新
 // bucket
 if h.extra != nil && h.extra.nextOverflow != nil {
  ovf = h.extra.nextOverflow
  // 檢查ovf是不是預分配溢出桶的最後一個桶,預分配的溢出桶的最後一個桶的
  // overflow做了特殊的指向處理,指向的是map的一個bucket地址,不是最後
  // 一個桶的overflow是空的
  if ovf.overflow(t) == nil {
   // 不是最後一個預分配溢出桶的最後一個桶,那就使用當前的這個桶,並將記錄下一個可供使用的
   // 桶的標記nextOverflow向後移動一個位置,供下一次調用時使用
   h.extra.nextOverflow = (*bmap)(add(unsafe.Pointer(ovf), uintptr(t.bucketsize)))
  } else {
   // 如果桶的overflow爲非空,那說明當前的這個桶已經是預分配溢出桶的最後一個桶了,
   // 將當前桶overflow指向標記去掉(makemap的時候指向的是第一個桶的位置),並
   // 將記錄預分溢出桶標記nextOverflow置空,因爲所有的預分配溢出桶都已經使用完了
   // 後續在調用newoverflow方法時就需要進行下面的new分配了
   ovf.setoverflow(t, nil)
   h.extra.nextOverflow = nil
  }
 } else {
  // 預分配的溢出桶都已經使用完了,new一個新的桶
  ovf = (*bmap)(newobject(t.bucket))
 }
 h.incrnoverflow()
 // map中的存儲的數據value不含有指針的情況
 if t.bucket.ptrdata == 0 {
  h.createOverflow()
  // 溢出桶保存在extra.overflow切片中,如果存儲數據value含有指針
  // 不會存在在extra中,這樣處理是在不含有指針時不用進行GC檢查
  *h.extra.overflow = append(*h.extra.overflow, ovf)
 }
 // b的overflow指向新的桶ovf,這樣新的桶ovf可以跟前面的桶串起來了
 b.setoverflow(t, ovf)
 return ovf
}

// incrnoverflow 方法用來增加h.noverflow的值,noverflow字段保存的是溢出桶的數量
// 該方法是給tooManyOverflowBuckets調用的,目的是爲了觸發同等map的擴容
// 爲了讓hmap比較小,noverflow使用的是uint16類型,當桶的數量比較小時,
// noverflow記錄的溢出桶數是一個精確值,當桶的數量很多時,noverflow記錄的是一個近似值
func (h *hmap) incrnoverflow() {
 // 如果溢出桶與桶一樣多,會觸發相同大小的mpa擴容,
 // noverflow能夠計數到2^B
 if h.B < 16 {
  // 直接加1
  h.noverflow++
  return
 }
 // 當B值大於等於16時,概率性增加noverflow的值,概率大小爲1/(2^(h.B-15))
 // 也就是當B的值達到2^15-1時候,近似記錄溢出桶的數量,因爲noverflow是uint16類型
 // 如果還是直接不斷加1,會存在溢出
 mask := uint32(1)<<(h.B-15) - 1
 // 例如當h.B=18的時候,mask的值爲7,然後調用fastrand()產生一個隨機數,再對該隨機數
 // 與7進行相&處理,即對該數除8求其餘數,如果餘數是0,纔對noverflow加1,可見這個概率爲1/8
 if fastrand()&mask == 0 {
  h.noverflow++
 }
}

看完了 mapassign 邏輯,可能有點疑惑,函數簽名中咋看不到 value 參數呢?函數返回的值是 elem 指向地址,即 value 所在的內存地址,代碼中看到有對 key 保存處理,typedmemmove(t.key, insertk, key) 這個行就是,但是沒有看到對 value 的保存處理。下面寫個 demo 翻譯成彙編,看看能否有新發現。

package main

func main() {
 m := make(map[int]int)
 m[1] = 123
}

上述代碼保存在 main.go 文件中,將其翻譯成彙編代碼,執行 go tool compile -S main.go 的關鍵內容如下圖所示,最後一步 MOVQ $123, (AX) 就是保存 value,這是由編譯器額外生成的彙編指令來完成的,可見靠 runtime 有些工作是沒有做完的。

map 擴容

搞清楚四個問題,我們就理解了 map 擴容。這四個問題分別是:1 爲什麼要擴容?2 什麼時候擴容?3 怎麼進行擴容?4 擴容會產生哪些影響?下面將依次回答這個幾個問題。爲什麼要擴容:我們使用 map 存取數據的目的就是因爲它非常高效,哈希函數取的非常好,理想情況下,獲取一個鍵值對只需要 O(1) 時間複雜度。但是實際是很難達到的,隨着 map 中存放的數據越來越多,向裏面添加數據發送碰撞的概率會變大,訪問數據效率會下降。爲了保證訪問效率,就需要在效率不高的時候進行擴容,衡量效率不高的依據是裝填因子。還是一種情況是,當一個大型 map 中大量數據被刪除之後,會留下很多 empty 的空槽位,在查找 key 的時候,不得不與這些大量空的 tophash 進行比較,但是這些比較是無意義的,只會浪費時間。所以將這種稀疏的桶進行搬遷之後,進行重新整理,儘量讓他們相鄰存儲,以提升查詢效率。

什麼時候擴容:在 map 賦值和刪除 key 的時候,會檢查是否需要擴容。觸發擴容有兩種情況:一種是裝填因子超過臨界值(6.5)時,即 map 中元素的數量 / 桶的數量 > 6.5, 因爲一個桶裝載 8 個元素,這說明大部分桶都快滿了,有些甚至有在使用溢出桶了。這個時候添加新元素,碰撞的概率就比較大。

func overLoadFactor(count int, B uint8) bool {
 return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

還有一種是溢出桶是否太多,當桶的數量超過 2^15 個時,如果溢出桶數量也超過 2^15,就判斷爲溢出桶太多,當桶的數量小於等於 2^15 時,如果溢出桶的數量大於桶的數量,也判斷爲溢出桶太多。

// tooManyOverflowBuckets 判斷溢出桶的數量是不是跟桶(2^B)的數量一樣多, 如果溢出桶的數量很多,
// 也就是出現了桶有大量槽位是空的情況,頻繁的刪除會導致槽位變空。大量的空槽位會使得查詢的時候效率很低,
// 可能要遍歷找完所以的槽位,爲了讓槽位變得密集,不留什麼"空洞",需要觸發擴容操作,新創建桶,之前的桶
// 稱爲舊桶,將舊桶中的數據搬遷到新桶中,這樣新桶中槽位是密集型的
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {

 // 判斷是否有太多桶的閾值不能太大或太小,太小就進行擴容並沒有大的意義,做的是無用功
 // 如果太大,無論是擴容還是縮容可能有大量分配的空間沒有被使用,白白浪費內存
 if B > 15 {
  B = 15
 }
 
 // 如果B大於15,強行設置B爲15,即B超過15,不管有多少,noverflow直接與2^15比較,如果大於表示map很稀疏
 // 如果B小於等於15,noverflow直接與2^B進行比較,如果大於表示map很稀疏
 return noverflow >= uint16(1)<<(B&15)
}

怎麼擴容:前面介紹了觸發擴容的兩種情況,根據觸發情況不同,在實現擴容的時候採用了不同策略。對於裝填因子超過臨界值 6.5 引起的擴容,會新建一個桶數組,B 值會加 1,即新桶的數量是原來桶的 2 倍,這是一種增量擴容,然後將舊桶中的數據搬遷到新桶中。對於有太多溢出桶引起的擴容,也會新建一個桶數組,但 B 值不變,即新桶的數量與舊桶的一樣,然後將舊桶中的數據搬遷到新桶,這個過程可以理解爲一個整理過程,相當於磁盤碎片整理,舊桶中的數據很分散,搬到新桶之後會相鄰存儲,新桶中數據密度很高。這種並不需要擴大桶的數量,因爲桶的數量是夠的,只不過是裝的數據稀疏。稱這種擴容爲等量擴容,其實是等量搬遷。下面的兩幅圖展示了增量擴容和等量擴容的流程,可以將圖結合下面的代碼一起看,更容易理解。

上圖展示的是增量擴容的流程,擴容之前,B 爲 2,即有 4 個桶,key0 和 key1 計算出來的哈希值分別爲 1001001010010010010101001110001001101010100001101001111000000010 和 1001001000010010010101001110001001101010100001101001111000000110,它們的最後 2 個 bit 都是 10,轉成十進制就是 2,所以它們落在 2 號桶,當增量擴容後,此時 B 爲 3,也就擴容之後有 8 個桶,現在需要根據 key 哈希值的最後 3 個 bit,決定它們落在新桶的位置,key0 的最後 3 位爲 010,對應的十進制還是 2,key1 的最後 3 位爲 110,對應的十進制爲 6,所以它們分別落在新桶的 2 號桶和 6 號桶,從根據最後 2bit 到看最後 3bit 的這個過程,稱之爲 rehash。總結起來,對於增量擴容,一個數據落在新桶的位置取決於它的 key 算出來的哈希值的第 B(擴容器前的)+1 位,設它在舊桶的桶號爲 X,如果 B+1 位爲 1,則它落在新桶的 X+2^B 號桶,如果 B+1 位爲 0,則它落在新桶的 X 號桶,即保存桶號不變。

等量擴容,新桶的數量和舊桶的數量相同,所以一個元素在老桶的桶號爲 X,則它在新桶的桶號也爲 X,變化的只是在桶中槽位,例如上圖中,key0 和 key7,它們都在一個桶號 2 中,只不過 key7 在溢出桶中,當進行擴容搬遷後,它們放在了新桶中的 2 號桶的相鄰位置,進行重新排位整合後,溢出桶可能就不需要了。下面來看擴容的代碼,跟擴容相關的函數有兩個,hashGrow 和 growWork 函數。hashGrow 函數只是分配好了新的 buckets,並沒有做搬遷工作,並將老的 buckets 掛到了 oldbuckets 字段上。真正搬遷 buckets 的動作在 growWork 函數中,而調用 growWork 函數的動作是在 mapassign 和 mapdelete 函數中。也就是插入 / 更新、刪除 key 的時候,都會嘗試進行搬遷 buckets 的工作。根據 oldbuckets 是否非 nil,來進行搬遷工作。

// hashGrow 進行擴容字段的創建初始化分配工作
func hashGrow(t *maptype, h *hmap) {
 
 // 如果是裝填超過裝填因子導致的map擴容,說明之前的桶數量小了,需要擴容更多的桶,
 // 新分配桶的數量是原來的2倍,如果是太多溢出桶導致的擴容,則進行等量擴容,不用
 // 申請更大數量的桶,只是之前的桶中數據太稀疏了,進行一次重新整理即可
 bigger := uint8(1)
 if !overLoadFactor(h.count+1, h.B) {
  // 是有太多溢出桶導致的擴容,進行等量擴容,bigger調整爲0
  bigger = 0
  h.flags |= sameSizeGrow
 }
 // 將現在的桶h.buckets掛載到h.oldbuckets中,新創建的桶保存在h.buckets中
 oldbuckets := h.buckets
 newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

 // 將h.flags中的iterator和oldIterator對應位置清0保存在flags
 flags := h.flags &(iterator | oldIterator)
 if h.flags&iterator != 0 {
  // 如果有在進行迭代操作,將迭代標記設置在舊桶上,因爲h.buckets中數據將放入到h.oldbuckets中
  flags |= oldIterator
 }
 // commit the grow (atomic wrt gc)
 // 更新一批h字段內容
 h.B += bigger
 h.flags = flags
 h.oldbuckets = oldbuckets
 h.buckets = newbuckets
 h.nevacuate = 0
 h.noverflow = 0

 // 對h.extra進行處理,如果非空,該掛載到h.extra.oldoverflow就掛載在它下面
 if h.extra != nil && h.extra.overflow != nil {
  // Promote current overflow buckets to the old generation.
  if h.extra.oldoverflow != nil {
   throw("oldoverflow is not nil")
  }
  h.extra.oldoverflow = h.extra.overflow
  h.extra.overflow = nil
 }
 // h.extra.nextOverflow指向新分配桶的溢出桶的位置
 if nextOverflow != nil {
  if h.extra == nil {
   h.extra = new(mapextra)
  }
  h.extra.nextOverflow = nextOverflow
 }
}

// growWork 進行真正的搬遷工作,調用一次growWork搬遷兩個桶(如只剩一個桶未搬遷,只搬遷一個)
func growWork(t *maptype, h *hmap, bucket uintptr) {
 // evacuate進行搬遷工作,執行一次該函數調用只會搬遷一個桶
 evacuate(t, h, bucket&h.oldbucketmask())

 // 如果h處在擴容狀態,還未搬遷完成
 if h.growing() {
  // 又調用用一次evacuate,再搬遷一個桶
  evacuate(t, h, h.nevacuate)
 }
}

// evacuate 進行搬遷工作,調用該函數一次只搬遷一個桶
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
 // 獲取要搬遷桶的地址,搬遷是將舊桶oldbuckets中數據搬遷到新桶buckets
 b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
 // 計算舊桶的桶數
 newbit := h.noldbuckets()
 // 如果b這個桶中的數據還未被搬遷,執行搬遷操作
 if !evacuated(b) {
  // xy是一個兩個元素的數組,x[0]中的bmap指向新桶的前半部分,x[1]中的bmap指向新桶的後半部分
  // 舉個例子,擴容前h.B爲2,那麼擴容前桶的數量爲2^2=4個,即h.oldbuckets中有4個桶,當加倍
  // 擴容之後,即h.B增加到3(2+1),擴容之後桶的數量爲2^3=8個,即h.buckets中有8個桶。這8
  // 個桶分爲兩部分,每個部分有4個
  // 假如h.oldbuckets舊桶數據tophash, 傳入的oldbucket爲3,指向210這個桶,e表示桶是空的
  //   0 1  2  3
  //  |7|e|10|210|
  // h.buckets新桶情況爲,因爲這是第一次搬遷,所以都是tophash空的
  //   0 1 2 3 4 5 6 7
  //  |e|e|e|e|e|e|e|e|
  var xy [2]evacDst
  x := &xy[0]
  // 設傳入的oldbucket爲3,x.b指向新桶第4(索引3)這個桶位置
  x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
  // x.k指向索引3這個桶第一個key的位置
  x.k = add(unsafe.Pointer(x.b), dataOffset)
  // x.e指向索引3這個桶第一個value的位置
  x.e = add(x.k, bucketCnt*uintptr(t.keysize))

  // 如果是非等量擴容
  if !h.sameSizeGrow() {
   y := &xy[1]
   // y指向後半部分中的桶,計算公式爲 基地址+偏移量, 基地址爲h.buckets
   // 偏移量爲(3+4)*uintptr(t.bucketsize), 即指向新桶索引爲7的位置
   y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
   // y.k指向索引7這個桶第一個key的位置
   y.k = add(unsafe.Pointer(y.b), dataOffset)
   // y.e指向索引7這個桶第一個value的位置
   y.e = add(y.k, bucketCnt*uintptr(t.keysize))
  }

  // 將桶b中的數據搬遷到x或y的桶中,具體是x還是y,根據key對應hash的h.B位的值
  for ; b != nil; b = b.overflow(t) {
   // k和e分別指向待搬遷桶b中第一個key和第一個value的位置
   k := add(unsafe.Pointer(b), dataOffset)
   e := add(k, bucketCnt*uintptr(t.keysize))
   // 依次對桶b中的8個k-v進行搬遷
   for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
    top := b.tophash[i]
    // 第i個槽位是不是沒有放數據,如果沒有放數據,continue跳過後面處理
    if isEmpty(top) {
     // 標記該槽位狀態爲已進行搬遷處理
     b.tophash[i] = evacuatedEmpty
     continue
    }
    // top值理論上不會爲2,3,4,因爲b肯定是沒有搬遷過的,2,3,4表示桶已進了搬遷處理
    if top < minTopHash {
     throw("bad map state")
    }
    k2 := k
    // 如果k是指針,進行解引用
    if t.indirectkey() {
     k2 = *((*unsafe.Pointer)(k2))
    }
    var useY uint8
    // 如果是非等量擴容,重新確定key在新桶中的位置
    if !h.sameSizeGrow() {
     // 計算key的哈希值
     hash := t.hasher(k2, uintptr(h.hash0))
     if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {

      // 對於NaN類型的key,每次對它計算 hash,得到的結果都不一樣
      // 這個 key是通過math.NaN()產生的,它的含義是 not a number,類型是 float64。
      // 把NaN類型作爲map的key時,會遇到一個問題:再次計算它的哈希值和它當初插入map時計算出來的哈希值不同
      // 這種類型的key一旦放入map是無法被Get操作獲取的,當使用 m[math.NaN()]語句的時候,是查不出來結果的
      // 這種類型的key只有在對map進行遍歷的時候才能被找到,並且,可以向一個 map插入多個數量的math.NaN()作爲key,
      // 它們並不會被互相覆蓋。當搬遷碰到 math.NaN() 的 key 時,只通過tophash的最低位決定分配到
      // 擴容後桶的前半部分還是後版本部分。如果tophash的最低位是0 ,分配到前半部分,如果是1分配到後半部分
      useY = top & 1
      // 計算hash值的tophash
      top = tophash(hash)
     } else {
      // 假設庫容前h.B爲2,在劃分桶的時候根據的是hash值的最後的h.B個bit位,即最後2個bit位
      // 現在容量擴大一倍了,要看新的h.B爲3(2+1),現在要看最後3個bit位。hash&newbit計算出
      // 就是判斷倒數第3個bit是1還是0,如果還是0,在新桶的位置在前半部分,如果是1在新桶的後半
      // 部分
      if hash&newbit != 0 {
       useY = 1
      }
     }
    }

    if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
     throw("bad evacuatedN")
    }
    //對b.tophash[i]進行標記,標記是當前槽位擴容後在新桶的前半部分還是後半部分
    b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
    // dst爲拷貝到的目標位置,可能是新桶的前半部分還是後半部分,取決於前面判斷
    dst := &xy[useY] // evacuation destination

    // dst.i是從0開始的,當到達8時,表示dst中bmap中已經裝滿8個數據
    if dst.i == bucketCnt {
     // 對桶dst.b創建溢出桶
     dst.b = h.newoverflow(t, dst.b)
     dst.i = 0
     // 設置dst.k,dst.e分別爲溢出桶的第一個key和value的位置
     dst.k = add(unsafe.Pointer(dst.b), dataOffset)
     dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
    }
    // 將top值填充到目標桶的tophash中
    dst.b.tophash[dst.i&(bucketCnt-1)] = top 
    // 分別將源桶的key和value拷貝給目標桶的key和value中,如果key或value是指針,進行解引用操作
    if t.indirectkey() {
     *(*unsafe.Pointer)(dst.k) = k2 
    } else {
     typedmemmove(t.key, dst.k, k)
    }
    if t.indirectelem() {
     *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
    } else {
     typedmemmove(t.elem, dst.e, e)
    }
    // 目標桶的記錄key/value的索引+1
    dst.i++
    // 目標桶dst的key和value分別向後偏移1個位置,因爲bmap結構體中最後有一個overflow指針
    // 保護,這裏不用擔心偏移之後出現踩內存的問題
    dst.k = add(dst.k, uintptr(t.keysize))
    dst.e = add(dst.e, uintptr(t.elemsize))
   }
  }
  
  if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
   b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
   ptr := add(b, dataOffset)
   n := uintptr(t.bucketsize) - dataOffset
   memclrHasPointers(ptr, n)
  }
 }

 // h.nevacuate記錄的是當前已經搬遷完桶的下一個未搬遷桶的位置,表示的是搬遷的進度,如果當前被搬遷的桶oldbucket
 // 與h.nevacuate相同,進行h.nevacuate更新操作,這裏注意一下,什麼時候出現oldbucket與h.nevacuate相同呢?
 // 回到growWork中看,第二次調用evacuate(t, h, h.nevacuate),傳遞的參數就是h.nevacuate
 if oldbucket == h.nevacuate {
  advanceEvacuationMark(h, t, newbit)
 }
}

// advanceEvacuationMark處理核心是檢查所有的桶是否全部搬遷完成
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
 // 小於等於h.nevacuate的桶都已經搬遷完,這裏h.nevacuate+1,接下來會檢查從
 // h.nevacuate+1的桶是否已完成搬遷
 h.nevacuate++
 // 經過試驗驗證,將stop設置爲當前進度+1024比較合理,因爲1024至少會比newbit高出一個數量級
 // 所以,用當前進度加上1024用於確保O(1)行爲
 stop := h.nevacuate + 1024
 if stop > newbit {
  // stop強行設置爲newbit,最大的桶才爲newbit
  stop = newbit
 }
 // 在stop的範圍內,探測出一個還未搬遷桶的位置
 for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
  h.nevacuate++
 }
 // 當還未搬遷的桶在newbit位置時,說明所有的桶都搬遷完成
 if h.nevacuate == newbit { // newbit == # of oldbuckets
  // 舊桶全部搬遷完成,置爲nil,舊桶的數據可以被GC回收了
  h.oldbuckets = nil
  // 如果map中存儲數據的key和value都不含指針,保存它們的bucket桶數組是存儲在hmap.extra中的
  // 這種情況下,搬遷的桶其實是h.extra.oldoverflow中的桶,現在已經搬遷完畢,需要將
  // h.extra.oldoverflow置爲nil
  if h.extra != nil {
   h.extra.oldoverflow = nil
  }
  // 擴容搬遷完畢,清理擴容的標誌位
  h.flags &^= sameSizeGrow
 }
}

上面的擴容代碼已做了詳盡的註解,結合前面的擴容流程圖,理解不難。這裏稍微說明下代碼中出現的 xy 區,對於增量擴容來說,新桶的數量會是舊桶的 2 倍大,新桶的前一半區域稱爲 x 區,後一半區域稱爲 y 區。對於等量擴容來說,新桶數量和舊桶一樣大,可以理解成只有 x 區沒有 y 區。在擴容的時候,每次只搬遷 2 個桶,growWork 中調了兩次 evacuate,evacuate 每次只搬遷一個桶。這麼做是爲了一種性能考量,如果 map 存儲了數以億計的鍵值對,一次性全部搬遷將會造成比較大的延時,在添加元素或刪除元素的時候出現卡頓,因此採用逐步搬遷策略。

擴容會產生什麼影響:擴容會對性能有影響,在我們添加或刪除元素的時候,存在擴容的可能,希望擴容越快越好,所以官方實現的時候採用了逐步搬遷的辦法,每次只搬遷兩個桶。還有一個是對遍歷有影響,逐步搬遷是一個過程,當我們對 map 進行遍歷的時候,map 可能沒有擴容完畢,這時候遍歷的時候要防止不能從老桶中獲取數據又從新桶中獲取到了數據。擴容會增加遍歷代碼實現的難度。

map 遍歷

通過 for range 遍歷 map 的時候,實現是通過初始化一個迭代器對象,迭代器結構如下,每次從迭代器對象中找到要訪問的 bmap 和當前的槽位。map 在每次遍歷的時候,返回的順序是不一樣的,這是怎麼做到的呢?每次遍歷的時候,會隨機產生從哪個桶開始迭代,並且從頭 bmap 中的哪個槽位開始也是隨機的,這些信息都記錄在 hiter 的 startBucket 和 offset 中,具體實現細節看下面的代碼註解。

遍歷 map 的迭代器結構如下

// map的迭代器結構,遍歷map的時候使用
type hiter struct {
 // 指向迭代map的key
 key unsafe.Pointer 
 // 指向迭代map的value
 elem unsafe.Pointer 
 // 存儲迭代map的類型
 t *maptype
 // 存儲迭代的map的結構體頭
 h *hmap
 // 記錄迭代的map中hmap.buckets的值
 buckets unsafe.Pointer 
 // 記錄當前正在訪問的bmap
 bptr *bmap 
 // 保存hmap.buckets的值
 overflow *[]*bmap 
 // 保存hmap.oldbuckets的值
 oldoverflow *[]*bmap 
 // 記錄開始是從哪個桶開始遍歷的,起始桶的選擇是隨機的
 startBucket uintptr 
 // offset記錄開始遍歷時是從桶中哪個槽位開始的,起始槽位的選擇也是隨機的
 offset uint8 
 // 記錄是否已經到桶數組中最後一個桶了,是用來輔助終止遍歷桶用的
 wrapped bool 
 // 迭代的map中的桶數的log2,與hmap中B的值相同
 B uint8
 // 記錄下一次要訪問的元素在哪個槽位
 i uint8
 // 記錄桶數組中下一代訪問桶的偏移值
 bucket      uintptr
 checkBucket uintptr
}

遍歷實現代碼在 mapiterinit 和 mapiternext 函數里,從起始爲 startBucket 桶中的 offset 槽位開始遍歷,整個遍歷都是在新桶 buckets 中進行的。每對一個新桶進行遍歷的時候,先檢查該桶對應老桶的狀態,當該桶位於前半部分的時候,該桶的序號和老桶的是相同,直接檢查老桶中對應序號桶的狀態,如果是已搬遷狀態,不用遍歷該老桶了。如果是還未搬遷,需要將老桶中以後會搬遷到當前新桶的元素提取出來完成遍歷,如果沒有直接跳過。當該桶位於後半部分的時候,該桶的序號和老桶相差 2^(B-1),直接減去 2^(B-1) 後定位到老桶,然後檢查老桶的狀態,如果是已搬遷,不用遍歷老桶,如果還未搬遷,將老桶中以後會搬遷到當前新桶的元素提取出來完成遍歷。

如上圖所示,起始桶是 4 號桶,桶的遍歷順序爲 4->5->6->7->0->1->2->3. 桶內的起始槽位爲 1。先檢查 4 號對應老桶序號爲 0(4-2^(3-1)), 發現老桶 0 號桶已搬遷完,直接不管老桶,遍歷 4 號桶,從槽位 1->2->3->...->7->1 訪問到槽位 0 中的 {2,b} 鍵值對,然後遍歷 5 號桶,發現 5 號桶對應的老桶 1 號桶已搬遷,5 號桶又沒有元素,繼續遍歷 6 號桶,發現 6 號桶對應的老桶 2 還未搬遷,開始對老桶 2 號桶進行遍歷,也是從槽位 1 開始, 槽位 1 的 key 值爲 6,計算哈希值後發現它將搬遷到新桶的 6 號桶,所以訪問它,然後繼續查看槽位 2,3... 直到槽位 0,槽位 0 的 key 值爲 5,計算哈希值後發現它將搬遷到新桶的 2 號桶,跳過不訪問它。這樣遍歷一圈之後,得到的鍵值對爲{2,b},{6,f},{1,a},{4,d},{3,c},{5,e}

// mapiterinit初始化一個對map進行for range迭代使用到的hiter結構對象it
func mapiterinit(t *maptype, h *hmap, it *hiter) {
 // data race檢查
 if raceenabled && h != nil {
  callerpc := getcallerpc()
  racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiterinit))
 }
 // 如果map還未初始化,或者還沒有存放數據,直接返回
 if h == nil || h.count == 0 {
  return
 }
 // 檢查hiter{}大小是否爲96
 if unsafe.Sizeof(hiter{})/sys.PtrSize != 12 {
  throw("hash_iter size incorrect") 
 }

 // 對it做一些初始化,將t和h信息賦值給it
 it.t = t
 it.h = h

 // 保存當前map的B值以及桶的位置
 it.B = h.B
 it.buckets = h.buckets
 // 桶不含指針類型數據
 if t.bucket.ptrdata == 0 {
  
  h.createOverflow()
  it.overflow = h.extra.overflow
  it.oldoverflow = h.extra.oldoverflow
 }

 // 生產一個隨機數,確定從map的哪個桶開始遍歷
 r := uintptr(fastrand())
 // h.B大於28
 if h.B > 31-bucketCntBits {
  r += uintptr(fastrand()) << 31
 }
 // r&bucketMask(h.B)其實就是r%(2^h.B)操作,隨機確定一個起始位置保存在startBucket中
 // 然後會從當前位置的桶開始向後遍歷,走一圈後又回到到startBucket的時候,表示遍歷結束
 it.startBucket = r & bucketMask(h.B)
 // offset記錄的是從哪個槽位開始,可以看到offset產生與r有關,r是一個隨機數,那說明offset
 // 也是一個隨機值。map的for range遍歷是從隨機的桶中隨機的一個槽位開始的,這就是爲什麼對
 // map進行遍歷的時候,產生的順序不同的原因
 it.offset = uint8(r >> h.B & (bucketCnt - 1))

 // it.bucket記錄的是當前要正在等待要迭代訪問的桶
 it.bucket = it.startBucket

 // 當前的迭代是可以和另一個迭代並行的,將h的flags標記設置爲3
 if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
  atomic.Or8(&h.flags, iterator|oldIterator)
 }

 mapiternext(it)
}

func mapiternext(it *hiter) {
 // 拿到it中存儲的hmap
 h := it.h
 if raceenabled {
  callerpc := getcallerpc()
  racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiternext))
 }
 // 檢查map是不是有寫操作,迭代是讀操作,不能與寫操作並行,如果正在對map進行寫操作,拋出錯誤
 if h.flags&hashWriting != 0 {
  throw("concurrent map iteration and map write")
 }
 // 拿到map的類型
 t := it.t
 // 當前待訪問的桶
 bucket := it.bucket
 // b爲nil
 b := it.bptr
 // i爲0
 i := it.i
 checkBucket := it.checkBucket

next:
 if b == nil {
  // 如果當前的桶地址和開始遍歷的桶地址相同,並且順序遍歷了一圈(it.wrapped),表明已經遍歷完成了
  if bucket == it.startBucket && it.wrapped {
   it.key = nil
   it.elem = nil
   // 遍歷完成,直接退出
   return
  }
  // 如果已在擴容
  if h.growing() && it.B == h.B {
   // bucket&操作是爲了防止溢出,oldbucket爲當前要處理桶的位置,可以理解爲桶數組的下標
   oldbucket := bucket & it.h.oldbucketmask()
   // b爲舊桶數組中距離第一桶偏移量爲oldbucket個桶的內存地址
   b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
   // 檢查桶b數據是否已搬遷
   if !evacuated(b) {
    // 如果未搬遷,checkBucket記錄下偏移量(bucket)
    checkBucket = bucket
   } else {
    // 如果已搬遷,要訪問的桶爲新桶數組中偏移量爲bucket地方的桶
    b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
    // noCheck爲1<<(8*8)-1,標記用
    checkBucket = noCheck
   }
  } else {
   // 沒有進行擴容,即oldbuckets是空的,所有的數據都保存當前桶it.buckets中
   b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
   checkBucket = noCheck
  }
  bucket++
  // bucket已經走到桶數組的末尾了,將回到開頭的位置,wrapped爲true表示走了一圈了
  if bucket == bucketShift(it.B) {
   bucket = 0
   it.wrapped = true
  }
  i = 0
 }
 for ; i < bucketCnt; i++ {
  // offi槽位的偏移量
  offi := (i + it.offset) & (bucketCnt - 1)
  // 如果槽位offi沒有數據或數據已搬遷,跳過後面處理,直接判斷下一個槽位
  if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
   continue
  }
  // 獲取槽位中的key
  k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
  // 如果是指針類型,進行解引用
  if t.indirectkey() {
   k = *((*unsafe.Pointer)(k))
  }
  // 獲取槽位中的value
  e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
  // checkBucket不爲noCheck,需要進一步檢查該桶是否已搬遷還是未搬遷
  if checkBucket != noCheck && !h.sameSizeGrow() {
 
   // 計算兩次key是否相等,對於math.NaN()值的key計算哈希值每次得到都會不一樣
   if t.reflexivekey() || t.key.equal(k, k) {
    // 計算key的哈希值保存在hash中
    hash := t.hasher(k, uintptr(h.hash0))
    // it.B是擴容後的B,將hash與it.B的桶掩碼進行相與處理之後,得到是落在新桶的
    // 位置,注意checkBucket值來自於bucket,bucket是當前要被訪問的桶,bucket
    // 範圍是新桶的範圍,所以這裏將hash與B桶掩碼相與之後,如果hash值的第B個bit位爲1
    // 說明落在擴容後的桶的後半部分,如果爲0則落在前半部分。如果當前訪問的桶checkBucket
    // 在後半部分,恰好hash值的第B個bit位爲1,它們是相等的,會繼續走後面的邏輯,否則不會
    // 處理
    if hash&bucketMask(it.B) != checkBucket {
     continue
    }
   } else {
    // 對於math.NaN()值爲key的處理,由於計算出來的哈希值每次都在變化,
    // checkBucket有可能位於y區(後半區)也有可能位於x區(前半區),當前
    // checkBucket位於前半區時,checkBucket>>(it.B-1)後得到值始終都是
    // 0,當前checkBucket位於後半區時,得到值始終都是1. 此時hash值每次都是
    // 變化的不能再根據它決定是否要訪問,得到找一個固定不變的值,對,這裏找的就是
    // b.tophash[offi],將它&1之後也會得到兩種值,要麼是0要麼1.下面處理得到
    // 的效果是,如果當前訪問的bucket位於後半區,並且offi的tophash值最低位爲1
    // 這次就訪問它,否則不訪問,將offi的tophash值最低位爲0留在bucket在前半區
    // 的時候訪問
    if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
     continue
    }
   }
  }
  if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
   !(t.reflexivekey() || t.key.equal(k, k)) {
   // 槽位offi沒有處在擴容的狀態,直接獲取它的key和value值給it
   it.key = k
   if t.indirectelem() {
    e = *((*unsafe.Pointer)(e))
   }
   it.elem = e
  } else {
   // mapaccessK獲取map中鍵爲k的鍵值對,整個mapaccessK的處理邏輯與mapaccess1基本一樣
   rk, re := mapaccessK(t, h, k)
   // 如果key已被刪除,直接continue,跳過當前的key的訪問
   if rk == nil {
    continue 
   }
   // 將獲取到的key和value值賦值給迭代器it
   it.key = rk
   it.elem = re
  }
  // 更新迭代器中待訪問的桶bucket位置
  it.bucket = bucket
  if it.bptr != b { 
   // it.bptr記錄當前正在訪問哪個桶,下一次迭代的時候,繼續從it.bptr桶中取數據
   it.bptr = b
  }
  // 更迭代器中的槽位位置,爲下一次迭代做準備
  it.i = i + 1
  it.checkBucket = checkBucket
  return
 }
 // 檢查是否有溢出桶,如果由,繼續遍歷溢出桶
 b = b.overflow(t)
 i = 0
 goto next
}

map 刪除

map 刪除代碼是在 mapdelete 函數中實現的,代碼比較簡單,由於篇幅原因,這裏就不貼代碼詳細註解了,理解起來有不明白的非常歡迎與我溝通交流。刪除的整體邏輯也是根據 key 定位到存儲 key-value 的位置,然後對應位置 "置空", 在定位 key 的位置前,檢查 map 是否處在擴容過程中,如果已在擴容中,進行一次搬遷操作,最後將 key 對應位置的 tophash 設置爲 empty 狀態,將記錄 map 中元素數據的計數器 count 減一。

總結

Go 中 map 實現採用的哈希表,通過拉鍊法來解決衝突問題,物理實現結構是用一個固定大小的桶數組加鏈表構成。在實現的過程中採用了很多優化點,總結如下。

開放尋址法 VS 鏈表法 [1]treemap[2]Go 之讀懂 map 的底層設計 [3] 深入解析 Golang 的 map 設計 [4]

Reference

[1] 開放尋址法 VS 鏈表法: https://blog.csdn.net/weixin_41563161/article/details/105104239

[2] treemap: https://wizardforcel.gitbooks.io/think-dast/content/12.html

[3] Go 之讀懂 map 的底層設計: http://blog.newbmiao.com/2020/02/04/dig101-golang-map.html

[4] 深入解析 Golang 的 map 設計: https://zhuanlan.zhihu.com/p/273666774

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/17iEnkfyRE4yKQzWdFmTBA