Go 內存管理 - 下篇

本文是 Go 內存管理的下篇,主要圍繞內存分配源碼分析其工作原理,分析的版本是 Go1.14 版。在 Go1.11 之後內存分配做了不少改動,網上的很多資料分析的都是舊版的代碼,跟當前最新的版本差異比較大。Go 內存分配涉及的代碼很多,代碼在 runtime 包下的 malloc.go、mcache.go、mcentral.go、mgc.go、mheap.go、mpagealloc.go、mpagealloc_64bit.go、mpagecache.go、mpallocbits.go、mbitmap.go,閱讀需要花費不少時間,本文是小編閱讀完上述代碼做的一個分析總結輸出。在閱讀本文之前,建議讀者先看 Go 內存管理 - 上篇, 上篇總結了 TCMalloc 的工作原理,Go 的內存管理基於的是 TCMalloc,所以看完上篇之後再來看本文更容易理解。

基本概念

在開始介紹 Go 內存管理組件和具體對象是如何進行分配的之前,我們先來了解 mspan、heapArena、對象大小几個基本概念,方便後文的理解。

mspan

mspan 是 golang 內存管理的基本單位,每個 span 管理指定規格(以 page 爲單位)的內存塊,具體規格以 page 爲單位,page 的概念可以看 Go 內存管理 - 上篇文章,內存池分配出不同規格的內存塊就是通過 span 體現出來的,應用程序創建對象就是通過找到對應規格的 span 來存儲的. 通過下面 mspan 結構定義可以看到,它主要包含 startAddr 和 npages 兩個核心參數,startAddr 記錄了 mspan 管理的頁面對應的內存首地址,npages 表明了該 mspan 管理了多少個頁面。

type mspan struct {
 // 鏈表中下一個mspan
 next *mspan 
 // 鏈表中上一個mspan
 prev *mspan     
 list *mSpanList // For debugging. TODO: Remove.
 // 該mspan的起始地址
 startAddr uintptr 
 // mspan中包含多少頁
 npages uintptr 
    ...
}
heapArena

Go 中的堆被表示成由多個 arena 組成,每個 arena 在 64 位機器上爲 64MB,且起始地址與 arena 的大小對齊,所有的 arena 覆蓋了整個 Go 堆的地址空間。運行時使用二維的 runtime.heapArena 數組管理所有的內存,每個單元都會管理 64MB 的內存空間。heapArena 中 bitmap 用於標識當前 arena 區域中的那些地址保存了對象,位圖中的每個 bit 都會表示堆區中的字節是否空閒, spans 數存儲了 mspan 的指針,每個內存單元會管理幾頁的內存空間,每頁大小爲 8KB。

type heapArena struct {
 
 bitmap [heapArenaBitmapBytes]byte 
 // pagesPerArena=8192,一個mspan與一個page 一一對應, 一個page爲8k
 // 所以一個heapArena管理的大小爲8192*8k=64M
 spans [pagesPerArena]*mspan
 // pageInUse [1024]uint8 即1024個Byte,每個bit標識一個page是否被使用
 pageInUse [pagesPerArena / 8]uint8
 // pageMarks [1024]uint8,即1024個Byte,每個bit標識一個page中是否有對象被marked
 pageMarks [pagesPerArena / 8]uint8
 zeroedBase uintptr
}
對象的大小

Go 中的內存分配器會根據申請分配的內存大小選擇不同的處理邏輯,運行時根據對象的大小將對象分成微對象、小對象和大對象三種。微對象大小在 (0,16B), 小對象的大小在 [16B,32KB],大對象大小在 (32KB,+∞)。大多數對象的大小都在 32KB 以下,而申請的內存大小影響 Go 運行時分配內存的過程和開銷,分別處理大對象和小對象有利於提高內存分配器的性能。

內存管理組件

內存分配由內存分配器完成,Go 中的內存分配器組件有 mheap、mcentral 和 mcache,可以看到 Go 中的組件與 TCMalloc 是一致的,建議看完 Go 內存管理 - 上篇, 在看下面的內容,方便理解。

                                         Go 內存管理
mcache

每個 P(GPM 中的 P)都會有一個 mcache。mcache 保存了本地可用的 mspan 對象,這樣相同 P 下的 goroutine 在申請內存 (小對象和微對象) 的時候,直接從本地的 mcache 中分配。同一個 P 一次只能運行一個 goroutine,所以相同 P 下的 goroutine 在申請內存的時候不存在競爭的情況,不用加鎖處理。

mcache 結構定義如下:

type mcache struct {
 // 分配一定字節後觸發堆採樣
 next_sample uintptr 
 // 分配的可掃描堆字節數,GC的時候會用
 local_scan  uintptr 
 // 申請微對象(<16B)的起始地址
 tiny uintptr
 //從起始地址tiny開始的偏移量
 tinyoffset uintptr
 //tiny對象分配的數量
 local_tinyallocs uintptr 
 // 分配的mspan list,其中numSpanClasses=134
 alloc [numSpanClasses]*mspan 
 //棧緩存
 stackcache [_NumStackOrders]stackfreelist

 // 大對象釋放字節數
 local_largefree uintptr 
 // 釋放的大對象數量
 local_nlargefree uintptr 
 // 每種規格小對象釋放的個數
 local_nsmallfree [_NumSizeClasses]uintptr 

 // 掃描計數
 flushGen uint32
}

type p struct {
 ...
 mcache      *mcache
 ...
}

mcache 結構重點關注 alloc 字段,這是一個 * mspan 類型的數組,數組大小 numSpanClasses 的值爲 134,因爲 SpanClasses 一共有 67 種,爲了滿足指針對象和非指針對象,這裏爲每種規格的 span 同時準備 scan 和 noscan 兩個,因此一共有 134 個 mspan 緩存鏈表,分別用於存儲指針對象和非指針對象,這樣對非指針對象掃描的時候不需要繼續掃描它是否引用其他對象,GC 掃描對象的時候對於 noscan 的 span 可以不去查看 bitmap 區域來標記子對象, 這樣可以大幅提升標記的效率。

mcentral

mcentral 與 TCMalloc 中的 CentralCache 類似,是所有線程共享的緩存,訪問時需要加鎖。它按 spanclass 級別對 span 分類,然後串聯成鏈表,當 mcache 的某個級別 span 的內存被分配光時,它會向 mcentral 申請 1 個當前級別的 span。

mcentral 結構定義如下:

type mcentral struct {
 lock mutex
 // span大小規格,當前有67種
 spanclass spanClass

 // nonmepty表示還沒有空object,可以繼續從這裏獲取到有空object的span
 nonempty mSpanList 
 // empty表示已經沒有空object了,所有的span都已經分配出去了,或者劃分給了mcache
 empty mSpanList 
 // 分配的對象數
 nmalloc uint64
}

mcentral 維護了一種 spanClass 的 span 對象鏈表,它有兩個 mSpanList 鏈表,分別是 nonempty 和 empty,nonempty 鏈表中 span 表示還有空閒的 object,empty 表示它裏面的 span 已經沒有空閒 object 了,都已分配出去了。

mheap

mheap 與 TCMalloc 中的 PageHeap 類似,它是堆內存的抽象,把從 OS 申請出的內存頁組織成 Span,並保存起來。每個 Go 程序使用一個 mheap 的全局對象 mheap_來管理堆內存。當 mcentral 的 Span 不夠用時會向 mheap 申請內存,而 mheap 的 Span 不夠用時會向 OS 申請內存。mheap 主要用於大對象的內存分配,以及管理未切割的 mspan,用於給 mcentral 切割成小對象。mheap 結構定義如下:

