Golang 垃圾回收源碼走讀

0 前言

近期在和大家一起探討 Golang 內存管理機制相關的內容.

此前分別介紹了 Golang 內存模型及分配機制和 Golang 的垃圾回收原理有關的內容. 本篇會基於源碼走讀的方式,對 Golang 垃圾回收的理論進行論證和補充. 本文走讀的源碼版本爲 Golang 1.19.

由於內容之間具有強關聯性,建議大家先完成前兩篇內容的閱讀,再開啓本篇的學習.

1 源碼導讀

1.1 源碼框架

首先給出整體的源碼走讀框架,供大家總覽全局,避免暈車.

1.2 文件位置

GC 中各子流程聚焦於不同源碼文件中,目錄供大家一覽,感興趣可以連貫閱讀.

Sv4PWv

2 觸發 GC

下面順沿源碼框架,開啓走讀流程. 本章首先聊聊,GC 階段是如何被觸發啓動的.

2.1 觸發 GC 類型

觸發 GC 的事件類型可以分爲如下三種:

tiTEKS

在觸發 GC 時,會通過 gcTrigger.test 方法,結合具體的觸發事件類型進行觸發條件的校驗,校驗條件展示於上表,對應的源碼如下:

type gcTriggerKind int


const (
    // 根據堆分配內存情況,判斷是否觸發GC
    gcTriggerHeap gcTriggerKind = iota
    // 定時觸發GC
    gcTriggerTime
    // 手動觸發GC
    gcTriggerCycle
}


func (t gcTrigger) test() bool {
    // ...
    switch t.kind {
    case gcTriggerHeap:
        // ...
        trigger, _ := gcController.trigger()
        return atomic.Load64(&gcController.heapLive) >= trigger
    case gcTriggerTime:
        if gcController.gcPercent.Load() < 0 {
            return false
        }
        lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime))
        return lastgc != 0 && t.now-lastgc > forcegcperiod
    case gcTriggerCycle:
        // ...
        return int32(t.n-work.cycles) > 0
    }
    return true
}

2.2 定時觸發 GC

定時觸發 GC 的源碼方法及文件如下表所示:

pUmUZd

(1)啓動定時觸發協程並阻塞等待

runtime 包初始化的時候,即會異步開啓一個守護協程,通過 for 循環 + park 的方式,循環阻塞等待被喚醒.

當被喚醒後,則會調用 gcStart 方法進入標記準備階段,嘗試開啓新一輪 GC,此時觸發 GC 的事件類型正是 gcTriggerTime(定時觸發).

在 gcStart 方法內部,還會通過 gcTrigger.test 方法進一步校驗觸發 GC 的條件是否滿足,留待第 3 章再作展開.

// runtime 包下的全局變量
var  forcegc   forcegcstate


type forcegcstate struct {
    lock mutex
    g    *g
    idle uint3


func init() {
    go forcegchelper()
}


func forcegchelper() {
    forcegc.g = getg()
    lockInit(&forcegc.lock, lockRankForcegc)
    for {
        lock(&forcegc.lock)
        // ...
        atomic.Store(&forcegc.idle, 1)
        // 令 forcegc.g 陷入被動阻塞,g 的狀態會設置爲 waiting,當達成 gc 條件時,g 的狀態會被切換至 runnable,方法纔會向下執行
        goparkunlock(&forcegc.lock, waitReasonForceGCIdle, traceEvGoBlock, 1)
        // g 被喚醒了,則調用 gcStart 方法真正開啓 gc 主流程
        gcStart(gcTrigger{kind: gcTriggerTime, now: nanotime()})
    }
}

(2)喚醒定時觸發協程

runtime 包下的 main 函數會通過 systemstack 操作切換至 g0(g0 是 Golang GMP 模型中的概念,如有疑惑,可參見我之前的文章:Golang GMP 原理),並調用 sysmon 方法,輪詢嘗試將 forcegchelper 協程添加到 gList 中,並在 injectglist 方法內將其喚醒:

func main() {
    // ...
    systemstack(func() {
        newm(sysmon, nil, -1)
    })   
    // ...
}
func sysmon() {
    // ...
    for { 
        // 通過 gcTrigger.test 方法檢查是否需要發起 gc,觸發類型爲 gcTriggerTime:定時觸發
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {     
            lock(&forcegc.lock)
            forcegc.idle = 0
            var list gList
            // 需要發起 gc,則將 forcegc.g 注入 list 中, injectglist 方法內部會執行喚醒操作
            list.push(forcegc.g)
            injectglist(&list)
            unlock(&forcegc.lock)
        }
        // ...
    }
}

(3)定時觸發 GC 條件校驗

在 gcTrigger.test 方法中,針對 gcTriggerTime 類型的觸發事件,其校驗條件則是觸發時間間隔達到 2 分鐘以上.

// 單位 nano,因此實際值爲 120s = 2min
var forcegcperiod int64 = 2 * 60 * 1e9


func (t gcTrigger) test() bool {
    // ...
    switch t.kind {
    // ...
    // 每 2 min 發起一輪 gc
    case gcTriggerTime:
        // ...
        lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime))
        return lastgc != 0 && t.now-lastgc > forcegcperiod
    // ...
    }
    return true
}

2.3 對象分配觸發 GC

該流程源碼方法及文件如下表所示:

B2oZGw

在分配對象的 malloc 方法中,倘若滿足如下兩個條件之一,都會發起一次觸發 GC 的嘗試:

此時觸發事件類型爲 gcTriggerHeap,並在調用 gcStart 方法的內部執行 gcTrigger.test 進行條件檢查.

(1)對象分配觸發 GC

mallocgc 是分配對象的主流程方法:

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
    // ...
    shouldhelpgc := false
    // ...
    if size <= maxSmallSize {
        if noscan && size < maxTinySize {
            // ...
            if v == 0 {
                // 倘若 mcache 中對應 spanClass 的 mspan 已滿,置 true
                v, span, shouldhelpgc = c.nextFree(tinySpanClass)
            }
            // ...
        } else {
            // ...
            if v == 0 {
                // 倘若 mcache 中對應 spanClass 的 mspan 已滿,置 true
                v, span, shouldhelpgc = c.nextFree(spc)
            }
            // ...
        }
    } else {
        // 申請大小大於 32KB 的大對象,直接置爲 true
        shouldhelpgc = true
        // ...
    }


    // ...
    // 嘗試觸發 gc,類型爲 gcTriggerHeap,觸發校驗邏輯同樣位於 gcTrigger.test 方法中
    if shouldhelpgc {
        if t := (gcTrigger{kind: gcTriggerHeap}); t.test() {
            gcStart(t)
        }
    }


   // ...
}

(2)校驗 GC 觸發條件

在 gcTrigger.test 方法中,針對 gcTriggerHeap 類型的觸發事件,其校驗條件是判斷當前堆已使用內存是否達到閾值. 此處的堆內存閾值會在上一輪 GC 結束時進行設定,具體內容將在本文 6.4 小節詳細討論.

func (t gcTrigger) test() bool {
    // ...
    switch t.kind {
    case gcTriggerHeap:      
        trigger, _ := gcController.trigger()
        // 倘若堆中已使用的內存大小達到了閾值,則會真正執行 gc
        return atomic.Load64(&gcController.heapLive) >= trigger
    // ...
    }
    return true
}

2.3 手動觸發 GC

