Golang 垃圾回收源碼走讀
0 前言
近期在和大家一起探討 Golang 內存管理機制相關的內容.
此前分別介紹了 Golang 內存模型及分配機制和 Golang 的垃圾回收原理有關的內容. 本篇會基於源碼走讀的方式,對 Golang 垃圾回收的理論進行論證和補充. 本文走讀的源碼版本爲 Golang 1.19.
由於內容之間具有強關聯性,建議大家先完成前兩篇內容的閱讀,再開啓本篇的學習.
1 源碼導讀
1.1 源碼框架
首先給出整體的源碼走讀框架,供大家總覽全局,避免暈車.
1.2 文件位置
GC 中各子流程聚焦於不同源碼文件中,目錄供大家一覽,感興趣可以連貫閱讀.
2 觸發 GC
下面順沿源碼框架,開啓走讀流程. 本章首先聊聊,GC 階段是如何被觸發啓動的.
2.1 觸發 GC 類型
觸發 GC 的事件類型可以分爲如下三種:
在觸發 GC 時,會通過 gcTrigger.test 方法,結合具體的觸發事件類型進行觸發條件的校驗,校驗條件展示於上表,對應的源碼如下:
type gcTriggerKind int
const (
// 根據堆分配內存情況,判斷是否觸發GC
gcTriggerHeap gcTriggerKind = iota
// 定時觸發GC
gcTriggerTime
// 手動觸發GC
gcTriggerCycle
}
func (t gcTrigger) test() bool {
// ...
switch t.kind {
case gcTriggerHeap:
// ...
trigger, _ := gcController.trigger()
return atomic.Load64(&gcController.heapLive) >= trigger
case gcTriggerTime:
if gcController.gcPercent.Load() < 0 {
return false
}
lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime))
return lastgc != 0 && t.now-lastgc > forcegcperiod
case gcTriggerCycle:
// ...
return int32(t.n-work.cycles) > 0
}
return true
}
2.2 定時觸發 GC
定時觸發 GC 的源碼方法及文件如下表所示:
(1)啓動定時觸發協程並阻塞等待
runtime 包初始化的時候,即會異步開啓一個守護協程,通過 for 循環 + park 的方式,循環阻塞等待被喚醒.
當被喚醒後,則會調用 gcStart 方法進入標記準備階段,嘗試開啓新一輪 GC,此時觸發 GC 的事件類型正是 gcTriggerTime(定時觸發).
在 gcStart 方法內部,還會通過 gcTrigger.test 方法進一步校驗觸發 GC 的條件是否滿足,留待第 3 章再作展開.
// runtime 包下的全局變量
var forcegc forcegcstate
type forcegcstate struct {
lock mutex
g *g
idle uint3
func init() {
go forcegchelper()
}
func forcegchelper() {
forcegc.g = getg()
lockInit(&forcegc.lock, lockRankForcegc)
for {
lock(&forcegc.lock)
// ...
atomic.Store(&forcegc.idle, 1)
// 令 forcegc.g 陷入被動阻塞,g 的狀態會設置爲 waiting,當達成 gc 條件時,g 的狀態會被切換至 runnable,方法纔會向下執行
goparkunlock(&forcegc.lock, waitReasonForceGCIdle, traceEvGoBlock, 1)
// g 被喚醒了,則調用 gcStart 方法真正開啓 gc 主流程
gcStart(gcTrigger{kind: gcTriggerTime, now: nanotime()})
}
}
(2)喚醒定時觸發協程
runtime 包下的 main 函數會通過 systemstack 操作切換至 g0(g0 是 Golang GMP 模型中的概念,如有疑惑,可參見我之前的文章:Golang GMP 原理),並調用 sysmon 方法,輪詢嘗試將 forcegchelper 協程添加到 gList 中,並在 injectglist 方法內將其喚醒:
func main() {
// ...
systemstack(func() {
newm(sysmon, nil, -1)
})
// ...
}
func sysmon() {
// ...
for {
// 通過 gcTrigger.test 方法檢查是否需要發起 gc,觸發類型爲 gcTriggerTime:定時觸發
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
// 需要發起 gc,則將 forcegc.g 注入 list 中, injectglist 方法內部會執行喚醒操作
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
// ...
}
}
(3)定時觸發 GC 條件校驗
在 gcTrigger.test 方法中,針對 gcTriggerTime 類型的觸發事件,其校驗條件則是觸發時間間隔達到 2 分鐘以上.
// 單位 nano,因此實際值爲 120s = 2min
var forcegcperiod int64 = 2 * 60 * 1e9
func (t gcTrigger) test() bool {
// ...
switch t.kind {
// ...
// 每 2 min 發起一輪 gc
case gcTriggerTime:
// ...
lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime))
return lastgc != 0 && t.now-lastgc > forcegcperiod
// ...
}
return true
}
2.3 對象分配觸發 GC
該流程源碼方法及文件如下表所示:
在分配對象的 malloc 方法中,倘若滿足如下兩個條件之一,都會發起一次觸發 GC 的嘗試:
-
需要初始化一個大小超過 32KB 的大對象
-
待初始化對象在 mcache 中對應 spanClass 的 mspan 空間已用盡(如對有關概念存在疑惑,請優先閱讀我的文章:Golang 內存模型與分配機制)
此時觸發事件類型爲 gcTriggerHeap,並在調用 gcStart 方法的內部執行 gcTrigger.test 進行條件檢查.
(1)對象分配觸發 GC
mallocgc 是分配對象的主流程方法:
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
// ...
shouldhelpgc := false
// ...
if size <= maxSmallSize {
if noscan && size < maxTinySize {
// ...
if v == 0 {
// 倘若 mcache 中對應 spanClass 的 mspan 已滿,置 true
v, span, shouldhelpgc = c.nextFree(tinySpanClass)
}
// ...
} else {
// ...
if v == 0 {
// 倘若 mcache 中對應 spanClass 的 mspan 已滿,置 true
v, span, shouldhelpgc = c.nextFree(spc)
}
// ...
}
} else {
// 申請大小大於 32KB 的大對象,直接置爲 true
shouldhelpgc = true
// ...
}
// ...
// 嘗試觸發 gc,類型爲 gcTriggerHeap,觸發校驗邏輯同樣位於 gcTrigger.test 方法中
if shouldhelpgc {
if t := (gcTrigger{kind: gcTriggerHeap}); t.test() {
gcStart(t)
}
}
// ...
}
(2)校驗 GC 觸發條件
在 gcTrigger.test 方法中,針對 gcTriggerHeap 類型的觸發事件,其校驗條件是判斷當前堆已使用內存是否達到閾值. 此處的堆內存閾值會在上一輪 GC 結束時進行設定,具體內容將在本文 6.4 小節詳細討論.
func (t gcTrigger) test() bool {
// ...
switch t.kind {
case gcTriggerHeap:
trigger, _ := gcController.trigger()
// 倘若堆中已使用的內存大小達到了閾值,則會真正執行 gc
return atomic.Load64(&gcController.heapLive) >= trigger
// ...
}
return true
}
2.3 手動觸發 GC
最後一種觸發的 GC 形式是手動觸發,入口位於 runtime 包的公共方法:runtime.GC
用戶手動觸發 GC 時,事件類型爲 gcTriggerCycle.
func GC() {
// ...
gcStart(gcTrigger{kind: gcTriggerCycle, n: n + 1})
// ...
}
針對這種類型的校驗條件是,上一輪 GC 已經完成,此時能夠開啓新一輪 GC 任務.
func (t gcTrigger) test() bool {
// ...
switch t.kind {
// ...
case gcTriggerCycle:
return int32(t.n-work.cycles) > 0
}
return true
}
3 標記準備
本章開始步入標記準備階段的內容探討中,本章會揭祕屏障機制以及 STW 的底層實現,所涉及的源碼方法及文件位置如下表所示:
3.1 主流程
gcStart 是標記準備階段的主流程方法,方法中完成的工作包括:
-
再次檢查 GC 觸發條件是否達成
-
異步啓動對應於 P 數量的標記協程
-
Stop the world
-
控制標記協程數量和執行時長,使得 CPU 佔用率趨近 25%
-
設置 GC 階段爲 GCMark,開啓混合混合寫屏障
-
標記 mcache 中的 tiny 對象
-
Start the world
func gcStart(trigger gcTrigger) {
// ...
// 檢查是否達到 GC 條件,會根據 trigger 類型作 dispatch,常見的包括堆內存大小、GC 時間間隔、手動觸發的類型
for trigger.test() && sweepone() != ^uintptr(0) {
sweep.nbgsweep++
}
// 上鎖
semacquire(&work.startSema)
// 加鎖 double check
if !trigger.test() {
semrelease(&work.startSema)
return
}
// ...
// 由於進入了 GC 模式,會根據 P 的數量啓動多個 GC 併發標記協程,但是會先阻塞掛起,等待被喚醒
gcBgMarkStartWorkers()
// ...
// 切換到 g0,執行 Stop the world 操作
systemstack(stopTheWorldWithSema)
// ...
// 限制標記協程佔用 CPU 時間片的比例爲趨近 25%
gcController.startCycle(now, int(gomaxprocs), trigger)
// 設置GC階段爲_GCmark,啓用混合寫屏障
setGCPhase(_GCmark)
// ...
// 對 mcache 中的 tiny 對象進行標記
gcMarkTinyAllocs()
// 切換至 g0,重新 start the world
systemstack(func() {
now = startTheWorldWithSema(trace.enabled)
// ...
})
// ...
}
3.2 啓動標記協程
gcBgMarkStartWorkers 方法中啓動了對應於 P 數量的併發標記協程,並且通過 notetsleepg 的機制,使得 for 循環與 gcBgMarkWorker 內部形成聯動節奏,確保每個 P 都能分得一個 gcBgMarkWorker 標記協程.
func gcBgMarkStartWorkers() {
// 開啓對應於 P 個數標記協程,但是內部將 g 添加到全局的 pool 中,並通過 gopark 阻塞掛起
for gcBgMarkWorkerCount < gomaxprocs {
go gcBgMarkWorker()
// 掛起,等待 gcBgMarkWorker 方法中完成標記協程與 P 的綁定後喚醒
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
gcBgMarkWorkerCount++
}
}
gcBgMarkWorker 方法中將 g 包裝成一個 node 天添加到全局的 gcBgMarkWorkerPool 中,保證標記協程與 P 的一對一關聯,並調用 gopark 方法將當前 g 掛起,等待被喚醒.
func gcBgMarkWorker() {
gp := getg()
node := new(gcBgMarkWorkerNode)
gp.m.preemptoff = ""
node.gp.set(gp)
node.m.set(acquirem())
// 喚醒外部的 for 循環
notewakeup(&work.bgMarkReady)
for {
// 當前 g 阻塞至此,直到 gcController.findRunnableGCWorker 方法被調用,會將當前 g 喚醒
gopark(func(g *g, nodep unsafe.Pointer) bool {
node := (*gcBgMarkWorkerNode)(nodep)
// ...
// 將當前 g 包裝成一個 node 添加到 gcBgMarkWorkerPool 中
gcBgMarkWorkerPool.push(&node.node)
return true
}, unsafe.Pointer(node), waitReasonGCWorkerIdle, traceEvGoBlock, 0)
// ...
}
}
3.3 Stop the world
gcStart 方法在調用 gcBgMarkStartWorkers 方法異步啓動標記協程後,會執行 STW 操作停止所有用戶協程,其實現位於 stopTheWorldWithSema 方法,核心點如下:
-
取鎖:sched.lock
-
將 sched.gcwaiting 標識置爲 1,後續的調度流程見其標識,都會阻塞掛起
-
搶佔所有 g,並將 P 的狀態置爲 syscall
-
將所有 P 的狀態改爲 stop
-
倘若部分任務無法搶佔,則等待其完成後再進行搶佔
-
調用方法 worldStopped 收尾,世界停止了
func stopTheWorldWithSema() {
_g_ := getg()
// 全局調度鎖
lock(&sched.lock)
sched.stopwait = gomaxprocs
// 此標識置 1,之後所有的調度都會阻塞等待
atomic.Store(&sched.gcwaiting, 1)
// 發送搶佔信息搶佔所有 G,後將 p 狀態置爲 syscall
preemptall()
// 將當前 p 的狀態置爲 stop
_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
sched.stopwait--
// 把所有 p 的狀態置爲 stop
for _, p := range allp {
s := p.status
if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
// ...
p.syscalltick++
sched.stopwait--
}
}
// 把空閒 p 的狀態置爲 stop
now := nanotime()
for {
p, _ := pidleget(now)
if p == nil {
break
}
p.status = _Pgcstop
sched.stopwait--
}
wait := sched.stopwait > 0
unlock(&sched.lock)
// 倘若有 p 無法被搶佔,則阻塞直到將其統統搶佔完成
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}
// native 方法,stop the world
worldStopped()
}
3.4 控制標記協程頻率
gcStart 方法中,還會通過 gcController.startCycle 方法,將標記協程對 CPU 的佔用率控制在 25% 左右. 此時,根據 P 的數量是否能被 4 整除,分爲兩種處理方式:
-
• 倘若 P 的個數能被 4 整除,則簡單將標記協程的數量設置爲 P/4
-
• 倘若 P 的個數不能被 4 整除,則通過控制標記協程執行時長的方式,來使全局標記協程對 CPU 的使用率趨近於 25%
// 目標:標記協程對CPU的使用率維持在25%的水平
const gcBackgroundUtilization = 0.25
func (c *gcControllerState) startCycle(markStartTime int64, procs int, trigger gcTrigger) {
// ...
// P 的個數 * 0.25
totalUtilizationGoal := float64(procs) * gcBackgroundUtilization
// P 的個數 * 0.25 後四捨五入取整
c.dedicatedMarkWorkersNeeded = int64(totalUtilizationGoal + 0.5)
utilError := float64(c.dedicatedMarkWorkersNeeded)/totalUtilizationGoal - 1
const maxUtilError = 0.3
// 倘若 P 的個數不能被 4 整除
if utilError < -maxUtilError || utilError > maxUtilError {
if float64(c.dedicatedMarkWorkersNeeded) > totalUtilizationGoal {
c.dedicatedMarkWorkersNeeded--
}
// 計算出每個 P 需要額外執行標記任務的時間片比例
c.fractionalUtilizationGoal = (totalUtilizationGoal - float64(c.dedicatedMarkWorkersNeeded)) / float64(procs)
// 倘若 P 的個數可以被 4 整除,則無需控制執行時長
} else {
c.fractionalUtilizationGoal = 0
}
// ...
}
3.5 設置寫屏障
隨後,gcStart 方法會調用 setGCPhase 方法,標誌 GC 正式進入併發標記(GCmark)階段. 我們觀察該方法代碼實現,可以注意到,在_GCMark 和_GCMarkTermination 階段中,會啓用混合寫屏障.
func setGCPhase(x uint32) {
atomic.Store(&gcphase, x)
writeBarrier.needed = gcphase == _GCmark || gcphase == _GCmarktermination
writeBarrier.enabled = writeBarrier.needed || writeBarrier.cgo
}
在混合寫屏障機制中,核心是會將需要置灰的對象添加到當前 P 的 wbBuf 緩存中. 隨後在併發標記缺灰、標記終止前置檢查等時機會執行 wbBufFlush1 方法,批量地將 wbBuf 中的對象釋放出來進行置灰,保證達到預期的效果.
func wbBufFlush(dst *uintptr, src uintptr) {
// ...
systemstack(func() {
wbBufFlush1(getg().m.p.ptr())
})
}
wbBufFlush1 方法中涉及了對象置灰操作,其包含了在對應 mspan 的 bitmap 中打點標記以及將對象添加到 gcw 隊列兩步. 此處先不細究,後文 4.3 小節中,我們再作詳細介紹.
func wbBufFlush1(_p_ *p) {
// 獲取當前 P 通過屏障機制緩存的指針
start := uintptr(unsafe.Pointer(&_p_.wbBuf.buf[0]))
n := (_p_.wbBuf.next - start) / unsafe.Sizeof(_p_.wbBuf.buf[0])
ptrs := _p_.wbBuf.buf[:n]
// 將緩存的指針作標記,添加到 gcw 隊列
gcw := &_p_.gcw
pos := 0
for _, ptr := range ptrs {
// ...
obj, span, objIndex := findObject(ptr, 0, 0)
if obj == 0 {
continue
}
// 打標
mbits := span.markBitsForIndex(objIndex)
if mbits.isMarked() {
continue
}
mbits.setMarked()
// ...
}
// 所有緩存對象入隊
gcw.putBatch(ptrs[:pos])
_p_.wbBuf.reset()
}
3.6 Tiny 對象標記
gcStart 方法隨後還會調用 gcMarkTinyAllocs 方法中,遍歷所有的 P,對 mcache 中的 Tiny 對象分別調用 greyobject 方法進行置灰.
func gcMarkTinyAllocs() {
assertWorldStopped()
for _, p := range allp {
c := p.mcache
if c == nil || c.tiny == 0 {
continue
}
// 獲取 tiny 對象
_, span, objIndex := findObject(c.tiny, 0, 0)
gcw := &p.gcw
// tiny 對象置灰(標記 + 添加入隊)
greyobject(c.tiny, 0, 0, span, gcw, objIndex)
}
}
3.7 Start the world
startTheWorldWithSema 與 stopTheWorldWithSema 形成對偶. 該方法會重新恢復世界的生機,將所有 P 喚醒. 倘若缺少 M,則構造新的 M 爲 P 補齊(M 和 P 是 Golang GMP 模型中的概念,可參見我的文章 Golang GMP 原理).
func startTheWorldWithSema(emitTraceEvent bool) int64 {
assertWorldStopped()
// ...
p1 := procresize(procs)
// 重啓世界
worldStarted()
// 遍歷所有 p,將其喚醒
for p1 != nil {
p := p1
p1 = p1.link.ptr()
if p.m != 0 {
mp := p.m.ptr()
p.m = 0
if mp.nextp != 0 {
throw("startTheWorld: inconsistent mp->nextp")
}
mp.nextp.set(p)
notewakeup(&mp.park)
} else {
newm(nil, p, -1)
}
}
// ...
return startTime
}
4 併發標記
下面比如難度曲線最陡峭的併發標記部分. 這部分內容承接上文 3.2 小節,講述標記協程在被喚醒後,需要執行的任務細節.
首先,我們先來理一下,這些標記協程是如何被喚醒的.
4.1 調度標記協程
在 GMP 調度的主幹方法 schedule 中,會通過 g0 調用 findRunnable 方法 P 尋找下一個可執行的協程,找到後會調用 execute 方法,內部完成由 g0->g 的切換,真正執行用戶協程中的任務.
func schedule() {
// ...
gp, inheritTime, tryWakeP := findRunnable()
// ...
execute(gp, inheritTime)
}
在 findRunnable 方法中,當通過全局標識 gcBlackenEnabled 發現當前開啓 GC 模式時,會調用 gcControllerState.findRunnableGCWorker 喚醒並取得標記協程.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
// ...
if gcBlackenEnabled != 0 {
gp, now = gcController.findRunnableGCWorker(_p_, now)
if gp != nil {
return gp, false, true
}
}
// ...
}
在 gcControllerState.findRunnableGCWorker 方法中,會從全局的標記協程池 gcBgMarkWorkerPool 獲取到一個封裝了標記協程的 node. 並通過 gcControllerState 中 dedicatedMarkWorkersNeeded、fractionalUtilizationGoal 等字段標識判定標記協程的標記模式,然後將標記協程狀態由_Gwaiting 喚醒爲_Grunnable,並返回給 g0 用於執行.
這裏談到的標記模式對應了上文 3.4 小節的內容,並將在下文 4.2 小節詳細介紹.
func (c *gcControllerState) findRunnableGCWorker(_p_ *p, now int64) (*g, int64) {
// ...
// 保證當前 _p_ 是可以調度標記協程的,每個 p 只能執行一個標記協程
if !gcMarkWorkAvailable(_p_) {
return nil, now
}
// 從全局標記協程池子 gcBgMarkWorkerPool 中取出 g
node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
// ...
decIfPositive := func(ptr *int64) bool {
for {
v := atomic.Loadint64(ptr)
if v <= 0 {
return false
}
if atomic.Casint64(ptr, v, v-1) {
return true
}
}
}
// 確認標記的模式
if decIfPositive(&c.dedicatedMarkWorkersNeeded) {
_p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode
} else if c.fractionalUtilizationGoal == 0 {
gcBgMarkWorkerPool.push(&node.node)
return nil, now
} else {
delta := now - c.markStartTime
if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal {
// Nope. No need to run a fractional worker.
gcBgMarkWorkerPool.push(&node.node)
return nil, now
}
// Run a fractional worker.
_p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode
}
// 將標記協程的狀態置爲 runnable,填了 gcBgMarkWorker 方法中 gopark 操作留下的坑
gp := node.gp.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
return gp, n
}
4.2 併發標記啓動
標記協程被喚醒後,主線又重新拉回到 gcBgMarkWorker 方法中,此時會根據 3.4 小節中預設的標記模式,調用 gcDrain 方法開始執行併發標記工作.
標記模式包含以下三種:
-
gcMarkWorkerDedicatedMode:專一模式. 需要完整執行完標記任務,不可被搶佔
-
gcMarkWorkerFractionalMode:分時模式. 當標記協程執行時長達到一定比例後,可以被搶佔
-
gcMarkWorkerIdleMode: 空閒模式. 隨時可以被搶佔.
值得一提的是,在執行專一模式時,會先以可被搶佔的模式嘗試執行,倘若真的被用戶協程搶佔,則會先將當前 P 本地隊列的用戶協程投放到全局 g 隊列中,再將標記模式改爲不可搶佔模式. 這樣設計的優勢是,通過負載均衡的方式,減少當前 P 下用戶協程的等待時長,提高用戶體驗.
在 gcDrain 方法中,有兩個核心的 gcDrainFlags 控制着標記協程的運行風格:
-
gcDrainIdle:空閒模式,隨時可被搶佔
-
gcDrainFractional:分時模式,執行一定比例的時長後可被搶佔
type gcDrainFlags int
const (
gcDrainUntilPreempt gcDrainFlags = 1 << iota
gcDrainFlushBgCredit
gcDrainIdle
gcDrainFractional
)
func gcBgMarkWorker() {
// ...
node.m.set(acquirem())
pp := gp.m.p.ptr() // P can't change with preemption disabled.
// ...
// 根據不同的運作模式,執行 gcDrain 方法:
systemstack(func() {
casgstatus(gp, _Grunning, _Gwaiting)
switch pp.gcMarkWorkerMode {
default:
throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
case gcMarkWorkerDedicatedMode:
// 先按照可搶佔模式執行標記協程,倘若被搶佔,則將搶佔協程添加到全局隊列中,之後再以不可搶佔模式執行標記協程
gcDrain(&pp.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
if gp.preempt {
// 將 p 本地隊列中的 g 添加到全局隊列
if drainQ, n := runqdrain(pp); n > 0 {
lock(&sched.lock)
globrunqputbatch(&drainQ, int32(n))
unlock(&sched.lock)
}
}
// Go back to draining, this time
// without preemption.
gcDrain(&pp.gcw, gcDrainFlushBgCredit)
case gcMarkWorkerFractionalMode:
gcDrain(&pp.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit)
case gcMarkWorkerIdleMode:
gcDrain(&pp.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
}
casgstatus(gp, _Gwaiting, _Grunning)
})
// ...
}
}
4.3 標記主流程
gcDrain 方法是併發標記階段的核心方法:
-
在空閒模式(idle)和分時模式(fractional)下,會提前設好 check 函數(pollWork 和 pollFractionalWorkerExit)
-
標記根對象
-
循環從 gcw 緩存隊列中取出灰色對象,執行 scanObject 方法進行掃描標記
-
定期檢查 check 函數,判斷標記流程是否應該被打斷
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
// ...
gp := getg().m.curg
// 模式標記
preemptible := flags&gcDrainUntilPreempt != 0
flushBgCredit := flags&gcDrainFlushBgCredit != 0
idle := flags&gcDrainIdle != 0
// ...
var check func() bool
if flags&(gcDrainIdle|gcDrainFractional) != 0 {
// ...
if idle {
check = pollWork
} else if flags&gcDrainFractional != 0 {
check = pollFractionalWorkerExit
}
}
// 倘若根對象還未標記完成,則先進行根對象標記
if work.markrootNext < work.markrootJobs {
// Stop if we're preemptible or if someone wants to STW.
for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
job := atomic.Xadd(&work.markrootNext, +1) - 1
if job >= work.markrootJobs {
break
}
// 標記根對象
markroot(gcw, job, flushBgCredit)
// ...
}
}
// 遍歷隊列,進行對象標記
for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
// work balance
if work.full == 0 {
gcw.balance()
}
// 嘗試從 p 本地隊列中獲取灰色對象,無鎖
b := gcw.tryGetFast()
if b == 0 {
// 嘗試從全局隊列中獲取灰色對象,加鎖
b = gcw.tryGet()
if b == 0 {
// 刷新寫屏障緩存
wbBufFlush(nil, 0)
b = gcw.tryGet()
}
}
if b == 0 {
// 已無對象需要標記
break
}
// 進行對象的標記,並順延指針進行後續對象的掃描
scanobject(b, gcw)
// ...
if gcw.heapScanWork >= gcCreditSlack {
gcController.heapScanWork.Add(gcw.heapScanWork)
// ...
if checkWork <= 0 {
// ...
if check != nil && check() {
break
}
}
}
}
done:
//
}
4.4 灰對象緩存隊列
4.3 小節的源碼中,涉及到一個重要的數據結構:gcw,這是灰色對象的存儲代理和載體,在標記過程中需要持續不斷地從從隊列中取出灰色對象,進行掃描,並將新的灰色對象通過 gcw 添加到緩存隊列.
灰對象緩存隊列分爲兩層:
-
每個 P 私有的 gcWork,實現上由兩條單向鏈表構成,採用輪換機制使用
-
全局隊列 workType.full,底層是一個通過 CAS 操作維護的棧結構,由所有 P 共享
(1)gcWork
gcWork 數據結構源代碼如下所示.
type gcWork struct {
// ...
wbuf1, wbuf2 *workbuf
// ...
}
type workbuf struct {
workbufhdr
obj [(_WorkbufSize - unsafe.Sizeof(workbufhdr{})) / goarch.PtrSize]uintptr
}
type workbufhdr struct {
node lfnode
nobj int
}
type lfnode struct {
next uint64
pushcnt uint
}
在 gcDrain 方法中,會持續不斷地從當前 P 的 gcw 中獲取灰色對象,在調用策略上,會先嚐試取私有部分,再通過 gcw 代理取全局共享部分:
// 嘗試從 p 本地隊列中獲取灰色對象,無鎖
b := gcw.tryGetFast()
if b == 0 {
// 嘗試從全局隊列中獲取灰色對象,加鎖
b = gcw.tryGet()
if b == 0 {
// 因爲缺灰,會釋放寫屏障緩存,進行補灰操作
wbBufFlush(nil, 0)
b = gcw.tryGet()
}
}
gcWork.tryGetFast 方法中,會先嚐試從 gcWork.wbuf1 中獲取灰色對象.
func (w *gcWork) tryGetFast() uintptr {
wbuf := w.wbuf1
if wbuf == nil || wbuf.nobj == 0 {
return 0
}
wbuf.nobj--
return wbuf.obj[wbuf.nobj]
}
倘若 gcWork.wbuf1 缺灰,則會在 gcWork.tryGet 方法中交換 wbuf1 和 wbuf2,再嘗試獲取一次. 倘若仍然缺灰,則會調用 trygetfull 方法,從全局緩存隊列中獲取.
func (w *gcWork) tryGet() uintptr {
wbuf := w.wbuf1
if wbuf == nil {
w.init()
wbuf = w.wbuf1
// wbuf is empty at this point.
}
if wbuf.nobj == 0 {
w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
wbuf = w.wbuf1
if wbuf.nobj == 0 {
owbuf := wbuf
wbuf = trygetfull()
if wbuf == nil {
return 0
}
putempty(owbuf)
w.wbuf1 = wbuf
}
}
wbuf.nobj--
return wbuf.obj[wbuf.nobj]
}
(2)workType.full
灰色對象的全局緩存隊列是一個棧結構,調用 pop 方法時,會通過 CAS 方式依次從棧頂取出一個緩存鏈表.
var work workType
type workType struct {
full lfstack
// ...
}
type lfstack uint64
func (head *lfstack) push(node *lfnode) {
// ...
}
func (head *lfstack) pop() unsafe.Pointer {
for {
old := atomic.Load64((*uint64)(head))
if old == 0 {
return nil
}
node := lfstackUnpack(old)
next := atomic.Load64(&node.next)
if atomic.Cas64((*uint64)(head), old, next) {
return unsafe.Pointer(node)
}
}
}
func trygetfull() *workbuf {
b := (*workbuf)(work.full.pop())
if b != nil {
b.checknonempty()
return b
}
return b
}
4.5 三色標記實現
Golang GC 的標記流程基於三色標記法實現. 此時在將理論落地實踐前,我們需要先搞清楚一個細節,那就是在代碼層面,黑、灰、白這三種顏色如何實現.
在前文 Golang 內存模型與分配機制中聊過,每個對象會有其從屬的 mspan,在 mspan 中,有着兩個 bitmap 存儲着每個對象大小的內存的狀態信息:
-
allocBits:標識內存的閒忙狀態,一個 bit 位對應一個 object 大小的內存塊,值爲 1 代表已使用;值爲 0 代表未使用
-
gcmakrBits:只在 GC 期間使用. 值爲 1 代表佔用該內存塊的對象被標記存活.
在垃圾清掃的過程中,並不會真正地將內存進行回收,而是在每個 mspan 中使用 gcmakrBits 對 allocBits 進行覆蓋. 在分配新對象時,當感知到 mspan 的 allocBits 中,某個對象槽位 bit 位值爲 0,則會將其視爲空閒內存進行使用,其本質上可能是一個覆蓋操作.
type mspan struct {
// ...
allocBits *gcBits
gcmarkBits *gcBits
// ...
}
type gcBits uint8
介紹完了 bitmap 設定之後,下面回到三種標記色的實現當中:
-
黑色:對象在 mspan.gcmarkBits 中 bit 位值爲 1,且對象已經離開灰對象緩存隊列(4.4 小節談及)
-
灰色:對象在 mspan.gcmarkBits 中 bit 位值爲 1,且對象仍處於灰對象緩存隊列中
-
白色:對象在 mspan.gcmarkBits 中 bit 位值位 0.
有了以上的基礎設定之後,我們已經可以在腦海中搭建三色標記法的實現框架:
-
掃描根對象,將 gcmarkBits 中的 bit 位置 1,並添加到灰對象緩存隊列
-
依次從灰對象緩存隊列中取出灰對象,將其指向對象的 gcmarkBits 中的 bit 位置 1 並添加到會對象緩存隊列
4.6 中止標記協程
gcDrain 方法中,針對空閒模式 idle 和分時模式 fractional,會設定 check 函數,在循環掃描的過程中檢測是否需要中斷當前標記協程.
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
// ...
// ...
idle := flags&gcDrainIdle != 0
// ...
var check func() bool
if flags&(gcDrainIdle|gcDrainFractional) != 0 {
// ...
if idle {
check = pollWork
} else if flags&gcDrainFractional != 0 {
check = pollFractionalWorkerExit
}
}
// ...
// 遍歷隊列,進行對象標記
for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
// ...
if gcw.heapScanWork >= gcCreditSlack {
gcController.heapScanWork.Add(gcw.heapScanWork)
// ...
if checkWork <= 0 {
// ...
if check != nil && check() {
break
}
}
}
}
done:
//
}
對應於 idle 模式的 check 函數是 pollwork,方法中判斷 P 本地隊列存在就緒的 g 或者存在就緒的網絡寫成,就會對當前標記協程進行中斷:
func pollWork() bool {
if sched.runqsize != 0 {
return true
}
p := getg().m.p.ptr()
if !runqempty(p) {
return true
}
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
if list := netpoll(0); !list.empty() {
injectglist(&list)
return true
}
}
return false
}
對應於 fractional 模式的 check 函數是 pollFractionalWorkerExit,倘若當前標記協程執行的時間比例大於 1.2 倍的 fractionalUtilizationGoal 閾值(3.4 小節中設置),就會中止標記協程.
func pollFractionalWorkerExit() bool {
now := nanotime()
delta := now - gcController.markStartTime
if delta <= 0 {
return true
}
p := getg().m.p.ptr()
selfTime := p.gcFractionalMarkTime + (now - p.gcMarkWorkerStartTime)
return float64(selfTime)/float64(delta) > 1.2*gcController.fractionalUtilizationGoal
}
4.7 掃描根對象
在 gcDrain 方法正式開始循環掃描前,還會先對根對象進行掃描標記. Golang 中的根對象包括如下幾項:
-
.bss 段內存中的未初始化全局變量
-
.data 段內存中的已初始化變量)
-
span 中的 finalizer
-
各協程棧
實現根對象掃描的方法是 markroot:
func markroot(gcw *gcWork, i uint32, flushBgCredit bool) int64 {
var workDone int64
var workCounter *atomic.Int64
switch {
// 處理已初始化的全局變量
case work.baseData <= i && i < work.baseBSS:
workCounter = &gcController.globalsScanWork
for _, datap := range activeModules() {
workDone += markrootBlock(datap.data, datap.edata-datap.data, datap.gcdatamask.bytedata, gcw, int(i-work.baseData))
}
// 處理未初始化的全局變量
case work.baseBSS <= i && i < work.baseSpans:
workCounter = &gcController.globalsScanWork
for _, datap := range activeModules() {
workDone += markrootBlock(datap.bss, datap.ebss-datap.bss, datap.gcbssmask.bytedata, gcw, int(i-work.baseBSS))
}
// 處理 finalizer 隊列
case i == fixedRootFinalizers:
for fb := allfin; fb != nil; fb = fb.alllink {
cnt := uintptr(atomic.Load(&fb.cnt))
scanblock(uintptr(unsafe.Pointer(&fb.fin[0])), cnt*unsafe.Sizeof(fb.fin[0]), &finptrmask[0], gcw, nil)
}
// 釋放已終止的 g 的棧
case i == fixedRootFreeGStacks:
systemstack(markrootFreeGStacks)
// 掃描 mspan 中的 special
case work.baseSpans <= i && i < work.baseStacks:
markrootSpans(gcw, int(i-work.baseSpans))
default:
// ...
// 獲取需要掃描的 g
gp := work.stackRoots[i-work.baseStacks]
// ...
// 切回到 g0執行工作,掃描 g 的棧
systemstack(func() {
// ...
// 棧掃描
workDone += scanstack(gp, gcw)
// ...
})
}
// ...
return workDone
}
其中,棧掃描方法鏈路展開如下:
func scanstack(gp *g, gcw *gcWork) int64 {
// ...
scanframe := func(frame *stkframe, unused unsafe.Pointer) bool {
scanframeworker(frame, &state, gcw)
return true
}
gentraceback(^uintptr(0), ^uintptr(0), 0, gp, 0, nil, 0x7fffffff, scanframe, nil, 0)
// ...
}
func scanframeworker(frame *stkframe, state *stackScanState, gcw *gcWork) {
// ...
// 掃描局部變量
if locals.n > 0 {
size := uintptr(locals.n) * goarch.PtrSize
scanblock(frame.varp-size, size, locals.bytedata, gcw, state)
}
// 掃描函數參數
if args.n > 0 {
scanblock(frame.argp, uintptr(args.n)*goarch.PtrSize, args.bytedata, gcw, state)
}
// ...
}
不論是全局變量掃描還是棧變量掃描,底層都會調用到 scanblock 方法. 在掃描時,會通過位圖 ptrmask 輔助加速流程. 在 ptrmask 當中,每個 bit 位對應了一個指針大小(8B)的位置的標識信息,指明當前位置是否是指針,倘若非指針,則直接跳過掃描.
此外, 在標記一個對象時, 需要獲取到該對象所在 mspan, 這一過程會使用到 heapArena 中關於頁和 mspan 間的映射索引(如有存疑可以看我的文章 Golang 內存模型與分配機制),這部分內容放在 4.7 小節中集中闡述.
func scanblock(b0, n0 uintptr, ptrmask *uint8, gcw *gcWork, stk *stackScanState) {
// ...
b := b0
n := n0
// 遍歷待掃描的地址
for i := uintptr(0); i < n; {
// 找到 bitmap 對應的 byte. ptrmask 輔助標識了 .data 一個指針的大小,bit 位爲 1 代表當前位置是一個指針
bits := uint32(*addb(ptrmask, i/(goarch.PtrSize*8)))
// 非指針,跳過
if bits == 0 {
i += goarch.PtrSize * 8
continue
}
for j := 0; j < 8 && i < n; j++ {
if bits&1 != 0 {
// Same work as in scanobject; see comments there.
p := *(*uintptr)(unsafe.Pointer(b + i))
if p != 0 {
if obj, span, objIndex := findObject(p, b, i); obj != 0 {
greyobject(obj, b, i, span, gcw, objIndex)
} else if stk != nil && p >= stk.stack.lo && p < stk.stack.hi {
stk.putPtr(p, false)
}
}
}
bits >>= 1
i += goarch.PtrSize
}
}
}
4.7 掃描普通對象
gcDrain 方法中,會持續從灰對象緩存隊列中取出灰對象,然後採用 scanobject 方法進行處理.
func gcDrain(gcw *gcWork, flags gcDrainFlags) {
// ...
// 遍歷隊列,進行對象標記
for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
// 嘗試從 p 本地隊列中獲取灰色對象,無鎖
b := gcw.tryGetFast()
if b == 0 {
// 嘗試從全局隊列中獲取灰色對象,加鎖
b = gcw.tryGet()
if b == 0 {
// 刷新寫屏障緩存
wbBufFlush(nil, 0)
b = gcw.tryGet()
}
}
if b == 0 {
// 已無對象需要標記
break
}
// 進行對象的標記,並順延指針進行後續對象的掃描
scanobject(b, gcw)
}
done:
//
}
scanobject 方法遍歷當前灰對象中的指針,依次調用 greyobject 方法將其指向的對象進行置灰操作.
const (
bitPointer = 1 << 0
bitScan = 1 << 4
)
func scanobject(b uintptr, gcw *gcWork) {
// 通過地址映射到所屬的頁
// 通過 heapArena 中的映射信息,從頁映射到所屬的 mspan
hbits := heapBitsForAddr(b)
s := spanOfUnchecked(b)
n := s.elemsize
// ...
// 順延當前對象的成員指針,掃描後續的對象
var i uintptr
for i = 0; i < n; i, hbits = i+goarch.PtrSize, hbits.next() {
// 通過 heapArena 中的 bitmap 記錄的信息,加速遍歷過程
bits := hbits.bits()
if bits&bitScan == 0 {
break // no more pointers in this object
}
if bits&bitPointer == 0 {
continue // not a pointer
}
obj := *(*uintptr)(unsafe.Pointer(b + i))
if obj != 0 && obj-b >= n {
// 對於遍歷到的對象,將其置灰,並添加到隊列中,等待後續掃描
if obj, span, objIndex := findObject(obj, b, i); obj != 0 {
greyobject(obj, b, i, span, gcw, objIndex)
}
}
}
// ...
}
在 scanobject 方法中還涉及到兩項細節:
(1)如何通過對象地址找到其所屬的 mspan
首先根據對象地址,可以定位到對象所屬的頁,進一步可以通過地址偏移定位到其所屬的 heapArena. 在 heapArena 中,已經提前建立好了從頁映射到 mspan 的索引,於是我們通過這一鏈路,實現從對象地址到 mspan 的映射. 從而能夠獲得 mspan.gcmarkBits 進行 bitmap 標記操作.
type heapArena struct {
spans [pagesPerArena]*mspan
}
func findObject(p, refBase, refOff uintptr) (base uintptr, s *mspan, objIndex uintptr) {
s = spanOf(p)
// ...
return
}
func spanOf(p uintptr) *mspan {
// ...
ri := arenaIndex(p)
// ...
l2 := mheap_.arenas[ri.l1()]
// ...
ha := l2[ri.l2()]
// ...
return ha.spans[(p/pageSize)%pagesPerArena]
}
(2)如何加速掃描過程
在 heapArena 中,通過一個額外的 bitmap 存儲了內存信息:
bitmap 中,每兩個 bit 記錄一個指針大小的內存空間的信息(8B),其中一個 bit 標誌了該位置是否是指針;另一個 bit 標誌了該位置往後是否還存在指針,於是在遍歷掃描的過程中,可以通過這兩部分信息推進 for 循環的展開速度.
const heapArenaBitmapBytes untyped int = 2097152
type heapArena struct {
bitmap [heapArenaBitmapBytes]byte
// ...
}
func heapBitsForAddr(addr uintptr) (h heapBits) {
// 2 bits per word, 4 pairs per byte, and a mask is hard coded.
arena := arenaIndex(addr)
ha := mheap_.arenas[arena.l1()][arena.l2()]
if ha == nil {
return
}
h.bitp = &ha.bitmap[(addr/(goarch.PtrSize*4))%heapArenaBitmapBytes]
h.shift = uint32((addr / goarch.PtrSize) & 3)
h.arena = uint32(arena)
h.last = &ha.bitmap[len(ha.bitmap)-1]
return
}
func (h heapBits) bits() uint32 {
// The (shift & 31) eliminates a test and conditional branch
// from the generated code.
return uint32(*h.bitp) >> (h.shift & 31)
}
4.8 對象置灰
對象置灰操作位於 greyobject 方法中. 如 4.5 小節所屬,置灰分兩步:
-
將 mspan.gcmarkBits 對應 bit 位置爲 1
-
將對象添加到灰色對象緩存隊列
func greyobject(obj, base, off uintptr, span *mspan, gcw *gcWork, objIndex uintptr) {
// ...
// 在其所屬的 mspan 中,將對應位置的 gcMark bitmap 位置爲 1
mbits.setMarked()
// ...
// 將對象添加到當前 p 的本地隊列
if !gcw.putFast(obj) {
gcw.put(obj)
}
}
4.9 新分配對象置黑
此外,值得一提的是,GC 期間新分配的對象,會被直接置黑,呼應了混合寫屏障中的設定.
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
// ...
if gcphase != _GCoff {
gcmarknewobject(span, uintptr(x), size, scanSize)
}
// ...
}
func gcmarknewobject(span *mspan, obj, size, scanSize uintptr) {
// ...
objIndex := span.objIndex(obj)
// 標記對象
span.markBitsForIndex(objIndex).setMarked()
// ...
}
5 輔助標記
5.1 輔助標記策略
在併發標記階段,由於用戶協程與標記協程共同工作,因此在極端場景下可能存在一個問題——倘若用戶協程分配對象的速度快於標記協程標記對象的速度,這樣標記階段豈不是永遠無法結束?
爲規避這一問題,Golang GC 引入了輔助標記的策略,建立了一個兜底的機制:在最壞情況下,一個用戶協程分配了多少內存,就需要完成對應量的標記任務.
在每個用戶協程 g 中,有一個字段 gcAssisBytes,象徵 GC 期間可分配內存資產的概念,每個 g 在 GC 期間輔助標記了多大的內存空間,就會獲得對應大小的資產,使得其在 GC 期間能多分配對應大小的內存進行對象創建.
type g struct {
// ...
gcAssistBytes int64
}
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
// ...
var assistG *g
if gcBlackenEnabled != 0 {
assistG = getg()
if assistG.m.curg != nil {
assistG = assistG.m.curg
}
// 每個 g 會有資產
assistG.gcAssistBytes -= int64(size)
if assistG.gcAssistBytes < 0 {
gcAssistAlloc(assistG)
}
}
}
5.2 輔助標記執行
由於各對象中,可能存在部分不包含指針的字段,這部分字段是無需進行掃描的. 因此真正需要掃描的內存量會小於實際的內存大小,兩者之間的比例通過 gcController.assistWorkPerByte 進行記錄.
於是當一個用戶協程在 GC 期間需要分配 M 大小的新對象時,實際上需要完成的輔助標記量應該爲 assistWorkPerByte*M.
輔助標記邏輯位於 gcAssistAlloc 方法. 在該方法中,會先嚐試從公共資產池 gcController.bgScanCredit 中偷取資產,倘若資產仍然不夠,則會通過 systemstack 方法切換至 g0,並在 gcAssistAlloc1 方法內調用 gcDrainN 方法參與到併發標記流程當中.
func gcAssistAlloc(gp *g) {
// ...
// 計算待完成的任務量
debtBytes := -gp.gcAssistBytes
assistWorkPerByte := gcController.assistWorkPerByte.Load()
scanWork := int64(assistWorkPerByte * float64(debtBytes))
if scanWork < gcOverAssistWork {
scanWork = gcOverAssistWork
debtBytes = int64(assistBytesPerWork * float64(scanWork))
}
// 先嚐試從全局的可用資產中偷取
bgScanCredit := atomic.Loadint64(&gcController.bgScanCredit)
stolen := int64(0)
if bgScanCredit > 0 {
if bgScanCredit < scanWork {
stolen = bgScanCredit
gp.gcAssistBytes += 1 + int64(assistBytesPerWork*float64(stolen))
} else {
stolen = scanWork
gp.gcAssistBytes += debtBytes
}
atomic.Xaddint64(&gcController.bgScanCredit, -stolen)
scanWork -= stolen
// 全局資產夠用,則無需輔助標記,直接返回
if scanWork == 0 {
return
}
}
// 切換到 g0,開始執行標記任務
systemstack(func() {
gcAssistAlloc1(gp, scanWork)
})
// 輔助標記完成
completed := gp.param != nil
gp.param = nil
if completed {
gcMarkDone()
}
// ...
}
6 標記終止
6.1 標記完成
在併發標記階段的 gcBgMarkWorker 方法中,當最後一個標記協程也完成任務後,會調用 gcMarkDone 方法,開始執行併發標記後處理的邏輯.
func gcBgMarkWorker() {
// ...
for{
// ...
if incnwait == work.nproc && !gcMarkWorkAvailable(nil) {
// ...
gcMarkDone()
}
}
}
gcMarkDone 方法中,會遍歷釋放所有 P 的寫屏障緩存,查看是否存在因屏障機制遺留的灰色對象,如果有,則會推出 gcMarkDone 方法,回退到 gcBgMarkWorker 的主循環中,繼續完成標記任務.
倘若寫屏障中也沒有遺留的灰對象,此時會調用 STW 停止世界,並步入 gcMarkTermination 方法進入標記終止階段.
func gcMarkDone()
top:
if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
semrelease(&work.markDoneSema)
return
}
// ...
// 切換到 p0
systemstack(func() {
gp := getg().m.curg
casgstatus(gp, _Grunning, _Gwaiting)
forEachP(func(_p_ *p) {
// 釋放一波寫屏障的緩存,可能會新的待標記任務產生
wbBufFlush1(_p_)
})
casgstatus(gp, _Gwaiting, _Grunning)
})
// 倘若有新的標記對象待處理,則調回 top 處,可能會回退到併發標記階段
if gcMarkDoneFlushed != 0 {
// ...
goto top
}
// 正式進入標記完成階段,會STW
systemstack(stopTheWorldWithSema)
// ...
// 在 STW 狀態下,進入標記終止階段
gcMarkTermination()
}
6.2 標記終止
gcMarkTermination 方法包括幾個核心步驟:
-
設置 GC 進入標記終止階段_GCmarktermination
-
切換至 g0,設置 GC 進入標記關閉階段_GCoff
-
切換至 g0,調用 gcSweep 方法,喚醒後臺清掃協程,執行標記清掃工作
-
切換至 g0,執行 gcControllerCommit 方法,設置觸發下一輪 GC 的內存閾值
-
切換至 g0,調用 startTheWorldWithSema 方法,重啓世界
func gcMarkTermination() {
// 設置GC階段進入標記終止階段
setGCPhase(_GCmarktermination)
// ...
systemstack(func() {
// ...
// 設置GC階段進入標記關閉階段
setGCPhase(_GCoff)
// 開始執行標記清掃動作
gcSweep(work.mode)
})
// 提交下一輪GC的內存閾值
systemstack(gcControllerCommit)
// ...
systemstack(func() { startTheWorldWithSema(true) })
// ...
}
6.3 標記清掃
gwSweep 方法的核心是調用 ready 方法,喚醒了因爲 gopark 操作陷入被動阻塞的清掃協程 sweep.g.
func gcSweep(mode gcMode) {
assertWorldStopped()
// ...
// 喚醒後臺清掃任務
lock(&sweep.lock)
if sweep.parked {
sweep.parked = false
ready(sweep.g, 0, true)
}
unlock(&sweep.lock)
}
那麼 sweep.g 是在何時被創建,又是在何時被 park 的呢?
我們重新回到 runtime 包的 main 函數中,開始向下追溯:
func main() {
// ...
gcenable()
// ...
}
func gcenable() {
// ...
go bgsweep(c)
<-c
// ...
}
可以看到,在異步啓動的 bgsweep 方法中,會首先將當前協程 gopark 掛起,等待被喚醒.
當在標記終止階段被喚醒後,會進入 for 循環,每輪完成一個 mspan 的清掃工作,隨後就調用 Gosched 方法主動讓渡 P 的執行權,採用這種懶清掃的方式逐步推進標記清掃流程.
func bgsweep(c chan int) {
sweep.g = getg()
lockInit(&sweep.lock, lockRankSweep)
lock(&sweep.lock)
sweep.parked = true
c <- 1
// 執行 gopark 操作,等待 GC 併發標記階段完成後將當前協程喚醒
goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)
for {
// 每清掃一個 mspan 後,會發起主動讓渡
for sweepone() != ^uintptr(0) {
sweep.nbgsweep++
Gosched()
}
// ...
lock(&sweep.lock)
if !isSweepDone() {
// This can happen if a GC runs between
// gosweepone returning ^0 above
// and the lock being acquired.
unlock(&sweep.lock)
continue
}
// 清掃完成,則繼續 gopark 被動阻塞
sweep.parked = true
goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)
}
}
sweepone 方法每次清掃一個協程,清掃邏輯核心位於 sweepLocked.sweep 方法中,正是將 mspan 的 gcmarkBits 賦給 allocBits,並創建出一個空白的 bitmap 作爲新的 gcmarkBits. 這一實現呼應了本文 4.5 小節談到的設定.
func sweepone() uintptr {
// ...
sl := sweep.active.begin()
// ...
for {
// 查找到一個待清掃的 mspan
s := mheap_.nextSpanForSweep()
// ...
if s, ok := sl.tryAcquire(s); ok {
npages = s.npages
// 對一個 mspan 進行清掃
if s.sweep(false) {
// Whole span was freed. Count it toward the
// page reclaimer credit since these pages can
// now be used for span allocation.
mheap_.reclaimCredit.Add(npages)
} else {
// Span is still in-use, so this returned no
// pages to the heap and the span needs to
// move to the swept in-use list.
npages = 0
}
break
}
}
sweep.active.end(sl)
// ...
return npages
}
func (sl *sweepLocked) sweep(preserve bool) bool {
// ...
s.allocBits = s.gcmarkBits
s.gcmarkBits = newMarkBits(s.nelems)
// ...
}
6.4 設置下輪 GC 閾值
在 gcMarkTermination 方法中,還會通過 g0 調用 gcControllerCommit 方法,完成下輪觸發 GC 的內存閾值的設定.
func gcMarkTermination() {
// ...
// 提交下一輪GC的內存閾值
systemstack(gcControllerCommit)
// ...
}
func gcControllerCommit() {
assertWorldStoppedOrLockHeld(&mheap_.lock)
gcController.commit(isSweepDone())
// ...
}
在 gcControllerState.commit 方法中,會讀取 gcControllerState.gcPercent 字段值作爲觸發 GC 的堆使用內存增長比例,並結合當前堆內存的使用情況,推算出觸發下輪 GC 的內存閾值,設置到 gcControllerState.gcPercentHeapGoal 字段中.
func (c *gcControllerState) commit(isSweepDone bool) {
// ...
gcPercentHeapGoal := ^uint64(0)
// gcPercent 值,用戶可以通過環境變量 GOGC 顯式設置. 未設時,默認值爲 100.
if gcPercent := c.gcPercent.Load(); gcPercent >= 0 {
gcPercentHeapGoal = c.heapMarked + (c.heapMarked+atomic.Load64(&c.lastStackScan)+atomic.Load64(&c.globalsScan))*uint64(gcPercent)/100
}
// ...
c.gcPercentHeapGoal.Store(gcPercentHeapGoal)
// ...
}
在新一輪嘗試觸發 GC 的過程中,對於 gcTriggerHeap 類型的觸發事件,會調用 gcController.trigger 方法,讀取到 gcControllerState.gcPercentHeapGoal 中存儲的內存閾值,進行觸發條件校驗.
func (t gcTrigger) test() bool {
// ...
switch t.kind {
case gcTriggerHeap:
// ...
trigger, _ := gcController.trigger()
return atomic.Load64(&gcController.heapLive) >= trigger
// ...
}
return true
}
func (c *gcControllerState) trigger() (uint64, uint64) {
goal, minTrigger := c.heapGoalInternal()
// ...
var trigger uint64
runway := c.runway.Load()
// ...
trigger = goal - runway
// ...
return trigger, goal
}
func (c *gcControllerState) heapGoalInternal() (goal, minTrigger uint64) {
goal = c.gcPercentHeapGoal.Load()
// ...
return
}
7 系統駐留內存清理
Golang 進程從操作系統主內存(Random-Access Memory,簡稱 RAM)中申請到堆中進行復用的內存部分稱爲駐留內存(Resident Set Size,RSS). 顯然,RSS 不可能只借不還,應當遵循實際使用情況進行動態擴縮.
Golang 運行時會異步啓動一個回收協程,以趨近於 1% CPU 使用率作爲目標,持續地對 RSS 中的空閒內存進行回收.
7.1 回收協程啓動
在 runtime 包下的 main 函數中,會異步啓動回收協程 bgscavenge,源碼如下:
func main() {
// ...
gcenable()
// ...
}
func gcenable() {
// ...
go bgscavenge(c)
<-c
// ...
}
7.2 執行頻率控制
在 bgscavenge 方法中,通過 for 循環 + sleep 的方式,控制回收協程的執行頻率在佔用 CPU 時間片的 1% 左右. 其中回收 RSS 的核心邏輯位於 scavengerState.run 方法.
func bgscavenge(c chan int) {
scavenger.init()
c <- 1
scavenger.park()
// 如果當前操作系統分配內存>目標內存
for {
// 釋放內存
released, workTime := scavenger.run()
if released == 0 {
scavenger.park()
continue
}
atomic.Xadduintptr(&mheap_.pages.scav.released, released)
scavenger.sleep(workTime)
}
}
7.3 回收空閒內存
scavengerState.run 方法中,會開啓循環,經歷 pageAlloc.scavenge -> pageAlloc.scavengeOne 的調用鏈,最終通過 sysUnused 方法進行空閒內存頁的回收.
func (s *scavengerState) run() (released uintptr, worked float64) {
// ...
for worked < minScavWorkTime {
// ...
const scavengeQuantum = 64 << 10
r, duration := s.scavenge(scavengeQuantum)
// ...
}
return
}
func (p *pageAlloc) scavenge(nbytes uintptr, shouldStop func() bool) uintptr {
released := uintptr(0)
for released < nbytes {
ci, pageIdx := p.scav.index.find()
if ci == 0 {
break
}
systemstack(func() {
released += p.scavengeOne(ci, pageIdx, nbytes-released)
})
if shouldStop != nil && shouldStop() {
break
}
}
return released
}
在 pageAlloc.scavengeOne 方法中,通過 findScavengeCandidate 方法尋找到待回收的頁,通過 sysUnused 方法發起系統調用進行內存回收.
func (p *pageAlloc) scavengeOne(ci chunkIdx, searchIdx uint, max uintptr) uintptr {
// ...
lock(p.mheapLock)
if p.summary[len(p.summary)-1][ci].max() >= uint(minPages) {
// 找到待回收的部分
base, npages := p.chunkOf(ci).findScavengeCandidate(pallocChunkPages-1, minPages, maxPages)
// If we found something, scavenge it and return!
if npages != 0 {
// Compute the full address for the start of the range.
addr := chunkBase(ci) + uintptr(base)*pageSize
// ...
unlock(p.mheapLock)
if !p.test {
// 發起系統調用,回收內存
sysUnused(unsafe.Pointer(addr), uintptr(npages)*pageSize)
// 更新狀態信息
nbytes := int64(npages) * pageSize
gcController.heapReleased.add(nbytes)
gcController.heapFree.add(-nbytes)
stats := memstats.heapStats.acquire()
atomic.Xaddint64(&stats.committed, -nbytes)
atomic.Xaddint64(&stats.released, nbytes)
memstats.heapStats.release()
}
// 更新基數樹信息
lock(p.mheapLock)
p.free(addr, uintptr(npages), true)
p.chunkOf(ci).scavenged.setRange(base, npages)
unlock(p.mheapLock)
return uintptr(npages) * pageSize
}
}
//
}
前文 Golang 內存模型與分配機制中,我們有介紹過,在 Golang 堆中會基於基數樹的形式建立空閒頁的索引,且基數樹每個葉子節點對應了一個 chunk 塊大小的內存 (512 * 8KB = 4MB).
其中 chunk 的封裝類 pallocData 中有還兩個核心字段,一個 pallocBits 中標識了一個頁是否被佔用了(1 佔用、0 空閒),同時還有另一個 scavenged bitmap 用於表示一個頁是否已經被操作系統回收了(1 已回收、0 未回收). 因此,回收協程的目標就是找到某個頁,當其 pallocBits 和 scavenged 中的 bit 都爲 0 時, 代表其可以回收.
由於回收時,可能需要同時回收多個頁. 此時會利用基數樹的特性, 幫助快速找到連續的空閒可回收的頁位置.
type pallocData struct {
pallocBits
scavenged pageBits
}
8 總結
至此,Golang 內存管理系列的三篇文章全部結束. 如有紕漏,歡迎批評指正.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Db19tKNer8D6FX6UG-Yujw