type mheap struct {
 lock mutex
 // page分配器
 pages pageAlloc 
 // GC時清掃標記值
 sweepgen uint32 
 // 標記清掃是否完成
 sweepdone uint32 
 // 有效的清掃調用數
 sweepers uint32

 // 保存了所有的的mspan指針對象的數組
 allspans []*mspan 
 sweepSpans [2]gcSweepBuf

 // 有多少頁正在被使用
 pagesInUse uint64 
 // 掃描的頁面數量
 pagesSwept uint64 
 // 用做掃描比例的初始基點
 pagesSweptBasis uint64 
 // 用做掃描比例的初始處於存活狀態的初始基點
 sweepHeapLiveBasis uint64 
 //掃描比
 sweepPagesPerByte float64 
 scavengeGoal uint64

 reclaimIndex uint64
 
 reclaimCredit uintptr

 // 大對象分配的字節數
 largealloc uint64 
 // 大對象分配的數量
 nlargealloc uint64 
 // 大對象釋放的字節數
 largefree uint64 
 // 大對象釋放的數量
 nlargefree uint64 
 // 小對象釋放的數量
 nsmallfree [_NumSizeClasses]uint64 

 //arenas數組集合,管理各個heapArena
 // arenas是二級數組,數組中的元素是*heapArena,在mac下第一維大小爲1
 // 第二維大小爲1<<22 = 1024*1024*4
 // 一個heapArena管理的內存大小爲64M,所以arenas管理的內存爲 1024*1024*4*64M=256T
 arenas [<< arenaL1Bits]*[1 << arenaL2Bits]*heapArena
 // heapArenaAlloc內存分配器,32位系統上會用這個分配
 heapArenaAlloc linearAlloc
 // arena內存分配器,32系統上會用這個分配arena
 arena linearAlloc

 //所有arena序號集合,可以根據arenaIdx算出對應arenas中的哪一個heapArena
 allArenas []arenaIdx

 //掃描週期開始時allArenas的一個快照
 sweepArenas []arenaIdx

 curArena struct {
  base, end uintptr
 }

 _ uint32 
 // 各個規格的mcentral集合,numSpanClasses=134
 central [numSpanClasses]struct {
  mcentral mcentral
  pad      [cpu.CacheLinePadSize - unsafe.Sizeof(mcentral{})%cpu.CacheLinePadSize]byte
 }

 // span*的內存分配器,只是分配空結構
 spanalloc fixalloc 
 // mcache*的內存分配器
 cachealloc fixalloc 
 // specialfinalizer*的內存分配器
 specialfinalizeralloc fixalloc 
 // specialprofile*的內存分配器
 specialprofilealloc fixalloc 
 speciallock         mutex    
 // arenaHintAlloc的內存分配器
 arenaHintAlloc fixalloc 
 unused *specialfinalizer 
}

大對象分配

分配對象的大小超過 32KB 視爲大對象分配,大對象的分配流程與小對象和 tiny 對象的分配是不一樣的。大對象的分配直接走 heap 進行分配。

無論是大對象還是小對象的分配統一入口都是 newobject,newobject 內部直接調用了 mallocgc 函數,mallocgc 函數根據分配對象的大小做不同的處理邏輯,下面的代碼只保留了大對象分配的核心邏輯,去掉了干擾項,方便我們理解大對象分配的主要流程。可以看到,對於大對象的處理,走的是 largeAlloc 函數,該函數傳入 3 個參數:要分配的內存大小、是否需要對分配的內存清 0、分配的對象是否含有指針。函數的返回值是一個 mspan 對象,而 mallocgc 函數要返回的是一個內存起始地址,mspan 的 base 方法返回的就是 mspan 管理的 pages 的首地址,正是我們需要的,所以 x = unsafe.Pointer(s.base()) 中的 x 是將要分配對象的起始地址。

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
 ...
 // 小對象,<=32KB
 if size <= maxSmallSize {
  // 16B,且不含指針
  if noscan && size < maxTinySize {
   ...
  } else { //含有指針 或要分配的內存大於等於16B
   ...
  }
 } else { // >32KB對象的內存分配,直接走heap分配
  var s *mspan
  shouldhelpgc = true
  // 切換到系統棧上執行,傳入參數爲要分配的內存大小,是否需要清0,申請的對象是否包含指針
  // 返回分配到的mspan
  systemstack(func() {
   // 調用largeAlloc分配一個大對象的mspan
   s = largeAlloc(size, needzero, noscan)
  })
  // 設置mspan的一些輔助字段,mspan存儲大對象只放一個,所以s.allocCount賦值爲1
  // freeindex表示下一個空閒的對象是mspan中第幾個對象,freeindex從0開始,因爲已分配
  // 出去了一個對象,下一個空閒對象index爲1,所以這裏s.freeindex賦值爲1,實際上mspan已分配完了
  // 因爲大對象,一個mspan只放一個object
  s.freeindex = 1
  s.allocCount = 1
  x = unsafe.Pointer(s.base())
  size = s.elemsize
 }
 ...

 return x
}

largeAlloc 函數處理大對象的分配,根據分配對象的大小,轉換成要分配多少個 page,一個 page 大小爲 8KB, 如果分配的大小不是 8KB 的整數倍,向上取整,即分配的頁數對應管理的內存不小於 size. 例如 size=32KB+1, 則需要分配(32KB+1)/8KB=5 個 page。然後創建 spanClass, 對應大對象來說,spanClass 爲 0 或 1,如果待分配的對象中含有指針,spanClass 爲 1,不含指針,spanClass 爲 0. 最後調用 mheap_.alloc 分配一個 mspan 對象。

// largeAlloc用於大對象的分配,>32KB對象內存分配爲大對象
func largeAlloc(size uintptr, needzero bool, noscan bool) *mspan {
 // 分配的內存過大溢出了,拋出異常
 if size+_PageSize < size {
  throw("out of memory")
 }
 // 計算需要多少個頁
 npages := size >> _PageShift
 // size&_PageMask等價於size%_PageSize, _PageSize=_PageMask+1
 // 即不是_PageSize(8KB)的整數倍,npages+1個,確保分配的大小不能小於size
 if size&_PageMask != 0 {
  npages++
 }

 // 先做一些sweep工作
 deductSweepCredit(npages*_PageSize, npages)
 // makeSpanClass將sizeclass轉換成spanClass, 大對象的sizeclass爲0
 // 大對象的spanClass不是0就是1,如果分配的對象中含有指針,spanClass就是1,不含指針,spanClass就是0
 s := mheap_.alloc(npages, makeSpanClass(0, noscan), needzero)
 if s == nil {
  throw("out of memory")
 }
 // limit保存了分配內存的上限地址位置
 s.limit = s.base() + size
 // bitmap設置,用於GC的
 heapBitsForAddr(s.base()).initSpan(s)
 return s
}

alloc 方法分配一個新的 mspan,真正分配是 h.allocSpan 做的,它本身除了的邏輯不多,主要是在進行 h.allocSpan 前進對不用的頁面進行清掃回收,對分配到的內存根據是否需要清 0 做一些清理操作。

// alloc分配一個新的mspan,mspan的對象規格爲入參指定的規格,管理的頁面數爲npages
func (h *mheap) alloc(npages uintptr, spanclass spanClass, needzero bool) *mspan {
 var s *mspan
 systemstack(func() {
  // 爲了防止堆過度增長,在分配n個頁面之前,我們需要清除並回收至少n個頁面
  if h.sweepdone == 0 {
   h.reclaim(npages)
  }
  // 調用allocSpan分配一個mspan
  s = h.allocSpan(npages, false, spanclass, &memstats.heap_inuse)
 })

 if s != nil {
  if needzero && s.needzero != 0 {
// 對分配到的內存進行清理操作   memclrNoHeapPointers(unsafe.Pointer(s.base()), s.npages<<_PageShift)
  }
  s.needzero = 0
 }
 return s
}

allocSpan 根據要分配的頁面數目、分配對象的規格、是否是手動分配參數分配一個 mspan 對象。sysStat 是統計內存使用情況的參數,跟本文要分析的內容關係不大,這裏不用關心。如果是手動分配,即 manual 爲 true, 需要調用者負責與其使用 span 相關的信息記錄維護,如果是 manual 爲 false, 自動在 span 中更新了堆的使用信息。總結起來,allocSpan 分配一個 span 對象要完成三步操作:

  1. 申請一個有 npages 個頁面的內存空間,得到這片空間的首地址 base

  2. 分配一個 span 對象

  3. 將 1 中分配的頁面與 2 中的 span 對象綁定關聯起來,具體就是對 span 中的字段進行一些賦值操作,例如將 base 給到 span 的 startAddr,npages 給到 span 的 npages 等 本小節內容講大對象分配,所以下面的 allocSpan 方法僅保留了大對象分配相關的處理邏輯,其他關係不大的內容省略掉了,方便我們搞清大對象分配流程。