最後一種觸發的 GC 形式是手動觸發,入口位於 runtime 包的公共方法:runtime.GC

nahwKT

用戶手動觸發 GC 時,事件類型爲 gcTriggerCycle.

func GC() {
    // ...
    gcStart(gcTrigger{kind: gcTriggerCycle, n: n + 1})
    // ...
}

針對這種類型的校驗條件是,上一輪 GC 已經完成,此時能夠開啓新一輪 GC 任務.

func (t gcTrigger) test() bool {
    // ...
    switch t.kind {
    // ...
    case gcTriggerCycle:
        return int32(t.n-work.cycles) > 0
    }
    return true
}

3 標記準備

本章開始步入標記準備階段的內容探討中,本章會揭祕屏障機制以及 STW 的底層實現,所涉及的源碼方法及文件位置如下表所示:

RrBm7T

3.1 主流程

gcStart 是標記準備階段的主流程方法,方法中完成的工作包括:

func gcStart(trigger gcTrigger) {
    // ...
    // 檢查是否達到 GC 條件,會根據 trigger 類型作 dispatch,常見的包括堆內存大小、GC 時間間隔、手動觸發的類型
    for trigger.test() && sweepone() != ^uintptr(0) {
        sweep.nbgsweep++
    }
    
    // 上鎖
    semacquire(&work.startSema)
    // 加鎖 double check
    if !trigger.test() {
        semrelease(&work.startSema)
        return
    }
    
    // ...
    // 由於進入了 GC 模式,會根據 P 的數量啓動多個 GC 併發標記協程,但是會先阻塞掛起,等待被喚醒
    gcBgMarkStartWorkers()
    
    // ...
    // 切換到 g0,執行 Stop the world 操作
    systemstack(stopTheWorldWithSema)
    // ...
    
    // 限制標記協程佔用 CPU 時間片的比例爲趨近 25%
    gcController.startCycle(now, int(gomaxprocs), trigger)
     
    // 設置GC階段爲_GCmark,啓用混合寫屏障
    setGCPhase(_GCmark)


    // ...
    // 對 mcache 中的 tiny 對象進行標記
    gcMarkTinyAllocs()


    // 切換至 g0,重新 start the world
    systemstack(func() {
        now = startTheWorldWithSema(trace.enabled)
       // ...
    })
    // ...
}

3.2 啓動標記協程

gcBgMarkStartWorkers 方法中啓動了對應於 P 數量的併發標記協程,並且通過 notetsleepg 的機制,使得 for 循環與 gcBgMarkWorker 內部形成聯動節奏,確保每個 P 都能分得一個 gcBgMarkWorker 標記協程.

func gcBgMarkStartWorkers() {
    // 開啓對應於 P 個數標記協程,但是內部將 g 添加到全局的 pool 中,並通過 gopark 阻塞掛起
    for gcBgMarkWorkerCount < gomaxprocs {
        go gcBgMarkWorker()
        // 掛起,等待 gcBgMarkWorker 方法中完成標記協程與 P 的綁定後喚醒
        notetsleepg(&work.bgMarkReady, -1)
        noteclear(&work.bgMarkReady)
        
        gcBgMarkWorkerCount++
    }
}

gcBgMarkWorker 方法中將 g 包裝成一個 node 天添加到全局的 gcBgMarkWorkerPool 中,保證標記協程與 P 的一對一關聯,並調用 gopark 方法將當前 g 掛起,等待被喚醒.

func gcBgMarkWorker() {
    gp := getg()
    node := new(gcBgMarkWorkerNode)
    gp.m.preemptoff = ""
    node.gp.set(gp)
    node.m.set(acquirem())
    // 喚醒外部的 for 循環
    notewakeup(&work.bgMarkReady)
    
    for {
        // 當前 g 阻塞至此,直到 gcController.findRunnableGCWorker 方法被調用,會將當前 g 喚醒
        gopark(func(g *g, nodep unsafe.Pointer) bool {
            node := (*gcBgMarkWorkerNode)(nodep)
            // ...
            // 將當前 g 包裝成一個 node 添加到 gcBgMarkWorkerPool 中
            gcBgMarkWorkerPool.push(&node.node)          
            return true
        }, unsafe.Pointer(node), waitReasonGCWorkerIdle, traceEvGoBlock, 0)
        // ...
    }
}

3.3 Stop the world

gcStart 方法在調用 gcBgMarkStartWorkers 方法異步啓動標記協程後,會執行 STW 操作停止所有用戶協程,其實現位於 stopTheWorldWithSema 方法,核心點如下:

func stopTheWorldWithSema() {
    _g_ := getg()


    // 全局調度鎖
    lock(&sched.lock)
    sched.stopwait = gomaxprocs
    // 此標識置 1,之後所有的調度都會阻塞等待
    atomic.Store(&sched.gcwaiting, 1)
    // 發送搶佔信息搶佔所有 G,後將 p 狀態置爲 syscall
    preemptall()
    // 將當前 p 的狀態置爲 stop
    _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
    sched.stopwait--
    // 把所有 p 的狀態置爲 stop
    for _, p := range allp {
        s := p.status
        if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
            // ...
            p.syscalltick++
            sched.stopwait--
        }
    }
    // 把空閒 p 的狀態置爲 stop
    now := nanotime()
    for {
        p, _ := pidleget(now)
        if p == nil {
            break
        }
        p.status = _Pgcstop
        sched.stopwait--
    }
    wait := sched.stopwait > 0
    unlock(&sched.lock)




    // 倘若有 p 無法被搶佔,則阻塞直到將其統統搶佔完成
    if wait {
        for {
            // wait for 100us, then try to re-preempt in case of any races
            if notetsleep(&sched.stopnote, 100*1000) {
                noteclear(&sched.stopnote)
                break
            }
            preemptall()
        }
    }


    // native 方法,stop the world
    worldStopped()
}

3.4 控制標記協程頻率

gcStart 方法中,還會通過 gcController.startCycle 方法,將標記協程對 CPU 的佔用率控制在 25% 左右. 此時,根據 P 的數量是否能被 4 整除,分爲兩種處理方式:

// 目標:標記協程對CPU的使用率維持在25%的水平
const gcBackgroundUtilization = 0.25


func (c *gcControllerState) startCycle(markStartTime int64, procs int, trigger gcTrigger) {
    // ...
    // P 的個數 * 0.25
    totalUtilizationGoal := float64(procs) * gcBackgroundUtilization
    // P 的個數 * 0.25 後四捨五入取整
    c.dedicatedMarkWorkersNeeded = int64(totalUtilizationGoal + 0.5)
    utilError := float64(c.dedicatedMarkWorkersNeeded)/totalUtilizationGoal - 1
    const maxUtilError = 0.3
    // 倘若 P 的個數不能被 4 整除
    if utilError < -maxUtilError || utilError > maxUtilError {        
        if float64(c.dedicatedMarkWorkersNeeded) > totalUtilizationGoal {    
            c.dedicatedMarkWorkersNeeded--
        }
        // 計算出每個 P 需要額外執行標記任務的時間片比例
        c.fractionalUtilizationGoal = (totalUtilizationGoal - float64(c.dedicatedMarkWorkersNeeded)) / float64(procs)
    // 倘若 P 的個數可以被 4 整除,則無需控制執行時長
    } else {
        c.fractionalUtilizationGoal = 0
    }
    // ...
}

3.5 設置寫屏障

隨後,gcStart 方法會調用 setGCPhase 方法,標誌 GC 正式進入併發標記(GCmark)階段. 我們觀察該方法代碼實現,可以注意到,在_GCMark 和_GCMarkTermination 階段中,會啓用混合寫屏障.

func setGCPhase(x uint32) {
    atomic.Store(&gcphase, x)
    writeBarrier.needed = gcphase == _GCmark || gcphase == _GCmarktermination
    writeBarrier.enabled = writeBarrier.needed || writeBarrier.cgo
}

在混合寫屏障機制中,核心是會將需要置灰的對象添加到當前 P 的 wbBuf 緩存中. 隨後在併發標記缺灰、標記終止前置檢查等時機會執行 wbBufFlush1 方法,批量地將 wbBuf 中的對象釋放出來進行置灰,保證達到預期的效果.

func wbBufFlush(dst *uintptr, src uintptr) {
    // ...
    systemstack(func() {
        wbBufFlush1(getg().m.p.ptr())
    })
}

wbBufFlush1 方法中涉及了對象置灰操作,其包含了在對應 mspan 的 bitmap 中打點標記以及將對象添加到 gcw 隊列兩步. 此處先不細究,後文 4.3 小節中,我們再作詳細介紹.

func wbBufFlush1(_p_ *p) {
    // 獲取當前 P 通過屏障機制緩存的指針
    start := uintptr(unsafe.Pointer(&_p_.wbBuf.buf[0]))
    n := (_p_.wbBuf.next - start) / unsafe.Sizeof(_p_.wbBuf.buf[0])
    ptrs := _p_.wbBuf.buf[:n]


    // 將緩存的指針作標記,添加到 gcw 隊列
    gcw := &_p_.gcw
    pos := 0
    for _, ptr := range ptrs {
        // ...
        obj, span, objIndex := findObject(ptr, 0, 0)
        if obj == 0 {
            continue
        }
        // 打標
        mbits := span.markBitsForIndex(objIndex)
        if mbits.isMarked() {
            continue
        }
        mbits.setMarked()
        // ...
    }


    // 所有緩存對象入隊
    gcw.putBatch(ptrs[:pos])
    _p_.wbBuf.reset()
}

3.6 Tiny 對象標記

gcStart 方法隨後還會調用 gcMarkTinyAllocs 方法中,遍歷所有的 P,對 mcache 中的 Tiny 對象分別調用 greyobject 方法進行置灰.

func gcMarkTinyAllocs() {
    assertWorldStopped()


    for _, p := range allp {
        c := p.mcache
        if c == nil || c.tiny == 0 {
            continue
        }
        // 獲取 tiny 對象
        _, span, objIndex := findObject(c.tiny, 0, 0)
        gcw := &p.gcw
        // tiny 對象置灰(標記 + 添加入隊)
        greyobject(c.tiny, 0, 0, span, gcw, objIndex)
    }
}

3.7 Start the world

startTheWorldWithSema 與 stopTheWorldWithSema 形成對偶. 該方法會重新恢復世界的生機,將所有 P 喚醒. 倘若缺少 M,則構造新的 M 爲 P 補齊(M 和 P 是 Golang GMP 模型中的概念,可參見我的文章 Golang GMP 原理).

func startTheWorldWithSema(emitTraceEvent bool) int64 {
    assertWorldStopped()
   
    // ...   
    p1 := procresize(procs)
    // 重啓世界
    worldStarted()


    // 遍歷所有 p,將其喚醒
    for p1 != nil {
        p := p1
        p1 = p1.link.ptr()
        if p.m != 0 {
            mp := p.m.ptr()
            p.m = 0
            if mp.nextp != 0 {
                throw("startTheWorld: inconsistent mp->nextp")
            }
            mp.nextp.set(p)
            notewakeup(&mp.park)
        } else {           
            newm(nil, p, -1)
        }
    }


    // ...
    return startTime
}

4 併發標記

下面比如難度曲線最陡峭的併發標記部分. 這部分內容承接上文 3.2 小節,講述標記協程在被喚醒後,需要執行的任務細節.

首先,我們先來理一下,這些標記協程是如何被喚醒的.

4.1 調度標記協程

8rHIrh

在 GMP 調度的主幹方法 schedule 中,會通過 g0 調用 findRunnable 方法 P 尋找下一個可執行的協程,找到後會調用 execute 方法,內部完成由 g0->g 的切換,真正執行用戶協程中的任務.

func schedule() {
    // ...
    gp, inheritTime, tryWakeP := findRunnable()
    // ...
    execute(gp, inheritTime)
}

在 findRunnable 方法中,當通過全局標識 gcBlackenEnabled 發現當前開啓 GC 模式時,會調用 gcControllerState.findRunnableGCWorker 喚醒並取得標記協程.

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    // ...
    if gcBlackenEnabled != 0 {
        gp, now = gcController.findRunnableGCWorker(_p_, now)
        if gp != nil {
            return gp, false, true
        }
    }
    // ...
}

在 gcControllerState.findRunnableGCWorker 方法中,會從全局的標記協程池 gcBgMarkWorkerPool 獲取到一個封裝了標記協程的 node. 並通過 gcControllerState 中 dedicatedMarkWorkersNeeded、fractionalUtilizationGoal 等字段標識判定標記協程的標記模式,然後將標記協程狀態由_Gwaiting 喚醒爲_Grunnable,並返回給 g0 用於執行.

這裏談到的標記模式對應了上文 3.4 小節的內容,並將在下文 4.2 小節詳細介紹.

func (c *gcControllerState) findRunnableGCWorker(_p_ *p, now int64) (*g, int64) {
    // ...
    // 保證當前 _p_ 是可以調度標記協程的,每個 p 只能執行一個標記協程
    if !gcMarkWorkAvailable(_p_) {
        return nil, now
    }


    // 從全局標記協程池子 gcBgMarkWorkerPool 中取出 g
    node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
    // ...


    decIfPositive := func(ptr *int64) bool {
        for {
            v := atomic.Loadint64(ptr)
            if v <= 0 {
                return false
            }


            if atomic.Casint64(ptr, v, v-1) {
                return true
            }
        }
    }


    // 確認標記的模式
    if decIfPositive(&c.dedicatedMarkWorkersNeeded) {      
        _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode
    } else if c.fractionalUtilizationGoal == 0 {
                gcBgMarkWorkerPool.push(&node.node)
        return nil, now
    } else {
        delta := now - c.markStartTime
        if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal {
            // Nope. No need to run a fractional worker.
            gcBgMarkWorkerPool.push(&node.node)
            return nil, now
        }
        // Run a fractional worker.
        _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode
    }


   // 將標記協程的狀態置爲 runnable,填了 gcBgMarkWorker 方法中 gopark 操作留下的坑
    gp := node.gp.ptr()
    casgstatus(gp, _Gwaiting, _Grunnable)
    return gp, n
}

4.2 併發標記啓動

xObjqN

標記協程被喚醒後,主線又重新拉回到 gcBgMarkWorker 方法中,此時會根據 3.4 小節中預設的標記模式,調用 gcDrain 方法開始執行併發標記工作.

標記模式包含以下三種:

值得一提的是,在執行專一模式時,會先以可被搶佔的模式嘗試執行,倘若真的被用戶協程搶佔,則會先將當前 P 本地隊列的用戶協程投放到全局 g 隊列中,再將標記模式改爲不可搶佔模式. 這樣設計的優勢是,通過負載均衡的方式,減少當前 P 下用戶協程的等待時長,提高用戶體驗.