func (h *mheap) allocSpan(npages uintptr, manual bool, spanclass spanClass, sysStat *uint64) (s *mspan) {
 // Function-global state.
 gp := getg()
 base, scav := uintptr(0), uintptr(0)

 // 獲取當前的P中的pageCache, pageCachePages爲64
 pp := gp.m.p.ptr()
 // 如果分配的page數小於16個,嘗試從P的pageCache中分配
 if pp != nil && npages < pageCachePages/4 {
  c := &pp.pcache
      // pageCache每個P有1個,它表示64*8KB=512KB大小的一塊內存空間,在該內存空間中可能有0個或多個空閒頁面。
  // c中cache全爲0,表示所有所有的頁面都已分配出去了,沒有空閒的頁面了
  if c.empty() {
   lock(&h.lock)
   // 申請分配一個新的pageCache給P
   *c = h.pages.allocToCache()
   unlock(&h.lock)
  }

  // 嘗試從P的pageCache中分配npages個頁面的內存
  base, scav = c.alloc(npages)
  if base != 0 {
   // 嘗試分配一個mspan對象
   s = h.tryAllocMSpan()

   if s != nil && gcBlackenEnabled == 0 && (manual || spanclass.sizeclass() != 0) {
    goto HaveSpan
   }
  }
 }


 lock(&h.lock)
    // 走到這裏base還是0,表明前面從pageCache中沒有分配到內存
 if base == 0 {
  // 調用pageAlloc分配器進行頁分配
  base, scav = h.pages.alloc(npages)
  if base == 0 {
   // 如果還是沒有分配到,進行擴容
   if !h.grow(npages) {
    unlock(&h.lock)
    return nil
   }
           // 擴容之後再次調用pageAlloc分配器分配page
   base, scav = h.pages.alloc(npages)
           // 還是沒有分配到,拋出錯誤
   if base == 0 {
    throw("grew heap, but no adequate free space found")
   }
  }
 }
 if s == nil {
  // 分配一個mspan對象
  s = h.allocMSpanLocked()
 }
 // 如果對象不是手動分配
 if !manual {
  // 下面是內存分配統計操作,可以不關注
  memstats.heap_scan += uint64(gp.m.mcache.local_scan)
  gp.m.mcache.local_scan = 0
  memstats.tinyallocs += uint64(gp.m.mcache.local_tinyallocs)
  gp.m.mcache.local_tinyallocs = 0

  // 記錄大對象分配的個數和分配的內存大小
  if spanclass.sizeclass() == 0 {
   mheap_.largealloc += uint64(npages * pageSize)
   mheap_.nlargealloc++
   atomic.Xadd64(&memstats.heap_live, int64(npages*pageSize))
  }

  if gcBlackenEnabled != 0 {
   gcController.revise()
  }
 }
 unlock(&h.lock)

HaveSpan:
 // 將分配到的內存base和span對象s綁定起來,因爲span對象管理的就是一個或多個頁面
 s.init(base, npages)
 if h.allocNeedsZero(base, npages) {
  s.needzero = 1
 }
 // 分配到的內存大小,一共npages*8KB個字節
 nbytes := npages * pageSize
 if manual {
      // 大對象不是手動分配,即分配大對象不會走到這裏,這裏分析大對象分配流程,所以這個分支的內容不關心
  ...
 } else {
  // 根據規格(spanClass),設置span s的一些字段內容,主要有元素大小(s.elemsize)
  // 元素個數(s.nelems)
  s.spanclass = spanclass
  // 對於大對象來說,它的spanClass對應的sizeClass爲0
  if sizeclass := spanclass.sizeclass(); sizeclass == 0 {
   // 大對象獨佔一個span,所以s.nelems=1, 元素的大小就是整個span管理的頁面對應的所有空間
   s.elemsize = nbytes
   s.nelems = 1

   s.divShift = 0
   s.divMul = 0
   s.divShift2 = 0
   s.baseMask = 0
  } else {
        // 這裏也是處理非大對象邏輯,先不關心,具體分析放到小對象和tiny對象分配中去分析
            ...
  }
    ...
 // 統計內存信息,這裏不用關心
 mSysStatInc(sysStat, nbytes)
 mSysStatDec(&memstats.heap_idle, nbytes)
   ...
 return s
}

分配 npages 個頁面的邏輯比較複雜,這裏對其進行一個分析。每個 P(GMP 中的 P, 邏輯處理器)有一個 pageCache 類型的字段 pcache,見下面的 p 結構(省略了其他字段). pcache 理解爲頁的緩存,就是它保存了一組等待被使用的連續的頁。結合 pageCache 的結構定義,base 表示這一組連續頁的起始地址,cache 是一個 uint64 對象,在 64 位機器上是 8 個字節,即 64 個 bit。cache 中每個 bit 表示一個 page 是否是空閒的還是被分配出去了,1 表示空閒,0 表示已分配出去了。scav 表示已清除頁面的 64 位位圖。每個頁面大小爲 8KB, 所以 pcache 管理的這片內存大小爲 64*8K=512KB。

type p struct {
    ... //其他省略
 pcache      pageCache
    ...
}

type pageCache struct {
 base  uintptr // base address of the chunk
 cache uint64  // 64-bit bitmap representing free pages (1 means free)
 scav  uint64  // 64-bit bitmap representing scavenged pages (1 means scavenged)
}

對於要分配的的 page 數小於 16 個 page 的,從 pcache 中查找是否有空閒的空間可供分配. 此時有兩種情況:

  1. pcache 中的 page 都已分配出去了,沒有空閒 page 了

  2. pcache 中還有空閒 page 對於情況 1,pcache 中沒有空閒 page, 這個時候會調用 pageAlloc 分配器分配 pageCache 給 pcache. pageAlloc 分配器顧名思義就是專門分配 page 的分配器,代碼實現非常複雜,pageAlloc 具體分配細節見前面的 pageAlloc 分配原理中已有講解。新分配 pageCache 給 pcache 之後,pcache 中就有空閒的 page 了,接下來會走情況 2

對於情況 2,pcache 還有空閒 page, 嘗試從 pcache 中分配 npages 個頁面。調用的函數就是 alloc. alloc 函數根據要分配的頁數 (npages) 是 1 還是大於 1 走不同的處理邏輯。細節比較複雜,下面分別詳細說明

分配 1 個頁面的處理流程 就是從 c.cache 中找到一個 bit 爲 1 的位置,該位置對應的 page 是未分配出去的,怎麼找呢?這裏有一個算法,就是從 cache 右邊向左邊找 (理解爲從低位向高位),對應的函數就是 sys.TrailingZeros64,該函數接收 1 個 uint64 的整數,實現的功能是計算傳入參數中尾隨零位的總數。例如,對於 15,它的二進制爲 1111, 二進制末尾 0 的個數爲 0 個,對於 12,它的二進制爲 1100, 二進制末尾的 0 的個數爲 2 個,對於 10,它的二進制爲 1010,二進制末尾的 0 的個數爲 1 個。通俗來說,就是對於一個數 x, 它對應的二進制爲 bx,然後從右往左統計 bx 中 0 的個數,遇到 1 則結束統計。

下圖中 cache 的值爲 ffff ffff ffff fffa,對應的二進制爲 1111111111111111111111111111111111111111111111111111111111111010,對其計算 sys.TrailingZeros64 得到的值爲 1,因爲它的二進制中尾隨零位的總數爲 1,也就是第 1 個 page 是未分配的, 通過圖也可以看到除了第 0 個和第 2 個 page 已分配外,其他都是未分配的。

然後執行 c.cache &^= 1 << i,上面得到的 i 值爲 1,即 c.cache &^=1<<1, 將 c.cache 的第 1 個 bit 爲設置爲 0. 這裏說下 &^ 的含義,將運算符左邊數據相異的位保留,相同位清零。左邊 c.cache 的第 1 個 bit 位置爲 1,1<<1 對應的二級制除第 1 個 bit 位爲 1 外,其他都爲 0,所以執行 c.cache &^ = 1<<1 之後,相當於將 c.cache 的第 1 個 bit 位置爲 0,其他 bit 位保持不變。同理執行 c.scav &^= 1 << i,將 scav 中第 1 個 bit 位設置爲未清理狀態即設置爲 0,c.scav 中 bit 爲 1 表示對應的 page 已清理,0 表示未清理。所以如果 c.scav 之前第 1 個 bit 位的值爲 0,執行 c.scav &^ = 1<<1 之後,第 1 個 bit 位的值還是 0,即之前是未清理狀態現在還是未清理狀態,如果 c.scav 之前的第 1 個 bit 位的值爲 1,執行執行 c.scav &^ = 1<<1 之後,第 1 個 bit 的值將變爲 0,即之前爲已清理狀態現在變爲未清理狀態。經過上面的操作之後,上圖 pageCache 狀態變爲下圖所示。

最後返回分配的內存起始地址,上面的例子中分配的是第 1 個 page,根據 pageCache 的起始地址 base 和偏移(i 的值)可計算得到第 1 個 page 的地址爲 c.base+1*pageSize, 返回的第二參的值爲 pageSize 或爲 0,當分配前 c.scav 第 i 個 bit 位值爲 1 即第 i 個 page 頁已清理時,返回 pageSzie, 當分配前 c.scav 第 i 個 bit 位值爲 0 即第 i 個 page 頁未清理時,返回 0.

func (c *pageCache) alloc(npages uintptr) (uintptr, uintptr) {
    // c中所有的page都已分配出去了,直接返回0,0
 if c.cache == 0 {
  return 0, 0
 }
    // 如果要分配的頁數是1個,走分配1個page的流程
 if npages == 1 {
  i := uintptr(sys.TrailingZeros64(c.cache))
  scav := (c.scav >> i) & 1
        // 將c.cache的第i個bit位置爲0
  c.cache &^= 1 << i // set bit to mark in-use
        // 將c.cache
  c.scav &^= 1 << i  // clear bit to mark unscavenged
  return c.base + i*pageSize, uintptr(scav) * pageSize
 }
    // 要分配的頁數>1個, 走c.allocN
 return c.allocN(npages)
}

分配 > 1 個頁面的處理流程 分配的頁面數 > 1 走 c.allocN 函數,該函數的處理邏輯也比較簡單,就是從 c.cache 中找到一個 bit 位連續爲 1 且連續的數量 >=npages 的位置。然後將 c.cache 中對應連續 npages 個 bit 位爲 1 的值修改爲 0,表示這 npages 個 bit 位對應的頁面已分配出去了。同理將 c.scav 中對應連續 npages 個 bit 位的值修改爲 0,表示這 npages 個 bit 位對應的頁面爲未清理狀態。sys.OnesCount64 採用 Population Count 算法統計一個二進制數中 1 的個數,感覺很巧妙,它能夠在 Log(N) 時間複雜度內得到一個二進制數中 1 的個數。最後通過偏移量計算出分配內存的地址並將其返回,scav 爲分配前在 mask 對應位置爲 1 的頁面中已清理的頁面數,並返回 scav*pageSize 的值

下圖以分配 3 個 pages 爲例,分配前 c.cache 的值如下圖所示爲 0xffff ffff ffff fffe, 執行完 findBitRange64 返回 1,即從 c.cache 的第 1 個索引位置的 3 個連續的 bit 值都是 1,然後將其分配出去,即將它們的值都修改爲 0,得到新的值如分配後的圖所示。

分配前

分配後

func (c *pageCache) allocN(npages uintptr) (uintptr, uintptr) {
 i := findBitRange64(c.cache, uint(npages))
 // i>=64 表示從c.cache中未找到連續npages個bit值爲1的位置,直接返回0,0
 if i >= 64 {
  return 0, 0
 }

 // mask爲找到的連續npages個bit值爲1的位置的值爲1,其他位置爲0
 // 例如如果i=3,npages=4,則mask的值爲 0x0000 0000 0000 0078
 mask := ((uint64(1) << npages) - 1) << i
 // sys.OnesCount64採用Population Count算法統計c.scav & mask數中1的個數
 scav := sys.OnesCount64(c.scav & mask)
 // 將c.cache中與mask二進制中值爲1的相同位置的值設爲0
 c.cache &^= mask // mark in-use bits
 // 將c.scav中與mask二進制中值爲1的相同位置的值設爲0
 c.scav &^= mask // clear scavenged bits
 // 通過偏移量計算出分配內存的地址
 // scav爲分配前在mask對應位置爲1的頁面中已清理的頁面數
 return c.base + uintptr(i*pageSize), uintptr(scav) * pageSize
}


// findBitRange64從c中找到一個連續bit值爲1且連續的數量>=n的位置
func findBitRange64(c uint64, n uint) uint {
 i := uint(0)
 cont := uint(sys.TrailingZeros64(^c))
 // 當c的二進制中連續位置爲1的數量(cont)大於或等於n時,表示找到一個滿足條件的位置
 // 如果沒有找到,i的值最後會大於64,所以推出循環條件爲找到了位置或i>64
 for cont < n && i < 64 {
  i += cont
  i += uint(sys.TrailingZeros64(c >> i))
  // cont記錄連續1的數量
  cont = uint(sys.TrailingZeros64(^(c >> i)))
 }
 return i
}

前面分析了分配的頁面數 <16 個 page 的情況,小於 16 個 page 是從 P 的 pageCache 中分配的。下面分析分配的頁面數>=16 個 page 的情況,大於等於 16 個 page 是通過 pageAlloc 分配器來分配的,順便說一下,小於 16 個 page 通過 pageCache 分配失敗的情況下也會走這裏的 >=16 個 page 的分配邏輯。下面結合源碼分析 >=16 個 page 的分配流程。整個 >=16 個 page 的分配流程包含兩個關鍵操作:

  1. 利用 pageAlloc 分配器分配 npages(npages>=16) 頁面大小的內存

  2. 如果分配器分配失敗,向操作系統申請新內存進行擴容

alloc 利用 pageAlloc 分配器分配 npages 個頁面的內存地址,分配流程比較複雜,核心是要搞懂 pageAlloc 分配器的工作原理。

// alloc通過pageAlloc分配器分配npages個頁面的內存地址
func (s *pageAlloc) alloc(npages uintptr) (addr uintptr, scav uintptr) {
 // s.end記錄了已經從操作系統申請的內存最高地址的地方,pageAlloc中保存的[s.start,s.end]這段
 // 內存空間是已處於sysUsed狀態了(pageAlloc從操作系統申請的內存會先設置爲sysReserve ,然後在設置爲sysMap,最後設置爲sysUsed
 // 這幾個狀態), s.searchAddr對應的是一個堆空間地址,pageAlloc按chuck(4M)大小管理從操作系統的申請的內存
 // chunkIndex(s.searchAddr)將內存地址轉換成按4M切分後的索引位置,如果這個位置比s.end還大
 // 說明pageAlloc暫存的page已都分配完了,這裏直接返回,需要繼續向操作系統申請一塊內存了。
 if chunkIndex(s.searchAddr) >= s.end {
  return 0, 0
 }
 searchAddr := uintptr(0)
 //pallocChunkPages爲512,每個page的大小爲8KB,所以一個chunk的大小爲512*8KB=4M
 //chunkPageIndex(s.searchAddr)得到當前的s.searchAddr的對應的第幾個page頁面。
 //因爲小於s.searchAddr地址的內存已分配出去了,pallocChunkPages-chunkPageIndex(s.searchAddr)得到
 // 當前chuck剩餘的頁面,只有當前剩餘的頁面數>=npages個,纔有可能分配出去npages個頁面大小的空間
 if pallocChunkPages-chunkPageIndex(s.searchAddr) >= uint(npages) {
  // i表示s.searchAddr地址對應的是第幾個chuck塊
  i := chunkIndex(s.searchAddr)
  // s.summary[len(s(s.summary)-1][i]中的每個pallocSum維護了一個chunk塊中page的分配信息
  // pallocSum有3部分構成:start,max,end,start表示512個page中可以分配出去page的起始索引下標,
  // end表示512個page中可以分配出去page的結束索引下標,max表示這512個page中連續爲0(即未分配出去)
  // 一片區域最大有多少個page
  if max := s.summary[len(s.summary)-1][i].max(); max >= uint(npages) {
   // 找到可以分配的起始page是第j個page
   j, searchIdx := s.chunkOf(i).find(npages, chunkPageIndex(s.searchAddr))
   if j < 0 {
    print("runtime: max = ", max, ", npages = ", npages, "\n")
    print("runtime: searchIdx = ", chunkPageIndex(s.searchAddr)", s.searchAddr = ", hex(s.searchAddr)"\n")
    throw("bad summary data")
   }
   // 根據chunkIndex和chunk內的偏移量j計算出實際的地址
   addr = chunkBase(i) + uintptr(j)*pageSize
   searchAddr = chunkBase(i) + uintptr(searchIdx)*pageSize
   goto Found
  }
 }
 
 // 走到這裏,說明上面從s.searchAddr中未分配成功,接下來會從s.summary管理的page中
 // 通過遍歷找到一個有npages頁面的空閒區域
 addr, searchAddr = s.find(npages)
 if addr == 0 {
  // 還是沒有分配成功
  if npages == 1 {
   s.searchAddr = maxSearchAddr
  }
  return 0, 0
 }
Found:
 
 scav = s.allocRange(addr, npages)
 if s.compareSearchAddrTo(searchAddr) > 0 {
  s.searchAddr = searchAddr
 }
 return addr, scav
}

pageAlloc 分配器的數據結構和基本工作原理在組件分配部分已有介紹,這裏主要是對 alloc 函數做一個詳細分析,相信大家都能看懂。

如上圖所示,pageAlloc 將整個虛擬地址空間(2^48)劃分成塊來管理,爲了方便快速找到可分配的內存,pageAlloc.summary 用二維數組表示分配的摘要信息,此數組的第一層有 5 級,對應上圖的 L0 到 L4, 可以看到 L0 層每個塊的大小爲 16G,L1 層每個塊的大小爲 2G,依次下一層每個塊是上一層的 1/8 大小,直到最後一層 L4 每個塊大小爲 4M, 我們知道每個 pageSize 爲 8KB, 也就是 L4 中每個塊有 512 個 page. 因爲每層塊的大小是不同的,所以數組的第二維大小是不同的,L0 層第二維的大小爲 2^48/2^34=2^14 個,L4 層最多,爲 2^48/2^22=2^26 個。pageAlloc.summary 數組中元素類型爲 pallocSum,它的定義爲 type pallocSum uint64, 其實就是一個 uint64 數字。一個 pallocSum 有 8 個字節,也就是有 64 個 bit。將 pallocSum 劃分成 3 部分:start,max,end. 如下圖所示。start、max、end 每一個都是位圖的摘要。64/3=21, 也就是每部分能分到 21 個 bit. 每個值的最大值可能爲 2^21 - 1,或者所有三個值都可能等於 2^21。後一種情況僅通過設置第 64 位來表示.

pallocSum 中 start、max 和 end 的含義是啥呢?表示的是塊內的 page 的分配情況。以 L4 層爲例,每個塊 chunk 大小爲 4M. 也就是有 512 個 page,編號 (索引)從 0 到 511。start 表示這 512 個 page 中從第幾個 page 是空閒的,小於 start 編號的 page 是都已分配出去了。end 表示 512 個 page 中可以分配出去 page 的結束索引編號。max 表示這 512 個 page 中連續爲 0(即未分配出去) 一片區域最大有多少個 page。

通過 pallocSum 可以快速找到空閒塊分配出去供應用程序使用,上圖中最大連續的空閒塊數即 max 的值爲 3,這時候假設要分配一個 npages 的值爲 3 的空間,因爲 max>=npages,所以肯定可以從當前的 chuck 中分配到內存。假如 npages 的值爲 4,這時候 max<npages,顯然是從這個 chuck 中找不到一個塊有 npages 頁面的區域的,所以不用一個一個遍歷當前 chuck 中的每個 page(pageAlloc.chuck 中的 pallocData.pallocBits)查找是否滿足分配要求了。

L4 中每個 chuck 大小是 4M, 對應 512 個 page, 所以用 9 個 bit 就可以描述這 512 個 page 的分配還是空閒清,而 pallocSum 每部分是 21 個 bit,是完全夠用的。因爲 L0 層每個塊最大,那 21 個 bit 要能描述 L0 中塊的分配情況纔行。L0 中每個塊爲 16G,每個 page 爲 8KB, 所以需要 2^34/2^13=2^21 個數,而 21bit 能表示的數最大恰好爲 2^21-1,因此是滿足的。

下面分析內存地址和這裏塊的關係,有兩個問題?

  1. 對於給定的地址 addr,怎麼知道這個 addr 在哪個塊中;

  2. 從某個塊中找到了空閒的 page, 怎麼得到空閒 page 對應的內存地址 addr.

因爲對整個虛擬內存空間採用的是平坦劃分,所以對於任意給定的地址 addr, 除以塊的大小,就可以定位到這個地址屬於哪個塊。對於 L4 來說,每個塊爲 4M, addr/4M 就可以得到它在哪個塊中, 然後 addr%4M/8KB 就可以定位到它屬於塊中的哪個 page. 對於給定的 page,採用逆向運算乘法就可以計算得到 page 對應的內存地址。假設空閒的 page 是第 i 個塊的中的第 j 個頁面。對於 L4 來說,它對於的地址計算方法爲 addr=i * 4M+j * 8KB=i*(512*8KB)+j * 8KB=i * (512 * pageSize)+j * pageSize.

看懂這裏的分析,在結合上面的代碼中的註解,理解 pageAlloc.alloc 方法就很容易了。

下面講解 pageAllo.alloc 在分配失敗時的處理,如果 alloc 分配失敗,需要從操作系統新申請一片空間,然後繼續走 alloc 進行分配。申請新空間處理邏輯在下面的 grow 方法,分爲 2 個操作步驟:

  1. 從操作系統申請 64M 整數倍的內存空間,新申請到的內存從哪裏開始,具體大小是多少等元數據信息保存在 h.curArena 中。

  2. 從剛分配的 arena 內存中切分出 npages 大小的內存分給 pageAlloc,因爲 pageAlloc 中缺少可供分配的頁了。

// grow 嘗試操作系統至少申請npage頁的內存添加到堆中
func (h *mheap) grow(npage uintptr) bool {
 //alignUp將npage向上舍入爲512的倍數,也就是說npage<512,也按512個算
 // 512<npages<2014時,按1024個算
 ask := alignUp(npage, pallocChunkPages) * pageSize

 totalGrowth := uintptr(0)
 nBase := alignUp(h.curArena.base+ask, physPageSize)
 // h.curArena上的內存不夠,從操作系統申請更多的內存
 if nBase > h.curArena.end {
  // 向操作系統申請,ask爲4M的整數倍
  av, asize := h.sysAlloc(ask)
  if av == nil {
   print("runtime: out of memory: cannot allocate ", ask, "-byte block (", memstats.heap_sys, " in use)\n")
   return false
  }

  // 新分配的內存與之前的是連續的,直接擴展h.curArena的end值
  if uintptr(av) == h.curArena.end {
   h.curArena.end = uintptr(av) + asize
  } else {
   // 新空間是不連續的。跟蹤當前空間的剩餘部分,並且將記錄新申請的空間
   if size := h.curArena.end - h.curArena.base; size != 0 {
    h.pages.grow(h.curArena.base, size)
    totalGrowth += size
   }
   // 將新申請的空間記錄在h.curArena中
   h.curArena.base = uintptr(av)
   h.curArena.end = uintptr(av) + asize
  }

  // 統計分配信息,這裏不關心
  mSysStatInc(&memstats.heap_released, asize)
  mSysStatInc(&memstats.heap_idle, asize)

  // 重新計算base的值
  nBase = alignUp(h.curArena.base+ask, physPageSize)
 }

 v := h.curArena.base
 // 從nBase到h.curArea.end的空間是還未分配給pageAlloc的
 h.curArena.base = nBase
 // 從v到nBase的這一片空間分配給pageAlloc
 h.pages.grow(v, nBase-v)
 totalGrowth += nBase - v

    ...
 return true
}

向操作系統申請內存空間調用的是 sysAlloc 函數,在 linux 系統上具體調用的是 sysReserve 函數,傳入給此函數的是從哪裏分配,分配的空間大小是多少。如果分配失敗,還會嘗試 sysReserveAligned 進行分配,該函數與 sysReserve 類型,不同是由內核給我們的任何充分對齊的地址。

// sysAlloc向操心繫統申請內存,申請的大小爲64M的整數倍
func (h *mheap) sysAlloc(n uintptr) (v unsafe.Pointer, size uintptr) {
 n = alignUp(n, heapArenaBytes)

 // 32位系統會走這裏的邏輯,本文分析64位系統,這裏可以忽略
 v = h.arena.alloc(n, heapArenaBytes, &memstats.heap_sys)
 if v != nil {
  size = n
  goto mapped
 }
 // 64位系統會走這裏,arenaHints 是嘗試添加更多堆區域的地址列表。
 // 這最初由一組通用提示地址填充,並隨着實際堆區域範圍的邊界而增長
 for h.arenaHints != nil {
  hint := h.arenaHints
  p := hint.addr
  if hint.down {
   p -= n
  }
  if p+n < p {
   v = nil
  } else if arenaIndex(p+n-1) >= 1<<arenaBits {
   v = nil
  } else {
   // 調用sysReserve向操作系統申請內存
   v = sysReserve(unsafe.Pointer(p), n)
  }
  if p == uintptr(v) {
   // 成功分配到內存
   if !hint.down {
    p += n
   }
   hint.addr = p
   size = n
   break
  }
  
  if v != nil {
   sysFree(v, n, nil)
  }
  h.arenaHints = hint.next
  h.arenaHintAlloc.free(unsafe.Pointer(hint))
 }

 if size == 0 {
  if raceenabled {
   throw("too many address space collisions for -race mode")
  }

  // 走到這裏,說明上面的sysReserve(unsafe.Pointer(p), n)分配失敗了,
  // 注意這個分配是從地址p處分配n字節大小的空間,接下來嘗試採用內核給我們的任何充分對齊的地址
  // 調用sysReserveAligned分配內存沒有指定從哪裏開始分配,由內核給我們的任何充分對齊的地址
  v, size = sysReserveAligned(nil, n, heapArenaBytes)
  if v == nil {
   // 還是分配失敗,只能失敗了
   return nil, 0
  }

  // Create new hints for extending this region.
  // 採用頭插法將hint插入到mheap_.arenaHints鏈表中
  hint := (*arenaHint)(h.arenaHintAlloc.alloc())
  hint.addr, hint.down = uintptr(v)true
  hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
  hint = (*arenaHint)(h.arenaHintAlloc.alloc())
  hint.addr = uintptr(v) + size
  hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
 }

 ...

 if uintptr(v)&(heapArenaBytes-1) != 0 {
  throw("misrounded allocation in sysAlloc")
 }

 // 將申請的內存從預留過渡到準備的狀態
 sysMap(v, size, &memstats.heap_sys)

mapped:
 // 下面是創建管理剛分配的內存的元數據信息,h.arenas記錄了分配的arena內存的對應到哪個arena塊等信息
 for ri := arenaIndex(uintptr(v)); ri <= arenaIndex(uintptr(v)+size-1); ri++ {
  l2 := h.arenas[ri.l1()]
  if l2 == nil {
   // 分配l2,通過persistentalloc分配器進行分配,該分配器分配的內存是永久的
   l2 = (*[<< arenaL2Bits]*heapArena)(persistentalloc(unsafe.Sizeof(*l2), sys.PtrSize, nil))
   if l2 == nil {
    throw("out of memory allocating heap arena map")
   }
   atomic.StorepNoWB(unsafe.Pointer(&h.arenas[ri.l1()]), unsafe.Pointer(l2))
  }

  if l2[ri.l2()] != nil {
   throw("arena already initialized")
  }
  var r *heapArena
  r = (*heapArena)(h.heapArenaAlloc.alloc(unsafe.Sizeof(*r), sys.PtrSize, &memstats.gc_sys))
  if r == nil {
   // 嘗試永久分配heapArena
   r = (*heapArena)(persistentalloc(unsafe.Sizeof(*r), sys.PtrSize, &memstats.gc_sys))
   if r == nil {
    throw("out of memory allocating heap arena metadata")
   }
  }

  ...
 }

    ...

 return
}

至此,整個大對象(>32KB)的分配流程已經分析完,下面用一個流程圖對整個過程進行一個梳理,便於我們從宏觀上掌握分配的關鍵流程。

小對象分配

小對象是指大小在 [16B,32KB] 範圍或者小於 16B 但是包含指針的對象。小對象的分配也是走的 mallocgc 函數,下面的代碼摘錄了小對象的分配的邏輯。概括起來小對象的分配分爲三個步驟:

  1. 創建 spanclass, 根據分配對象的大小,結合 size_to_class8 或 size_to_class128 表,得到對象的 sizeclass, 然後根據 sizeclass 以及對象是否包含指針計算得到 spanclass

  2. 拿到 1 中計算得到的 spanclass 從 mcache 保存的對應規格的 span,然後從 span 對象中獲取一個空閒的 object

  3. 如果 2 中獲取失敗,則會從 mcentral 獲取一個新的 span 對象,加入到 mcache 中,然後在從剛申請的 span 中分配一個空的 object

    3.1.  在步驟 3 中又分爲 2 個小步驟,先從 mcentral 中找到一個可用的 span,mcentral 維護了 2 個 span 鏈表 mSpanList,分別是 nonempty 和 empty, 先從 nonempty 鏈表中查找是否有可用的 span, 如果沒有在查找 empty 鏈表 3.2.  如果 3.1 中沒有找到可用的 span, 則會通過 mheap 分配一個新的 span 加入到 central 的 empty 鏈表,並將此 span 給到 mcache,最後從剛申請的 span 中分配一個空的 object。

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
 ...
 mp := acquirem()
 if mp.mallocing != 0 {
  throw("malloc deadlock")
 }
 if mp.gsignal == getg() {
  throw("malloc during signal")
 }
 // 設置mp在分配內存的標準位,阻止被GC回收
 mp.mallocing = 1

 shouldhelpgc := false
 dataSize := size
 c := gomcache()
 var x unsafe.Pointer
 // 申請的對象是否包含指針
 noscan := typ == nil || typ.ptrdata == 0
 // 小對象,<=32KB
 if size <= maxSmallSize {
  // 小於16B且不含指針對象分配
   ...
  } else {
   //含有指針 或要分配的內存在[16B,32KB]範圍內,走這裏的小對象分配
   // 首先根據分配對象的大小size決定它的sizeclass,然後在根據sizeclass
   // 確定它的spanClass
   var sizeclass uint8
   // size<=1024-8=1016
   if size <= smallSizeMax-8 {
    sizeclass = size_to_class8[(size+smallSizeDiv-1)/smallSizeDiv]
   } else {
    sizeclass = size_to_class128[(size-smallSizeMax+largeSizeDiv-1)/largeSizeDiv]
   }
   size = uintptr(class_to_size[sizeclass])
   // 根據sizeclass計算spanClass, 計算公式爲 sizeclass*2+(noscan?1:0)
   // noscan表示分配的對象中是否含有指針,如果有指針,spanClass爲sizeclass*2
   // 如果沒有指針spanClass爲sizeclass*2+1
   spc := makeSpanClass(sizeclass, noscan)
   // 從mcache中分配一個span對象
   span := c.alloc[spc]
   // 嘗試從span對象中快速找到一個空的object
   v := nextFreeFast(span)
   if v == 0 {
    // 分配失敗,可能需要從mcentral或者mheap中獲取,如果從mcentral或者mheap獲取了
    // 新的span,shouldhelpgc會被設置爲true,表示會在下面的判斷是否需要觸發GC
    v, span, shouldhelpgc = c.nextFree(spc)
   }
   x = unsafe.Pointer(v)
   if needzero && span.needzero != 0 {
    memclrNoHeapPointers(unsafe.Pointer(v), size)
   }
  }
 } else { 
    // >32KB對象的內存分配
  ...
 }
    ...
 return x
}