在 gcDrain 方法中,有兩個核心的 gcDrainFlags 控制着標記協程的運行風格:

type gcDrainFlags int
const (
    gcDrainUntilPreempt gcDrainFlags = 1 << iota
    gcDrainFlushBgCredit
    gcDrainIdle
    gcDrainFractional
)
func gcBgMarkWorker() {
        // ...


        node.m.set(acquirem())
        pp := gp.m.p.ptr() // P can't change with preemption disabled.


       // ...
        
       // 根據不同的運作模式,執行 gcDrain 方法:
        systemstack(func() {
          
            casgstatus(gp, _Grunning, _Gwaiting)
            switch pp.gcMarkWorkerMode {
            default:
                throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
            case gcMarkWorkerDedicatedMode:
               // 先按照可搶佔模式執行標記協程,倘若被搶佔,則將搶佔協程添加到全局隊列中,之後再以不可搶佔模式執行標記協程
                gcDrain(&pp.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
                if gp.preempt {
                    // 將 p 本地隊列中的 g 添加到全局隊列
                    if drainQ, n := runqdrain(pp); n > 0 {
                        lock(&sched.lock)
                        globrunqputbatch(&drainQ, int32(n))
                        unlock(&sched.lock)
                    }
                }
                // Go back to draining, this time
                // without preemption.
                gcDrain(&pp.gcw, gcDrainFlushBgCredit)
            case gcMarkWorkerFractionalMode:
                gcDrain(&pp.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit)
            case gcMarkWorkerIdleMode:
                gcDrain(&pp.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
            }
            casgstatus(gp, _Gwaiting, _Grunning)
        })


        // ...
    }
}

4.3 標記主流程

gcDrain 方法是併發標記階段的核心方法:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    // ...
    
    gp := getg().m.curg
    // 模式標記
    preemptible := flags&gcDrainUntilPreempt != 0
    flushBgCredit := flags&gcDrainFlushBgCredit != 0
    idle := flags&gcDrainIdle != 0


    // ...
    var check func() bool
    if flags&(gcDrainIdle|gcDrainFractional) != 0 {
        // ...
        if idle {
            check = pollWork
        } else if flags&gcDrainFractional != 0 {
            check = pollFractionalWorkerExit
        }
    }


    // 倘若根對象還未標記完成,則先進行根對象標記
    if work.markrootNext < work.markrootJobs {
        // Stop if we're preemptible or if someone wants to STW.
        for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
            job := atomic.Xadd(&work.markrootNext, +1) - 1
            if job >= work.markrootJobs {
                break
            }
            // 標記根對象
            markroot(gcw, job, flushBgCredit)
            // ...
        }
    }


    // 遍歷隊列,進行對象標記
    for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
        // work balance
        if work.full == 0 {
            gcw.balance()
        }


        // 嘗試從 p 本地隊列中獲取灰色對象,無鎖
        b := gcw.tryGetFast()
        if b == 0 {
            // 嘗試從全局隊列中獲取灰色對象,加鎖
            b = gcw.tryGet()
            if b == 0 {
                // 刷新寫屏障緩存
                wbBufFlush(nil, 0)
                b = gcw.tryGet()
            }
        }
        if b == 0 {
            // 已無對象需要標記
            break
        }
        // 進行對象的標記,並順延指針進行後續對象的掃描
        scanobject(b, gcw)


        // ...
        
        if gcw.heapScanWork >= gcCreditSlack {
            gcController.heapScanWork.Add(gcw.heapScanWork)
            // ...
            if checkWork <= 0 {
                // ...
                if check != nil && check() {
                    break
                }
            }
        }
    }


done:
    // 
}

4.4 灰對象緩存隊列

4.3 小節的源碼中,涉及到一個重要的數據結構:gcw,這是灰色對象的存儲代理和載體,在標記過程中需要持續不斷地從從隊列中取出灰色對象,進行掃描,並將新的灰色對象通過 gcw 添加到緩存隊列.

灰對象緩存隊列分爲兩層:

(1)gcWork

gcWork 數據結構源代碼如下所示.

type gcWork struct {
    // ...
    wbuf1, wbuf2 *workbuf
    // ...
}


type workbuf struct {
    workbufhdr
    obj [(_WorkbufSize - unsafe.Sizeof(workbufhdr{})) / goarch.PtrSize]uintptr
}




type workbufhdr struct {
    node lfnode 
    nobj int
}




type lfnode struct {
    next    uint64
    pushcnt uint
}

在 gcDrain 方法中,會持續不斷地從當前 P 的 gcw 中獲取灰色對象,在調用策略上,會先嚐試取私有部分,再通過 gcw 代理取全局共享部分:

        // 嘗試從 p 本地隊列中獲取灰色對象,無鎖
        b := gcw.tryGetFast()
        if b == 0 {
            // 嘗試從全局隊列中獲取灰色對象,加鎖
            b = gcw.tryGet()
            if b == 0 {
                // 因爲缺灰,會釋放寫屏障緩存,進行補灰操作
                wbBufFlush(nil, 0)
                b = gcw.tryGet()
            }
       }

gcWork.tryGetFast 方法中,會先嚐試從 gcWork.wbuf1 中獲取灰色對象.

func (w *gcWork) tryGetFast() uintptr {
    wbuf := w.wbuf1
    if wbuf == nil || wbuf.nobj == 0 {
        return 0
    }


    wbuf.nobj--
    return wbuf.obj[wbuf.nobj]
}

倘若 gcWork.wbuf1 缺灰,則會在 gcWork.tryGet 方法中交換 wbuf1 和 wbuf2,再嘗試獲取一次. 倘若仍然缺灰,則會調用 trygetfull 方法,從全局緩存隊列中獲取.

func (w *gcWork) tryGet() uintptr {
    wbuf := w.wbuf1
    if wbuf == nil {
        w.init()
        wbuf = w.wbuf1
        // wbuf is empty at this point.
    }
    if wbuf.nobj == 0 {
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
        wbuf = w.wbuf1
        if wbuf.nobj == 0 {
            owbuf := wbuf
            wbuf = trygetfull()
            if wbuf == nil {
                return 0
            }
            putempty(owbuf)
            w.wbuf1 = wbuf
        }
    }


    wbuf.nobj--
    return wbuf.obj[wbuf.nobj]
}

(2)workType.full

灰色對象的全局緩存隊列是一個棧結構,調用 pop 方法時,會通過 CAS 方式依次從棧頂取出一個緩存鏈表.

var work workType
type workType struct {
    full lfstack
    // ...
}
type lfstack uint64


func (head *lfstack) push(node *lfnode) {
    // ...
}


func (head *lfstack) pop() unsafe.Pointer {
    for {
        old := atomic.Load64((*uint64)(head))
        if old == 0 {
            return nil
        }
        node := lfstackUnpack(old)
        next := atomic.Load64(&node.next)
        if atomic.Cas64((*uint64)(head), old, next) {
            return unsafe.Pointer(node)
        }
    }
}
func trygetfull() *workbuf {
    b := (*workbuf)(work.full.pop())
    if b != nil {
        b.checknonempty()
        return b
    }
    return b
}

4.5 三色標記實現

Golang GC 的標記流程基於三色標記法實現. 此時在將理論落地實踐前,我們需要先搞清楚一個細節,那就是在代碼層面,黑、灰、白這三種顏色如何實現.

在前文 Golang 內存模型與分配機制中聊過,每個對象會有其從屬的 mspan,在 mspan 中,有着兩個 bitmap 存儲着每個對象大小的內存的狀態信息:

在垃圾清掃的過程中,並不會真正地將內存進行回收,而是在每個 mspan 中使用 gcmakrBits 對 allocBits 進行覆蓋. 在分配新對象時,當感知到 mspan 的 allocBits 中,某個對象槽位 bit 位值爲 0,則會將其視爲空閒內存進行使用,其本質上可能是一個覆蓋操作.

type mspan struct {
    // ...
    allocBits  *gcBits
    gcmarkBits *gcBits
    // ...
}


type gcBits uint8

介紹完了 bitmap 設定之後,下面回到三種標記色的實現當中:

有了以上的基礎設定之後,我們已經可以在腦海中搭建三色標記法的實現框架:

4.6 中止標記協程

gcDrain 方法中,針對空閒模式 idle 和分時模式 fractional,會設定 check 函數,在循環掃描的過程中檢測是否需要中斷當前標記協程.

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    // ...
    // ...
    idle := flags&gcDrainIdle != 0


    // ...
    var check func() bool
    if flags&(gcDrainIdle|gcDrainFractional) != 0 {
        // ...
        if idle {
            check = pollWork
        } else if flags&gcDrainFractional != 0 {
            check = pollFractionalWorkerExit
        }
    }
    // ...
    // 遍歷隊列,進行對象標記
    for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
        // ...   
        if gcw.heapScanWork >= gcCreditSlack {
            gcController.heapScanWork.Add(gcw.heapScanWork)
            // ...
            if checkWork <= 0 {
                // ...
                if check != nil && check() {
                    break
                }
            }
        }
    }




done:
    //
}

對應於 idle 模式的 check 函數是 pollwork,方法中判斷 P 本地隊列存在就緒的 g 或者存在就緒的網絡寫成,就會對當前標記協程進行中斷:

func pollWork() bool {
    if sched.runqsize != 0 {
        return true
    }
    p := getg().m.p.ptr()
    if !runqempty(p) {
        return true
    }
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
        if list := netpoll(0); !list.empty() {
            injectglist(&list)
            return true
        }
    }
    return false
}

對應於 fractional 模式的 check 函數是 pollFractionalWorkerExit,倘若當前標記協程執行的時間比例大於 1.2 倍的 fractionalUtilizationGoal 閾值(3.4 小節中設置),就會中止標記協程.

func pollFractionalWorkerExit() bool {
    
    now := nanotime()
    delta := now - gcController.markStartTime
    if delta <= 0 {
        return true
    }
    p := getg().m.p.ptr()
    selfTime := p.gcFractionalMarkTime + (now - p.gcMarkWorkerStartTime)
   
    return float64(selfTime)/float64(delta) > 1.2*gcController.fractionalUtilizationGoal
}

4.7 掃描根對象

在 gcDrain 方法正式開始循環掃描前,還會先對根對象進行掃描標記. Golang 中的根對象包括如下幾項:

實現根對象掃描的方法是 markroot:

func markroot(gcw *gcWork, i uint32, flushBgCredit bool) int64 {
    var workDone int64
    var workCounter *atomic.Int64
    switch {
    // 處理已初始化的全局變量
    case work.baseData <= i && i < work.baseBSS:
        workCounter = &gcController.globalsScanWork
        for _, datap := range activeModules() {
            workDone += markrootBlock(datap.data, datap.edata-datap.data, datap.gcdatamask.bytedata, gcw, int(i-work.baseData))
        }
    // 處理未初始化的全局變量
    case work.baseBSS <= i && i < work.baseSpans:
        workCounter = &gcController.globalsScanWork
        for _, datap := range activeModules() {
            workDone += markrootBlock(datap.bss, datap.ebss-datap.bss, datap.gcbssmask.bytedata, gcw, int(i-work.baseBSS))
        }
    // 處理 finalizer 隊列
    case i == fixedRootFinalizers:
        for fb := allfin; fb != nil; fb = fb.alllink {
            cnt := uintptr(atomic.Load(&fb.cnt))
            scanblock(uintptr(unsafe.Pointer(&fb.fin[0])), cnt*unsafe.Sizeof(fb.fin[0])&finptrmask[0], gcw, nil)
        }
    //  釋放已終止的 g 的棧
    case i == fixedRootFreeGStacks:
        systemstack(markrootFreeGStacks)
    // 掃描 mspan 中的 special
    case work.baseSpans <= i && i < work.baseStacks:
        markrootSpans(gcw, int(i-work.baseSpans))


    default:
        // ...
        // 獲取需要掃描的 g
        gp := work.stackRoots[i-work.baseStacks]
        // ...
        // 切回到 g0執行工作,掃描 g 的棧
        systemstack(func() {
            // ...
            // 棧掃描
            workDone += scanstack(gp, gcw)
           // ...
        })
    }
    // ...
    return workDone
}

其中,棧掃描方法鏈路展開如下:

func scanstack(gp *g, gcw *gcWork) int64 {
    // ...


    scanframe := func(frame *stkframe, unused unsafe.Pointer) bool {
        scanframeworker(frame, &state, gcw)
        return true
    }
    gentraceback(^uintptr(0), ^uintptr(0), 0, gp, 0, nil, 0x7fffffff, scanframe, nil, 0)
   // ...
}
func scanframeworker(frame *stkframe, state *stackScanState, gcw *gcWork) {
    // ...
    // 掃描局部變量
    if locals.n > 0 {
        size := uintptr(locals.n) * goarch.PtrSize
        scanblock(frame.varp-size, size, locals.bytedata, gcw, state)
    }


    // 掃描函數參數
    if args.n > 0 {
        scanblock(frame.argp, uintptr(args.n)*goarch.PtrSize, args.bytedata, gcw, state)
    }
    // ...
}

不論是全局變量掃描還是棧變量掃描,底層都會調用到 scanblock 方法. 在掃描時,會通過位圖 ptrmask 輔助加速流程. 在 ptrmask 當中,每個 bit 位對應了一個指針大小(8B)的位置的標識信息,指明當前位置是否是指針,倘若非指針,則直接跳過掃描.

此外, 在標記一個對象時, 需要獲取到該對象所在 mspan, 這一過程會使用到 heapArena 中關於頁和 mspan 間的映射索引(如有存疑可以看我的文章 Golang 內存模型與分配機制),這部分內容放在 4.7 小節中集中闡述.

func scanblock(b0, n0 uintptr, ptrmask *uint8, gcw *gcWork, stk *stackScanState) {
  // ...
  b := b0
  n := n0
  // 遍歷待掃描的地址
  for i := uintptr(0); i < n; {
  // 找到 bitmap 對應的 byte. ptrmask 輔助標識了 .data 一個指針的大小,bit 位爲 1 代表當前位置是一個指針
  bits := uint32(*addb(ptrmask, i/(goarch.PtrSize*8)))
       // 非指針,跳過
   if bits == 0 {
        i += goarch.PtrSize * 8
        continue
    }
    for j := 0; j < 8 && i < n; j++ {
      if bits&1 != 0 {
      // Same work as in scanobject; see comments there.
        p := *(*uintptr)(unsafe.Pointer(b + i))
        if p != 0 {
          if obj, span, objIndex := findObject(p, b, i); obj != 0 {
            greyobject(obj, b, i, span, gcw, objIndex)
          } else if stk != nil && p >= stk.stack.lo && p < stk.stack.hi {
           stk.putPtr(p, false)
          }
        }
      }
      bits >>= 1
      i += goarch.PtrSize
    }
  }
 }