nextFreeFast 快速從 mspan 中找到一個空閒的 object,傳入的 mspan 來至 mcache,mcache 中有一個 * mspan 數組 alloc,緩存了各個規格 (span class) 的 span 對象,每個 span 管理的 page 已按預定的 size 切分好了。

// nextFreeFast快速從mspan中找到下一個空閒的object,沒有的話直接返回
func nextFreeFast(s *mspan) gclinkptr {
 // 獲取allocCache二進制中尾部0的個數,也就是獲取第一個非0的bit是第幾個ibt
 // 在alloCache中1表示未分配,0表示已分配
 theBit := sys.Ctz64(s.allocCache) // Is there a free object in the allocCache?
 // 找到未分配的元素
 if theBit < 64 {
  result := s.freeindex + uintptr(theBit)
  // 索引值要小於元素的數量
  if result < s.nelems {
   // 下一個freeidx即爲當前分配的result位置的下一個
   freeidx := result + 1
   if freeidx%64 == 0 && freeidx != s.nelems {
    return 0
   }
   // 對allocCache進行右移,將右邊(低位)已分配的bit值(爲0)移除掉
   // 方便下一次快速算出allocache二級制中尾部0的個數
   s.allocCache >>= uint(theBit + 1)
   s.freeindex = freeidx
   // 分配出去的元素數量+1
   s.allocCount++
   // 根據偏移量計算得到分配出去的內存地址
   return gclinkptr(result*s.elemsize + s.base())
  }
 }
 return 0
}