4.7 掃描普通對象

gcDrain 方法中,會持續從灰對象緩存隊列中取出灰對象,然後採用 scanobject 方法進行處理.

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    // ...
    // 遍歷隊列,進行對象標記
    for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
       
        // 嘗試從 p 本地隊列中獲取灰色對象,無鎖
        b := gcw.tryGetFast()
        if b == 0 {
            // 嘗試從全局隊列中獲取灰色對象,加鎖
            b = gcw.tryGet()
            if b == 0 {
                // 刷新寫屏障緩存
                wbBufFlush(nil, 0)
                b = gcw.tryGet()
            }
        }
        if b == 0 {
            // 已無對象需要標記
            break
        }
        // 進行對象的標記,並順延指針進行後續對象的掃描
        scanobject(b, gcw)


    }




done:
    //
}

scanobject 方法遍歷當前灰對象中的指針,依次調用 greyobject 方法將其指向的對象進行置灰操作.

const (
    bitPointer = 1 << 0
    bitScan    = 1 << 4
)


func scanobject(b uintptr, gcw *gcWork) {
    // 通過地址映射到所屬的頁
    // 通過 heapArena 中的映射信息,從頁映射到所屬的 mspan
    hbits := heapBitsForAddr(b)
    s := spanOfUnchecked(b)
    n := s.elemsize
    // ...


    // 順延當前對象的成員指針,掃描後續的對象
    var i uintptr
    for i = 0; i < n; i, hbits = i+goarch.PtrSize, hbits.next() {
        // 通過 heapArena 中的 bitmap 記錄的信息,加速遍歷過程
        bits := hbits.bits()
        if bits&bitScan == 0 {
            break // no more pointers in this object
        }
        if bits&bitPointer == 0 {
            continue // not a pointer
        }


        obj := *(*uintptr)(unsafe.Pointer(b + i))
      
        if obj != 0 && obj-b >= n {
            // 對於遍歷到的對象,將其置灰,並添加到隊列中,等待後續掃描
            if obj, span, objIndex := findObject(obj, b, i); obj != 0 {
                greyobject(obj, b, i, span, gcw, objIndex)
            }
        }
    }
    // ...
}

在 scanobject 方法中還涉及到兩項細節:

(1)如何通過對象地址找到其所屬的 mspan

首先根據對象地址,可以定位到對象所屬的頁,進一步可以通過地址偏移定位到其所屬的 heapArena. 在 heapArena 中,已經提前建立好了從頁映射到 mspan 的索引,於是我們通過這一鏈路,實現從對象地址到 mspan 的映射. 從而能夠獲得 mspan.gcmarkBits 進行 bitmap 標記操作.

type heapArena struct {
    spans [pagesPerArena]*mspan
}
func findObject(p, refBase, refOff uintptr) (base uintptr, s *mspan, objIndex uintptr) {
    s = spanOf(p)
    // ...
    return
}


func spanOf(p uintptr) *mspan {
    // ...
    ri := arenaIndex(p)
    // ...
    l2 := mheap_.arenas[ri.l1()]
    // ...
    ha := l2[ri.l2()]
    // ...
    return ha.spans[(p/pageSize)%pagesPerArena]
}

(2)如何加速掃描過程

在 heapArena 中,通過一個額外的 bitmap 存儲了內存信息:

bitmap 中,每兩個 bit 記錄一個指針大小的內存空間的信息(8B),其中一個 bit 標誌了該位置是否是指針;另一個 bit 標誌了該位置往後是否還存在指針,於是在遍歷掃描的過程中,可以通過這兩部分信息推進 for 循環的展開速度.

const heapArenaBitmapBytes untyped int = 2097152


type heapArena struct {
    bitmap [heapArenaBitmapBytes]byte
    // ...
}
func heapBitsForAddr(addr uintptr) (h heapBits) {
    // 2 bits per word, 4 pairs per byte, and a mask is hard coded.
    arena := arenaIndex(addr)
    ha := mheap_.arenas[arena.l1()][arena.l2()]
    if ha == nil {
        return
    }
    h.bitp = &ha.bitmap[(addr/(goarch.PtrSize*4))%heapArenaBitmapBytes]
    h.shift = uint32((addr / goarch.PtrSize) & 3)
    h.arena = uint32(arena)
    h.last = &ha.bitmap[len(ha.bitmap)-1]
    return
}
func (h heapBits) bits() uint32 {
    // The (shift & 31) eliminates a test and conditional branch
    // from the generated code.
    return uint32(*h.bitp) >> (h.shift & 31)
}

4.8 對象置灰

對象置灰操作位於 greyobject 方法中. 如 4.5 小節所屬,置灰分兩步:

func greyobject(obj, base, off uintptr, span *mspan, gcw *gcWork, objIndex uintptr) {
    // ...
    // 在其所屬的 mspan 中,將對應位置的 gcMark bitmap 位置爲 1
    mbits.setMarked()
    
    // ...
    // 將對象添加到當前 p 的本地隊列
    if !gcw.putFast(obj) {
        gcw.put(obj)
    }
}

4.9 新分配對象置黑

此外,值得一提的是,GC 期間新分配的對象,會被直接置黑,呼應了混合寫屏障中的設定.

func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
        // ...
        if gcphase != _GCoff {
            gcmarknewobject(span, uintptr(x), size, scanSize)
        }
        // ...
}
func gcmarknewobject(span *mspan, obj, size, scanSize uintptr) {
    // ...
    objIndex := span.objIndex(obj)
    // 標記對象
    span.markBitsForIndex(objIndex).setMarked()
    // ...
}

5 輔助標記

5.1 輔助標記策略

在併發標記階段,由於用戶協程與標記協程共同工作,因此在極端場景下可能存在一個問題——倘若用戶協程分配對象的速度快於標記協程標記對象的速度,這樣標記階段豈不是永遠無法結束?

爲規避這一問題,Golang GC 引入了輔助標記的策略,建立了一個兜底的機制:在最壞情況下,一個用戶協程分配了多少內存,就需要完成對應量的標記任務.

在每個用戶協程 g 中,有一個字段 gcAssisBytes,象徵 GC 期間可分配內存資產的概念,每個 g 在 GC 期間輔助標記了多大的內存空間,就會獲得對應大小的資產,使得其在 GC 期間能多分配對應大小的內存進行對象創建.

type g struct {
    // ...
    gcAssistBytes int64
}
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
    // ...
    var assistG *g
    if gcBlackenEnabled != 0 {       
        assistG = getg()
        if assistG.m.curg != nil {
            assistG = assistG.m.curg
        }
        // 每個 g 會有資產
        assistG.gcAssistBytes -= int64(size)


        if assistG.gcAssistBytes < 0 {           
            gcAssistAlloc(assistG)
        }
    }
}

5.2 輔助標記執行

由於各對象中,可能存在部分不包含指針的字段,這部分字段是無需進行掃描的. 因此真正需要掃描的內存量會小於實際的內存大小,兩者之間的比例通過 gcController.assistWorkPerByte 進行記錄.

於是當一個用戶協程在 GC 期間需要分配 M 大小的新對象時,實際上需要完成的輔助標記量應該爲 assistWorkPerByte*M.