nextFree 也是分配一個 span 對象,當上面的 nextFreeFast 分配 object 失敗時,會調用下面的 nextFree 方法,該方法會嘗試從 mcentral 中申請一個 mspan, 如果申請成功會加入到 mcache 中,並從中分配一個 object.

func (c *mcache) nextFree(spc spanClass) (v gclinkptr, s *mspan, shouldhelpgc bool) {
 // 獲取一個目標規格的span
 s = c.alloc[spc]
 shouldhelpgc = false
 freeIndex := s.nextFreeIndex()
 // span中的所有元素是否已分配完,如果多已被分配,則需要獲取新的span
 if freeIndex == s.nelems {
  // The span is full.
  if uintptr(s.allocCount) != s.nelems {
   println("runtime: s.allocCount=", s.allocCount, "s.nelems=", s.nelems)
   throw("s.allocCount != s.nelems && freeIndex == s.nelems")
  }
  // 申請新的span
  c.refill(spc)
  // 設置是否需要執行GC的標識爲true,即需要檢查
  shouldhelpgc = true
  s = c.alloc[spc]

  freeIndex = s.nextFreeIndex()
 }

 if freeIndex >= s.nelems {
  throw("freeIndex is not valid")
 }

 // 計算對象的起始地址:s.base()爲span的首地址,freeIndex爲偏移量,每個對象的大小爲s.elemsize
 // 所以freeIndex*s.elemsize爲偏移後的地址,加上s.base()爲真正的起始地址(相對於堆空間)
 v = gclinkptr(freeIndex*s.elemsize + s.base())
 // 分配的元素+1
 s.allocCount++
 if uintptr(s.allocCount) > s.nelems {
  println("s.allocCount=", s.allocCount, "s.nelems=", s.nelems)
  throw("s.allocCount > s.nelems")
 }
 return
}