輔助標記邏輯位於 gcAssistAlloc 方法. 在該方法中,會先嚐試從公共資產池 gcController.bgScanCredit 中偷取資產,倘若資產仍然不夠,則會通過 systemstack 方法切換至 g0,並在 gcAssistAlloc1 方法內調用 gcDrainN 方法參與到併發標記流程當中.

func gcAssistAlloc(gp *g) {
    // ...
    // 計算待完成的任務量
    debtBytes := -gp.gcAssistBytes
    assistWorkPerByte := gcController.assistWorkPerByte.Load()
    scanWork := int64(assistWorkPerByte * float64(debtBytes))
    if scanWork < gcOverAssistWork {
        scanWork = gcOverAssistWork
        debtBytes = int64(assistBytesPerWork * float64(scanWork))
    }


    // 先嚐試從全局的可用資產中偷取
    bgScanCredit := atomic.Loadint64(&gcController.bgScanCredit)
    stolen := int64(0)
    if bgScanCredit > 0 {
        if bgScanCredit < scanWork {
            stolen = bgScanCredit
            gp.gcAssistBytes += 1 + int64(assistBytesPerWork*float64(stolen))
        } else {
            stolen = scanWork
            gp.gcAssistBytes += debtBytes
        }
        atomic.Xaddint64(&gcController.bgScanCredit, -stolen)
        scanWork -= stolen
        // 全局資產夠用,則無需輔助標記,直接返回
        if scanWork == 0 {         
            return
        }
    }


    // 切換到 g0,開始執行標記任務
    systemstack(func() {
        gcAssistAlloc1(gp, scanWork)        
    })


    // 輔助標記完成
    completed := gp.param != nil
    gp.param = nil
    if completed {
        gcMarkDone()
    }
    // ...
}

6 標記終止

5lzfOQ

6.1 標記完成

在併發標記階段的 gcBgMarkWorker 方法中,當最後一個標記協程也完成任務後,會調用 gcMarkDone 方法,開始執行併發標記後處理的邏輯.

func gcBgMarkWorker() {
    // ...
    for{
        // ...
        if incnwait == work.nproc && !gcMarkWorkAvailable(nil) {
            // ...
            gcMarkDone()
        }
    }
}

gcMarkDone 方法中,會遍歷釋放所有 P 的寫屏障緩存,查看是否存在因屏障機制遺留的灰色對象,如果有,則會推出 gcMarkDone 方法,回退到 gcBgMarkWorker 的主循環中,繼續完成標記任務.

倘若寫屏障中也沒有遺留的灰對象,此時會調用 STW 停止世界,並步入 gcMarkTermination 方法進入標記終止階段.

func gcMarkDone()
top:    
    if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
        semrelease(&work.markDoneSema)
        return    
    }    
    // ...
    // 切換到 p0
    systemstack(func() {
        gp := getg().m.curg
        casgstatus(gp, _Grunning, _Gwaiting)
        forEachP(func(_p_ *p) {
            // 釋放一波寫屏障的緩存,可能會新的待標記任務產生            
            wbBufFlush1(_p_)
        })
        casgstatus(gp, _Gwaiting, _Grunning)
    })


    // 倘若有新的標記對象待處理,則調回 top 處,可能會回退到併發標記階段
    if gcMarkDoneFlushed != 0 {
        // ...
        goto top
    }
    // 正式進入標記完成階段,會STW
    systemstack(stopTheWorldWithSema)
    // ...
    // 在 STW 狀態下,進入標記終止階段
    gcMarkTermination()
}

6.2 標記終止

gcMarkTermination 方法包括幾個核心步驟:

func gcMarkTermination() {


    // 設置GC階段進入標記終止階段
    setGCPhase(_GCmarktermination)
    // ...


    systemstack(func() {
       // ...
        // 設置GC階段進入標記關閉階段
        setGCPhase(_GCoff)  
        // 開始執行標記清掃動作     
        gcSweep(work.mode)
    })
   // 提交下一輪GC的內存閾值
   systemstack(gcControllerCommit)
   // ...
   systemstack(func() { startTheWorldWithSema(true) })
    // ...
}

6.3 標記清掃

gwSweep 方法的核心是調用 ready 方法,喚醒了因爲 gopark 操作陷入被動阻塞的清掃協程 sweep.g.

func gcSweep(mode gcMode) {
    assertWorldStopped()
   // ...


    // 喚醒後臺清掃任務
    lock(&sweep.lock)
    if sweep.parked {
        sweep.parked = false
        ready(sweep.g, 0, true)
    }
    unlock(&sweep.lock)
}

那麼 sweep.g 是在何時被創建,又是在何時被 park 的呢?

我們重新回到 runtime 包的 main 函數中,開始向下追溯:

func main() {
    // ...
    gcenable()
    // ...
}
func gcenable() {    
    // ...
    go bgsweep(c)
    <-c
    // ...
}

可以看到,在異步啓動的 bgsweep 方法中,會首先將當前協程 gopark 掛起,等待被喚醒.

當在標記終止階段被喚醒後,會進入 for 循環,每輪完成一個 mspan 的清掃工作,隨後就調用 Gosched 方法主動讓渡 P 的執行權,採用這種懶清掃的方式逐步推進標記清掃流程.

func bgsweep(c chan int) {
    sweep.g = getg()


    lockInit(&sweep.lock, lockRankSweep)
    lock(&sweep.lock)
    sweep.parked = true
    c <- 1
    // 執行 gopark 操作,等待 GC 併發標記階段完成後將當前協程喚醒
    goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)


    for {
        // 每清掃一個 mspan 後,會發起主動讓渡
        for sweepone() != ^uintptr(0) {
            sweep.nbgsweep++
            Gosched()
        }
        // ...
        lock(&sweep.lock)
        if !isSweepDone() {
            // This can happen if a GC runs between
            // gosweepone returning ^0 above
            // and the lock being acquired.
            unlock(&sweep.lock)
            continue
        }
        // 清掃完成,則繼續 gopark 被動阻塞
        sweep.parked = true
        goparkunlock(&sweep.lock, waitReasonGCSweepWait, traceEvGoBlock, 1)
    }
}

sweepone 方法每次清掃一個協程,清掃邏輯核心位於 sweepLocked.sweep 方法中,正是將 mspan 的 gcmarkBits 賦給 allocBits,並創建出一個空白的 bitmap 作爲新的 gcmarkBits. 這一實現呼應了本文 4.5 小節談到的設定.

func sweepone() uintptr {
    // ...
    sl := sweep.active.begin()
    // ...
    for {
        // 查找到一個待清掃的 mspan
        s := mheap_.nextSpanForSweep()
        // ...
        if s, ok := sl.tryAcquire(s); ok {
            npages = s.npages
            // 對一個 mspan 進行清掃
            if s.sweep(false) {
                // Whole span was freed. Count it toward the
                // page reclaimer credit since these pages can
                // now be used for span allocation.
                mheap_.reclaimCredit.Add(npages)
            } else {
                // Span is still in-use, so this returned no
                // pages to the heap and the span needs to
                // move to the swept in-use list.
                npages = 0
            }
            break
        }
    }
    sweep.active.end(sl)


    // ...
    return npages
}
func (sl *sweepLocked) sweep(preserve bool) bool {
    // ...
    s.allocBits = s.gcmarkBits
    s.gcmarkBits = newMarkBits(s.nelems)
    // ...
}