refill 爲 mcache 獲取一個新的指定規格的 span 對象。此 span 對象中至少有一個空閒對象.

// refill爲mcache獲取一個新的指定規格的span對象。此span對象中至少有一個空閒對象。
func (c *mcache) refill(spc spanClass) {
 s := c.alloc[spc]

 // 當前span中還存在沒有分配出去的對象,即span中還有空閒剩餘的對象,拋出異常
 if uintptr(s.allocCount) != s.nelems {
  throw("refill of span with free space remaining")
 }
 if s != &emptymspan {
  // 將此span標記爲不再緩存,新分配的span的sweepgen值爲mheap_.sweepgen+3
  if s.sweepgen != mheap_.sweepgen+3 {
   throw("bad sweepgen in refill")
  }
  atomic.Store(&s.sweepgen, mheap_.sweepgen)
 }

 // 從mcentral獲取一個新的span
 s = mheap_.central[spc].mcentral.cacheSpan()
 if s == nil {
  throw("out of memory")
 }

 if uintptr(s.allocCount) == s.nelems {
  throw("span has no free space")
 }

 //指示此跨度已緩存並防止在下一個掃描階段進行異步掃描
 // mspan.sweepgen=mheap.sweepgen-2 表示還未被清掃,mspan.sweepgen=mheap.sweepgen-1 
 // 表示已經正處於清掃中
 s.sweepgen = mheap_.sweepgen + 3

 // 將新的span s 放入到mcache中
 c.alloc[spc] = s
}

cacheSpan 從 mcentral 分配一個可用的 span 給 mcache, 每個 mcentral 有兩個 mSpanList, 分別爲 nonempty 和 empty。mSpanList 是 span 對象的鏈表。先從 nonempty 分配可用的 span, 會做一些清理操作。如果沒有分配成功,接下來會從 empty 中分配。如果從 nonempty 和 empty 鏈表中都沒有找到可用的 span,則需要從 mheap 中進行分配。

// cacheSpan分配一個可用的span給mcache
func (c *mcentral) cacheSpan() *mspan {
 ...
 sg := mheap_.sweepgen
retry:
 var s *mspan
 // 首先從nonempty鏈表中查找,nonempty鏈表表示確定該span最少有一個未分配的元素
 for s = c.nonempty.first; s != nil; s = s.next {
  // sweepgen每次GC都會增加2,如果s.sweepgen==mheap_.sweepgen表示span已經清掃過
  // s.sweepgen==mheap_.sweepgen-1表示span正在被清理
  // s.sweepgen==mheap_.sweepgen-2表示span等待被清理

  // 如果span等待被清理,嘗試原子修改s.sweepgen爲mheap_.sweepgen-1
  if s.sweepgen == sg-2 && atomic.Cas(&s.sweepgen, sg-2, sg-1) {
   // 修改成功,則把該span移動到empty鏈表,然後執行清理操作,此時擁有了一個span
   c.nonempty.remove(s)
   c.empty.insertBack(s)
   unlock(&c.lock)
   s.sweep(true)
   goto havespan
  }
  // 如果此span正在被其他線程清理,直接跳過
  if s.sweepgen == sg-1 {
   continue
  }
  
  // 該span已經被清理過,並且該span在nonempty鏈表中,也就是此span中最少有一個未分配的元素,
  // 這裏可以直接使用它
  c.nonempty.remove(s)
  c.empty.insertBack(s)
  unlock(&c.lock)
  goto havespan
 }

 // 走到這裏說明從上面的nonempty鏈表中沒有找到span,接下來查找empty鏈表
 for s = c.empty.first; s != nil; s = s.next {
  if s.sweepgen == sg-2 && atomic.Cas(&s.sweepgen, sg-2, sg-1) {
  
   // 將span s從empty中移除
   c.empty.remove(s)
   
   // 將span s重新插入到empty鏈表尾部
   c.empty.insertBack(s)
   unlock(&c.lock)
   // 執行清理操作
   s.sweep(true)
   freeIndex := s.nextFreeIndex()
   // 對s嘗試進行清理之後,如果裏面有未分配的對象,則可以使用該span
   if freeIndex != s.nelems {
    s.freeindex = freeIndex
    goto havespan
   }
   lock(&c.lock)
  
   goto retry
  }
  // 如果此span正在被其他goroutine清理,直接跳過
  if s.sweepgen == sg-1 {
   // the span is being swept by background sweeper, skip
   continue
  }
  // 走這裏說明s.sweepgen == sg,即span s已經被清理過,因爲清理過的span放在empty的尾部
  // 所以不用繼續循環了,後面的也是被清理過的,跳出循環
  break
 }
 if trace.enabled {
  traceGCSweepDone()
  traceDone = true
 }
 unlock(&c.lock)

 // 從nonempty和empty鏈表中都沒有找到可用的span,則需要從mheap中進行分配
 // 分配完之後加入到empty鏈表中
 s = c.grow()
 if s == nil {
  return nil
 }
 lock(&c.lock)
 c.empty.insertBack(s)
 unlock(&c.lock)

havespan:
 if trace.enabled && !traceDone {
  traceGCSweepDone()
 }
 n := int(s.nelems) - int(s.allocCount)
 if n == 0 || s.freeindex == s.nelems || uintptr(s.allocCount) == s.nelems {
  throw("span has no free objects")
 }
 // 統計span中未分配的元素數量(n), 加入到mcentral.nmalloc中
 // 統計span中未分配的元素總大小,加入到memstats.heap_live中
 atomic.Xadd64(&c.nmalloc, int64(n))
 usedBytes := uintptr(s.allocCount) * s.elemsize
 atomic.Xadd64(&memstats.heap_live, int64(spanBytes)-int64(usedBytes))
 if trace.enabled {
  traceHeapAlloc()
 }
 if gcBlackenEnabled != 0 {
  gcController.revise()
 }
    // 根據freeindex更新allocCache
 freeByteBase := s.freeindex &(64 - 1)
 whichByte := freeByteBase / 8
 
 s.refillAllocCache(whichByte)
 s.allocCache >>= s.freeindex % 64

 return s
}