6.4 設置下輪 GC 閾值

在 gcMarkTermination 方法中,還會通過 g0 調用 gcControllerCommit 方法,完成下輪觸發 GC 的內存閾值的設定.

func gcMarkTermination() {
   // ... 
   // 提交下一輪GC的內存閾值
   systemstack(gcControllerCommit)
   // ...
}
func gcControllerCommit() {
    assertWorldStoppedOrLockHeld(&mheap_.lock)


    gcController.commit(isSweepDone())
    // ...
}

在 gcControllerState.commit 方法中,會讀取 gcControllerState.gcPercent 字段值作爲觸發 GC 的堆使用內存增長比例,並結合當前堆內存的使用情況,推算出觸發下輪 GC 的內存閾值,設置到 gcControllerState.gcPercentHeapGoal 字段中.

func (c *gcControllerState) commit(isSweepDone bool) {
    // ...
    gcPercentHeapGoal := ^uint64(0)
    // gcPercent 值,用戶可以通過環境變量 GOGC 顯式設置. 未設時,默認值爲 100.
    if gcPercent := c.gcPercent.Load(); gcPercent >= 0 {
        gcPercentHeapGoal = c.heapMarked + (c.heapMarked+atomic.Load64(&c.lastStackScan)+atomic.Load64(&c.globalsScan))*uint64(gcPercent)/100
    }
    // ...
    c.gcPercentHeapGoal.Store(gcPercentHeapGoal)
    // ...
}

在新一輪嘗試觸發 GC 的過程中,對於 gcTriggerHeap 類型的觸發事件,會調用 gcController.trigger 方法,讀取到 gcControllerState.gcPercentHeapGoal 中存儲的內存閾值,進行觸發條件校驗.

func (t gcTrigger) test() bool {
    // ...
    switch t.kind {
        case gcTriggerHeap:
        // ...
            trigger, _ := gcController.trigger()
            return atomic.Load64(&gcController.heapLive) >= trigger
        // ...
    }
    return true
}
func (c *gcControllerState) trigger() (uint64, uint64) {
    goal, minTrigger := c.heapGoalInternal()
    // ...
    var trigger uint64
    runway := c.runway.Load()
    // ...
    trigger = goal - runway
    // ...
    return trigger, goal
}
func (c *gcControllerState) heapGoalInternal() (goal, minTrigger uint64) {    
    goal = c.gcPercentHeapGoal.Load()
    // ...
    return
}

7 系統駐留內存清理

Golang 進程從操作系統主內存(Random-Access Memory,簡稱 RAM)中申請到堆中進行復用的內存部分稱爲駐留內存(Resident Set Size,RSS). 顯然,RSS 不可能只借不還,應當遵循實際使用情況進行動態擴縮.

Golang 運行時會異步啓動一個回收協程,以趨近於 1% CPU 使用率作爲目標,持續地對 RSS 中的空閒內存進行回收.

7.1 回收協程啓動

在 runtime 包下的 main 函數中,會異步啓動回收協程 bgscavenge,源碼如下:

func main() {
    // ...
    gcenable()
    // ...
}
func gcenable() {    
    // ...
    go bgscavenge(c)
    <-c
    // ...
}

7.2 執行頻率控制

在 bgscavenge 方法中,通過 for 循環 + sleep 的方式,控制回收協程的執行頻率在佔用 CPU 時間片的 1% 左右. 其中回收 RSS 的核心邏輯位於 scavengerState.run 方法.

func bgscavenge(c chan int) {
    scavenger.init()


    c <- 1
    scavenger.park()
    // 如果當前操作系統分配內存>目標內存
    for {
        // 釋放內存
        released, workTime := scavenger.run()
        if released == 0 {
            scavenger.park()
            continue
        }
        atomic.Xadduintptr(&mheap_.pages.scav.released, released)
        scavenger.sleep(workTime)
    }
}

7.3 回收空閒內存

scavengerState.run 方法中,會開啓循環,經歷 pageAlloc.scavenge -> pageAlloc.scavengeOne 的調用鏈,最終通過 sysUnused 方法進行空閒內存頁的回收.

func (s *scavengerState) run() (released uintptr, worked float64) {
    // ...
    for worked < minScavWorkTime {
        // ...
        const scavengeQuantum = 64 << 10
        r, duration := s.scavenge(scavengeQuantum)
        // ...
    }
    return
}
func (p *pageAlloc) scavenge(nbytes uintptr, shouldStop func() bool) uintptr {
    released := uintptr(0)
    for released < nbytes {
        ci, pageIdx := p.scav.index.find()
        if ci == 0 {
            break
        }
        systemstack(func() {
            released += p.scavengeOne(ci, pageIdx, nbytes-released)
        })
        if shouldStop != nil && shouldStop() {
            break
        }
    }
    return released
}

在 pageAlloc.scavengeOne 方法中,通過 findScavengeCandidate 方法尋找到待回收的頁,通過 sysUnused 方法發起系統調用進行內存回收.

func (p *pageAlloc) scavengeOne(ci chunkIdx, searchIdx uint, max uintptr) uintptr {
    // ...
    lock(p.mheapLock)
    if p.summary[len(p.summary)-1][ci].max() >= uint(minPages) {
        // 找到待回收的部分
        base, npages := p.chunkOf(ci).findScavengeCandidate(pallocChunkPages-1, minPages, maxPages)


        // If we found something, scavenge it and return!
        if npages != 0 {
            // Compute the full address for the start of the range.
            addr := chunkBase(ci) + uintptr(base)*pageSize
            // ...
            unlock(p.mheapLock)


            if !p.test {
                // 發起系統調用,回收內存
                sysUnused(unsafe.Pointer(addr), uintptr(npages)*pageSize)


                // 更新狀態信息
                nbytes := int64(npages) * pageSize
                gcController.heapReleased.add(nbytes)
                gcController.heapFree.add(-nbytes)


                stats := memstats.heapStats.acquire()
                atomic.Xaddint64(&stats.committed, -nbytes)
                atomic.Xaddint64(&stats.released, nbytes)
                memstats.heapStats.release()
            }


            // 更新基數樹信息
            lock(p.mheapLock)
            p.free(addr, uintptr(npages)true)
            p.chunkOf(ci).scavenged.setRange(base, npages)
            unlock(p.mheapLock)


            return uintptr(npages) * pageSize
        }
    }
   // 
}

前文 Golang 內存模型與分配機制中,我們有介紹過,在 Golang 堆中會基於基數樹的形式建立空閒頁的索引,且基數樹每個葉子節點對應了一個 chunk 塊大小的內存 (512 * 8KB = 4MB).

其中 chunk 的封裝類 pallocData 中有還兩個核心字段,一個 pallocBits 中標識了一個頁是否被佔用了(1 佔用、0 空閒),同時還有另一個 scavenged bitmap 用於表示一個頁是否已經被操作系統回收了(1 已回收、0 未回收). 因此,回收協程的目標就是找到某個頁,當其 pallocBits 和 scavenged 中的 bit 都爲 0 時, 代表其可以回收.

由於回收時,可能需要同時回收多個頁. 此時會利用基數樹的特性, 幫助快速找到連續的空閒可回收的頁位置.

type pallocData struct {
    pallocBits
    scavenged pageBits
}

8 總結

至此,Golang 內存管理系列的三篇文章全部結束. 如有紕漏,歡迎批評指正.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Db19tKNer8D6FX6UG-Yujw