grow 方法從 mheap 中申請 mspan 對象,具體分配調用的是 mheap_.alloc,mheap_.alloc 的處理分析見前面大對象的分配中已做了詳細的解析。

func (c *mcentral) grow() *mspan {
 // 根據mcentral的類型(spanclass)計算需要申請的span的大小和可以保存的元素的數量
 npages := uintptr(class_to_allocnpages[c.spanclass.sizeclass()])
 size := uintptr(class_to_size[c.spanclass.sizeclass()])
 // 向mheap申請一個新的span, mheap_.alloc在大對象的分配中有詳細分析
 s := mheap_.alloc(npages, c.spanclass, true)
 if s == nil {
  return nil
 }

 n := (npages << _PageShift) >> s.divShift * uintptr(s.divMul) >> s.divShift2
 s.limit = s.base() + size*n
 // 分配並初始化span的allocBits和gcmarkBits
 heapBitsForAddr(s.base()).initSpan(s)
 return s
}

到此,小對象 [16B,32KB] 的分配流程已經分析完,下面用一個流程圖對整個過程進行一個梳理,便於我們從宏觀上掌握小對象分配的關鍵流程。

微對象分配

微對象指分配的對象小於 16B,且不含指針。微對象採用 tiny 分配器進行分配。在分配的時候,如果分配器中保存的空間夠分配,就從分配器中分配,否則從 span 中取一個 16byte 的對象,然後對其進行切分,一部分分配出去剩下的部分保存着,供下一次分配。剩下的部分保存在 tiny 分配器中,mcache 中有個一個 tiny 分配器。從 mcache 中分配 span 以及從 span 中申請空閒對象的邏輯與分配小對象一樣,在上面分配小對象流程分析中已說明,此處不再重複。下面主要分析 tiny 分配器的分配原理。在下面微對象分配的過程中,object 對象已分配 12B,還剩 4B,接下來申請一個 2B 的內存空間,剩餘的空間還夠,繼續從 4B 中分出去 2B 的空間,如下圖 2 所示。

當前的分配情況如上圖所示,接下來申請一個 8B 的內存空間,當前 tiny 分配器中只剩下 4B 空間,不夠分配,則從 mcache 中取一個 16 空 object,從裏面切出來 8B 分配出去,還剩 8B 保存在 tiny 分配器中,供後續分配使用。

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
 ...
 mp := acquirem()
 if mp.mallocing != 0 {
  throw("malloc deadlock")
 }
 if mp.gsignal == getg() {
  throw("malloc during signal")
 }
 // 設置mp在分配內存的標準位,阻止被GC回收
 mp.mallocing = 1

 shouldhelpgc := false
 dataSize := size
 c := gomcache()
 var x unsafe.Pointer
 // 申請的對象是否包含指針
 noscan := typ == nil || typ.ptrdata == 0
 if size <= maxSmallSize {
  if noscan && size < maxTinySize {
  // 分配的對象小於16B,且不含指針,稱之它爲微對象,採用tiny分配器進行分配
  // 因爲span中最小元素的大小爲8byte,如果分配的對象很小,例如分配2字節,就有6個字節空間
  // 被浪費掉,這裏對小對象做了特殊處理,用tiny分配器進行分配。微對象大小整合在tinyClass span中
  // span中每個對象的大小爲16byte.在分配的時候,取一個16byte的對象,然後對其進行切分,一部分分配出去
  // 切分剩下的部分保存着,供下一次分配。這個過程就是tiny分配器做的事情。
   off := c.tinyoffset
   // Align tiny pointer for required (conservative) alignment.
   // 做點對其處理
   if size&7 == 0 { // size小於16爲8的倍數
    off = alignUp(off, 8)
   } else if size&3 == 0 { //size小於16且爲4的倍數
    off = alignUp(off, 4)
   } else if size&1 == 0 { //size小於16且爲2的倍數
    off = alignUp(off, 2)
   }
   // 偏移off加上當前分配的大小小於等於16B,說明之前切分剩下的還夠分配,就從之前剩下的
   // 裏面切分出size個byte
   if off+size <= maxTinySize && c.tiny != 0 {
    x = unsafe.Pointer(c.tiny + off)
    c.tinyoffset = off + size
    // 分配的小對象數+1
    c.local_tinyallocs++
    // 取消mp正在分配的的標誌
    mp.mallocing = 0
    releasem(mp)
    return x
   }
  
   // 從mcache中申請一個span
   span := c.alloc[tinySpanClass]
   // 從span中找到一個空的object,這個object的大小爲16byte
   v := nextFreeFast(span)
   if v == 0 {
    // 嘗試從mcentral中分配
    v, _, shouldhelpgc = c.nextFree(tinySpanClass)
   }
   x = unsafe.Pointer(v)
   // 將申請的16byte的地址空間清理爲0
   (*[2]uint64)(x)[0] = 0
   (*[2]uint64)(x)[1] = 0
   // 更新分配器中的字段,這裏的意思,如果要分配的對象大小size比之前分配的要小
   // 即這次分配的object切分後剩下的空間比之前的要大,則保存這次剩下的空間
   // 實際效果比較這次分配剩下的空間和前次分配剩下空間,那個大保留那個。
   if size < c.tinyoffset || c.tiny == 0 {
    c.tiny = uintptr(x)
    c.tinyoffset = size
   }
   size = maxTinySize
  } else {
          // 小對象分配
     ...
  }
 } else { 
  // 大對象分配,>32KB對象的內存分配
        ...
 }
    ...
 return x
}

還是照舊,對微對象分配畫一個流程圖,如下圖所示。對源碼細節不感興趣的可以看這張流程圖,從整體瞭解分配流程。

到這裏,大對象、小對象和微對象的所有分配細節已經分析完,下面對 Go 對象分配整體做一個概覽,得到下面的全局分配流程圖,方便我們以後回顧。

Golang 源碼探索 (三) GC 的實現原理 [1] 詳解 Go 語言的內存模型及堆的分配管理 [2] 內存分配 [3] 內存分配器 [4]

Reference

[1]

Golang 源碼探索 (三) GC 的實現原理: https://www.cnblogs.com/zkweb/p/7880099.html

[2]

詳解 Go 語言的內存模型及堆的分配管理: https://zhuanlan.zhihu.com/p/76802887

[3]

內存分配: https://golang.design/under-the-hood/zh-cn/part2runtime/ch07alloc/basic/#mheap

[4]

內存分配器: https://draveness.me/golang/docs/part3-runtime/ch07-memory/golang-memory-allocator/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YwBQ731mcQ-y-3FiHu4crA