溫故知新——Golang GMP 萬字洗髓經

0 前言

我在 23 年初曾發佈過一篇——golang gmp 原理解析,當時剛開始接觸 go 底層源碼,視野廣度和理解深度都有所不足,對一些核心環節的思考和挖掘有所欠缺,對其中某些局部細節又過分拘泥,整體內容質量上還是有所不足.

隨着近期嘗試接觸了 golang 以外的語言,通過橫向對比後,對於 golang 中 gmp 的精妙設計也產生了一些新的感悟. 於是就藉着這個契機開啓一個重置篇,對老版內容進行查缺補漏,力求能夠溫故而知新.

本文會分爲兩大部分,共 5 章內容:

第一部分偏於宏觀,對基礎知識和整體架構加以介紹:

第二部分則着眼於 goroutine 的生命週期變化過程:

提前做個聲明,本文涉及大量對 golang runtime 標準庫源碼的閱讀環節,其中展示的源碼版本統一爲 v1.19 版本.

另外,在學習過程中也和作爲同事兼戰友的龍俊一起針對 gmp 技術話題有過很多次交流探討,這裏要特別緻敬一下龍哥.

1 基礎概念

1.1 從線程到協程

線程(Thread)與協程(Coroutine)是併發編程中的經典概念:

總結來說,線程更加簡單直觀,天然契合操作系統調度模型;協程是用戶態下二次加工的產物,需要引入額外的複雜度,但是相對於線程而言有着更輕的粒度和更小的開銷.

1.2 從協程到 goroutine

golang 是一門天然支持協程的語言,goroutine 是其對協程的本土化實現,並且在原生協程的基礎上做了很大的優化改進.

當我們聊到 goroutine ,需要明白這不是一個能被單獨拆解的概念,其本身是強依附於 gmp(goroutine-machine-processor)體系而生的,通過 gmp 架構的建設,使得 goroutine 相比於原生協程具備着如下核心優勢:

此外,golang 中完全屏蔽了線程的概念,圍繞着 gmp 打造的一系列併發工具都以 g 爲併發粒度,可以說是完全統一了 golang 併發世界的秩序,做到了類似 “書同文、車同軌” 的效果

1.3 gmp 架構

gmp = goroutine + machine + processor. 下面我們對這三個核心組件展開介紹:

1)g

我們可以把 gmp 理解爲一個任務調度系統,那麼 g 就是這個系統中所謂的 “任務”,是一種需要被分配和執行的 “資源”.

2)m

當我們把 gmp 理解爲一個任務調度系統,那麼 m 就是這個系統中的”引擎 “. 當 m 和 p 結合後,就限定了” 引擎 “的運行是圍繞着 gmp 這條軌道進行的,使得” 引擎“運行着兩個週而復始、不斷交替的步驟——尋找任務(執行 g0);執行任務(執行 g)

3) p

當我們把 gmp 理解爲一個任務調度系統,那麼 p 就是這個系統中的”中樞 “,當其和作爲” 引擎 “ 的 m 結合後,纔會引導“引擎” 進入 gmp 的運行模式;同時 p 也是這個系統中存儲 “任務” 的“容器”,爲 “引擎” 提供了用於執行的任務資源.

結合上圖可以看到,承載 g 的容器分爲兩個部分:

當 m 與 p 結合後,不論是創建 g 還是獲取 g,都優先從私有的 lrq 中獲取,從而儘可能減少併發競爭行爲;這裏聊到併發情況較少,但並非完全沒有,是因爲還可能存在來自其他 p 的竊取行爲(stealwork)

介紹完了 g 的存儲容器設計後,接下來聊聊將 g 放入容器和取出容器的流程設計:

在 get g 流程中,還有一個細節需要注意,就是在 g0 每經過 61 次調度循環後,下一次會在處理 lrq 前優先處理一次 grq,避免因 lrq 過於忙碌而致使 grq 陷入饑荒狀態

1.4 gmp 生態

在 golang 中已經完全屏蔽了線程的概念,將 goroutine 統一爲整個語言層面的併發粒度,並遵循着 gmp 的秩序進行運作. 如果把 golang 程序比做一個人的話,那麼 gmp 就是這個人的骨架,支持着他的直立與行走;而在此基礎之上,緊密圍繞着 gmp 理念打造設計的一系列工具、模塊則像是在骨架之上填充的血肉,是依附於這套框架而存在的. 下面我們來看其中幾個經典的案例:

(1)內存管理

golang 的內存管理模塊主要繼承自 TCMalloc(Thread-Caching-Malloc)的設計思路,其中由契合 gmp 模型做了因地制宜的適配改造,爲每個 p 準備了一份私有的高速緩存——mcache,能夠無鎖化地完成一部分 p 本地的內存分配操作.

更多有關 golang 內存管理與垃圾回收的內容,可以閱讀我此前發表的系列專題:1)Golang 內存模型與分配機制;2)Golang 垃圾回收原理分析;3)Golang 垃圾回收源碼分析

(2)併發工具

在 golang 中的併發工具(例如鎖 mutex、通道 channel 等)均契合 gmp 作了適配改造,保證在執行阻塞操作時,會將阻塞粒度限制在 g(goroutine)而非 m(thread)的粒度,使得阻塞與喚醒操作都屬於用戶態行爲,無需內核的介入,同時一個 g 的阻塞也完全不會影響 m 下其他 g 的運行.

有關 mutex 和 channel 底層實現機制,可以閱讀我此前發表的文章:1)Golang 單機鎖實現原理;2)Golang channel 實現原理.

上面這項結論看似理所當然,但實際上是一項非常重要的特性,這一點隨着我近期在學習 c++ 過程中才產生了更深的感悟——我在近期嘗試着使用 c++ 效仿 gmp 實現一套協程調度體系,雖然還原出了其中大部分功能,但在使用上還是存在一個很大的缺陷,就是 c++ 標準庫中的併發工具(如 lock、semaphore 等)對應的阻塞粒度都是 thread 級別的,這就導致一個協程(coroutine)的阻塞會上升到線程(thread)級別,並導致其他 coroutine 也喪失被執行的機會.

這一點如果要解決,就需要針對所有併發工具做一層適配於協程粒度的改造,實現成本無疑是巨大的. 這也從側面印證了 golang 的併發優越性,這種適配性在語言層面就已經天然支持了.

(3)io 多路複用

在設計 io 模型時,golang 採用了 linux 系統提供的 epoll 多路複用技術,然而爲了因爲 epoll_wait 操作而引起 m(thread)粒度的阻塞,golang 專門設計一套 netpoll 機制,使用用戶態的 gopark 指令實現阻塞操作,使用非阻塞 epoll_wait 結合用戶態的 goready 指令實現喚醒操作,從而將 io 行爲也控制在 g 粒度,很好地契合了 gmp 調度體系.

如果對這部分內容感興趣的話,可以閱讀我近期剛發表的文章——萬字解析 golang netpoll 底層原理

類似上述的例子在 golang 世界中是無法窮盡的. gmp 是 golang 知識體系的基石,如果想要深入學習理解 golang,那麼 gmp 無疑是一個絕佳的學習起點.

2 gmp 詳設

文字性的理論描述難免過於空洞,g、m、p 並不是抽象的概念,事實上三者在源碼中都有着具體的實現,定義代碼均位於 runtime/runtime2.go. 下面就從具體的源碼中尋求原理內容的支撐和佐證.

2.1 g 詳設

g (goroutine)的類型聲明如下,其中包含如下核心成員字段:

// 一個 goroutine 的具象類
type g struct{
    // g 的執行棧空間
    stack       stack   
    /*
        棧空間保護區邊界,用於探測是否執行棧擴容
        在 g 超時搶佔過程中,用於傳遞搶佔標識
    */
    stackguard0 uintptr
    // ...

    // 記錄 g 執行過程中遇到的異常    
    _panic    *_panic 
    // g 中掛載的 defer 函數,是一個 LIFO 的鏈表結構
    _defer    *_defer 

    // g 從屬的 m
    m         *m      
    // ...  
    /*
        g 的狀態
        // g 實例剛被分配還未完成初始化
        _Gidle = iota // 0

        // g 處於就緒態.  可以被調度 
        _Grunnable // 1

        // g 正在被調度運行過程中
        _Grunning // 2

        // g 正在執行系統調用
        _Gsyscall // 3

        // g 處於阻塞態,需要等待其他外部條件達成後,才能重新恢復成就緒態
        _Gwaiting // 4

        // 生死本是一個輪迴. 當 g 調度結束生命終結,或者剛被初始化準備迎接新生前,都會處於此狀態
        _Gdead // 6
    */
    atomicstatus uint32
    // ...
    // 進入全局隊列 grq 時指向相鄰 g 的 next 指針
    schedlink    guintptr
    // ...
}

2.2 m 詳設

m(machine)是 go 對 thread 的抽象,其類定義代碼中包含如下核心成員:

type m struct{
    // 用於調度普通 g 的特殊 g,與每個 m 一一對應
    g0      *g     
    // ...
    // m 的唯一 id
    procid        uint64
    // 用於處理信號的特殊 g,與每個 m 一一對應
    gsignal       *g              
    // ...
    // m 上正在運行的 g
    curg          *g       
    // m 關聯的 p
    p             puintptr 
    // ...
    // 進入 schedt midle 鏈表時指向相鄰 m 的 next 指針 
    schedlink     muintptr
    // ...
}

此處暫時將 gsignal 按下不表,我們可以將 m 的運行目標劃分爲 g0 和 g ,兩者是始終交替進行的:g0 就類似於引擎中的調度邏輯,檢索任務列表尋找需要執行的任務;g 就是由 g0 找到並分配給 m 執行的一個具體任務.

2.3 p 詳設

p (processor)是 gmp 中的調度器,其類定義代碼中包含如下核心成員字段:

type p struct{
    id          int32
    /*
        p 的狀態
        // p 因缺少 g 而進入空閒模式,此時會被添加到全局的 idle p 隊列中
        _Pidle = iota // 0

        // p 正在運行中,被 m 所持有,可能在運行普通 g,也可能在運行 g0
        _Prunning // 1

        // p 所關聯的 m 正在執行系統調用. 此時 p 可能被竊取並與其他 m 關聯
        _Psyscall // 2

        // p 已被終止
        _Pdead // 4
    */
    status      uint32// one of pidle/prunning/...
    // 進入 schedt pidle 鏈表時指向相鄰 p 的 next 指針
    link        puintptr        
    // ...
    // p 所關聯的 m. 若 p 爲 idle 狀態,可能爲 nil
    m           muintptr   // back-link to associated m (nil if idle)


    // lrq 的隊首
    runqhead uint32
    // lrq 的隊尾
    runqtail uint32
    // q 的本地 g 隊列——lrq
    runq     [256]guintptr
    // 下一個調度的 g. 可以理解爲 lrq 中的特等席
    runnext guintptr
    // ...
}

2.4 schedt 詳設

schedt 是全局共享的資源模塊,在訪問前需要加全局鎖:

// 全局調度模塊
type schedt struct{
    // ...
    // 互斥鎖
    lock mutex

    // 空閒 m 隊列
    midle        muintptr // idle m's waiting for work
    // ...
    // 空閒 p 隊列
    pidle      puintptr // idle p's
    // ...

    // 全局 g 隊列——grq
    runq     gQueue
    // grq 中存量 g 的個數
    runqsize int32
    // ...
}

之所以存在 midle 和 pidle 的設計,就是爲了避免 p 和 m 因缺少 g 而導致 cpu 空轉. 對於空閒的 p 和 m,會被集成到空閒隊列中,並且會暫停 m 的運行

3 調度原理

本章要和大家聊的流程是 “調度”. 所謂調度,指的是一個由用戶通過 go func(){...} 操作創建的 g,是如何被 m 上的 g0 獲取並執行的,所以簡單來說,就是由 g0 -> g 的流轉過程.

我習慣於將 “調度” 稱爲第一視角下的轉換,因爲該流轉過程是由 m 上運行的 g0 主動發起的,而無需第三方角色的干預.

3.1 main 函數與 g

1)main 函數

main 函數作爲整個 go 程序的入口是比較特殊的存在,它是由 go 程序全局唯一的 m0(main thread)執行的,對應源碼位於 runtime.proc.go:

//go:linkname main_main main.main
func main_main()

// The main goroutine.
func main(){
    // ...
    // 獲取用戶聲明的 main 函數
    fn := main_main 
    // 執行用戶聲明的 main 函數
    fn()
    // ...
}

2)g

除了 main 函數這個特例之外,所有用戶通過 go func(){...} 操作啓動的 goroutine,都會以 g 的形式進入到 gmp 架構當中.

func handle() {
    // 異步啓動 goroutine
    go func(){
        // do something ...
    }()
}

在上述代碼中,我們會創建出一個 g 實例的創建,將其置爲就緒狀態,並添加到就緒隊列中:

上述流程對應代碼爲 runtime/proc.go 的 newproc 方法中:

// 創建一個新的 g,本將其投遞入隊列. 入參 fn 爲用戶指定的函數.
// 當前執行方還是某個普通 g
func newproc(fn *funcval){
    // 獲取當前正在執行的普通 g 及其程序計數器(program counter)
    gp := getg()
    pc := getcallerpc()
    // 執行 systemstack 時,會臨時切換至 g0,並在完成其中閉包函數調用後,切換回到原本的普通 g 
    systemstack(func(){
        // 此時執行方爲 g0
        // 構造一個新的 g 實例
        newg := newproc1(fn, gp, pc)
        // 獲取當前 p 
        _p_ := getg().m.p.ptr()
        /*
            將 newg 添加到隊列中:
            1)優先添加到 p 的本地隊列 lrq 
            2)若 lrq 滿了,則添加到全局隊列 grq
        */
        runqput(_p_, newg,true)
        // 如果存在因過度空閒而被 block 的 p 和 m,則需要對其進行喚醒
        if mainStarted {
            wakep()
        }
    })
    // 切換回到原本的普通 g 繼續執行
    // ...
}

其中,將 g 添加到就緒隊列的方法爲 runqput,展示如下:

// 嘗試將 g 添加到指定 p 的 lrq 中. 若 lrq 滿了,則將 g 添加到 grqrq 中
func runqput(_p_ *p, gp *g, next bool){
    // ...
    // 當 next 爲 true 時,會優先將 gp 以 cas 操作放置到 p 的 runnext 位置
    // 如果原因 runnext 位置還有 g,則再嘗試將它追加到 lrq 的尾部
    if next{
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))){
            goto retryNext
        }
        // 如果 runnext 位置原本不存在 g 直接返回
        if oldnext ==0{
            return
        }
        // gp 指向 runnext 中被置換出來的 g 
        gp = oldnext.ptr()
    }

retry:
    // 獲取 lrq 頭節點的索引
    h := atomic.LoadAcq(&_p_.runqhead)// load-acquire, synchronize with consumers
    // 獲取 lrq 尾節點的索引
    t := _p_.runqtail
    // 如果 lrq 沒有滿,則將 g 追加到尾節點的位置,並且遞增尾節點的索引
    if t-h <uint32(len(_p_.runq)){
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1)// store-release, makes the item available for consumption
        return
    }
    // runqputslow 方法中會將 g 以及 lrq 中半數的 g 放置到全局隊列 grq 中
    if runqputslow(_p_, gp, h, t){
        return
    }
    // ...
}

3.2 g0 與 g

在每個 m 中會有一個與之伴生的 g0,其任務就是不斷尋找可執行的 g. 所以對一個 m 來說,其運行週期就是處在 g0 與 g 之間輪換交替的過程中.

type m struct {
    // 用於尋找並調度普通 g 的特殊 g,與每個 m 一一對應
    g0      *g     
    // ...
    // m 上正在運行的普通 g
    curg          *g       
    // ...
}

在 m 運行中,能夠通過幾個樁方法實現 g0 與 g 之間執行權的切換:

對應方法聲明於 runtime/stubs.go 文件中:

// 從 g 切換至 g0 執行. 只允許在 g 中調用
func mcall(fn func(*g))

// 在普通 g 中調用時,會切換至 g0 壓棧執行 fn,執行完成後切回到 g
func systemstack(fn func())

// 從 g0 切換至 g 執行. gobuf 包含 g 運行上下文信息
func gogo(buf *gobuf)

而從 g0 視角出發來看,其在先後經歷了兩個核心方法後,完成了 g0 -> g 的切換:

上述方法均實現於 runtime/proc.go 文件中:

// 執行方爲 g0
func schedule(){
    // 獲取當前 g0 
    _g_ := getg()
    // ...

top:
    // 獲取當前 p
    pp := _g_.m.p.ptr()
    // ...
    /*
         核心方法:獲取需要調度的 g
              - 按照優先級,依次取本地隊列 lrq、取全局隊列 grq、執行 netpoll、竊取其他 p lrq
              - 若沒有合適 g,則將 p 和 m block 住並添加到空閒隊列中
    */
    gp, inheritTime, tryWakeP := findRunnable()// blocks until work is available

    // ...
    // 執行 g,該方法中會將執行權由 g0 -> g
    execute(gp, inheritTime)
}

// 執行給定的 g. 當前執行方還是 g0,但會通過 gogo 方法切換至 gp
func execute(gp *g, inheritTime bool){
    // 獲取 g0
    _g_ := getg()
    // ...
        /*
            建立 m 和 gp 的關係
            1)將 m 中的 curg 字段指向 gp
            2)將 gp 的 m 字段指向當前 m
        */
    _g_.m.curg = gp
    gp.m = _g_.m

    // 更新 gp 狀態 runnable -> running
    casgstatus(gp,_Grunnable,_Grunning)
    // ...
    // 設置 gp 的棧空間保護區邊界
    gp.stackguard0 = gp.stack.lo +_StackGuard
    // ...
    // 執行 gogo 方法,m 執行權會切換至 gp
    gogo(&gp.sched)
}

3.3 find g

在調度流程中,最核心的步驟就在於,findRunnable 方法中如何按照指定的策略獲取到可執行的 g.

1)主流程

findRunnable 方法聲明於 runtime/proc.go 中,其核心步驟包括::

// 獲取可用於執行的 g. 如果該方法返回了,則一定已經找到了目標 g. 
func findRunnable()(gp *g, inheritTime, tryWakeP bool){
    // 獲取當前執行 p 下的 g0
    _g_ := getg()
    // ...

top:
    // 獲取 p
    _p_ := _g_.m.p.ptr()
    // ...
    // 每 61 次調度,需要嘗試處理一次全局隊列 (防止飢餓)
    if _p_.schedtick%61==0&& sched.runqsize >0{
        lock(&sched.lock)
        gp = globrunqget(_p_,1)
        unlock(&sched.lock)
        if gp !=nil{
            return gp,false,false
        }
    }

    // ...
    // 嘗試從本地隊列 lrq 中獲取 g
    if gp, inheritTime := runqget(_p_); gp !=nil{
        return gp, inheritTime,false
    }

    // 嘗試從全局隊列 grq 中獲取 g
    if sched.runqsize !=0{
        lock(&sched.lock)
        gp := globrunqget(_p_,0)
        unlock(&sched.lock)
        if gp !=nil{
            return gp,false,false
        }
    }

    // 執行 netpoll 流程,嘗試批量喚醒 io 就緒的 g 並獲取首個用以調度
    if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{
        if list := netpoll(0);!list.empty(){// non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp,_Gwaiting,_Grunnable)
            // ...
            return gp,false,false
        }
    }

    // ...
    // 從其他 p 的 lrq 中竊取 g
    gp, inheritTime, tnow, w, newWork := stealWork(now)
    if gp !=nil{
        return gp, inheritTime,false
    }

    // 若存在 gc 併發標記任務,則以 idle 模式參與協作,好過直接回收 p
    // ...

    // 加全局鎖,並 double check 全局隊列是否有 g
    lock(&sched.lock)
    // ...
    if sched.runqsize !=0{
        gp := globrunqget(_p_,0)
        unlock(&sched.lock)
        return gp,false,false
    }
    // ... 
    // 確認當前 p 無事可做,則將 p 和 m 解綁,並將其添加到全局調度模塊 schedt 中的空閒 p 隊列 pidle 中 
    // 解除 m 和 p 的關係
    releasep()
    // 將 p 添加到 schedt.pidle 中
    now = pidleput(_p_, now)
    unlock(&sched.lock)

    // ...
    // 在 block 當前 m 之前,保證全局存在一個 m 留守下來,以阻塞模式執行 netpoll,保證有 io 就緒事件發生時,能被第一時間處理
    if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{
        atomic.Store64(&sched.pollUntil,uint64(pollUntil))
        // ...
        // 以阻塞模式執行 netpoll 流程
        delay :=int64(-1)
        // ...
        list := netpoll(delay)// block until new work is available

        // 恢復 lastpoll 標識
        atomic.Store64(&sched.lastpoll,uint64(now))
        // ...
        lock(&sched.lock)

        // 從 schedt 的空閒 p 隊列 pidle 中獲取一個空閒 p
        _p_, _ = pidleget(now)
        unlock(&sched.lock)
        // 若沒有獲取到 p,則將就緒的 g 都添加到全局隊列 grq 中
        if _p_ ==nil{
            injectglist(&list)
        }else{
            // m 與 p 結合
            acquirep(_p_)
            // 將首個 g 直接用於調度,其餘的添加到全局隊列 grq
            if!list.empty(){
                gp := list.pop()
                injectglist(&list)
                casgstatus(gp,_Gwaiting,_Grunnable)
                // ...
                return gp,false,false
            }
            // ...
            goto top
        }
    }
    // ...
    // 走到此處仍然未找到合適的 g 用於調度,則需要將 m block 住,添加到 schedt 的 midle 中
    stopm()
    goto top
}

2)從 lrq 獲取 g

runqget 方法用於從某個 p 的 lrq 中獲取 g:

// [無鎖化]從某個 p 的本地隊列 lrq 中獲取 g
func runqget(_p_ *p)(gp *g, inheritTime bool){
    // 首先嚐試獲取特定席位 runnext 中的 g,使用 cas 操作
    next:= _p_.runnext
    if next!=0&& _p_.runnext.cas(next,0){
        return next.ptr(),true
    }

    // 嘗試基於 cas 操作,獲取本地隊列頭節點中的 g
    for{
        // 獲取頭節點索引
        h := atomic.LoadAcq(&_p_.runqhead)// load-acquire, synchronize with other consumers
        // 獲取尾節點索引
        t := _p_.runqtail
        // 頭尾節點重合,說明 lrq 爲空
        if t == h {
            return nil,false
        }
        // 根據索引從 lrq 中取出頭節點對應的 g
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        // 通過 cas 操作更新頭節點索引
        if atomic.CasRel(&_p_.runqhead, h, h+1){// cas-release, commits consume
            return gp,false
        }
    }
}

3)從全局隊列獲取 g

globrunqget 方法用於從全局的 grq 中獲取 g. 調用時需要確保持有 schedt 的全局鎖:

// 從全局隊列 grq 中獲取 g. 調用此方法前必須持有 schedt 中的互斥鎖 lock
func globrunqget(_p_ *p, max int32)*g {
    // 斷言確保持有鎖
    assertLockHeld(&sched.lock)
    // 隊列爲空,直接返回
    if sched.runqsize ==0{
        return nil
    }

    // ...
    // 此外還有一些邏輯是根據傳入的 max 值嘗試獲取 grq 中的半數 g 填充到 p 的 lrq 中. 此處不展開
    // ...

    // 從全局隊列的隊首彈出一個 g
    gp := sched.runq.pop()
    // ...
    return gp
}

4)獲取 io 就緒的 g

在 gmp 調度流程中,如果 lrq 和 grq 都爲空,則會執行 netpoll 流程,嘗試以非阻塞模式下的 epoll_wait 操作獲取 io 就緒的 g. 該方法位於 runtime/netpoll_epoll.go:

func netpoll(delay int64) gList {
    // ...
    // 調用 epoll_wait 獲取就緒的 io event
    var events [128]epollevent
    n := epollwait(epfd,&events[0],int32(len(events)), waitms)
    // ...
    var toRun gList
    for i :=int32(0); i < n; i++{
        ev :=&events[i]
        // 將就緒 event 對應 g 追加到的 glist 中
        netpollready(...)
    }
    return toRun
}

5)從其他 p 竊取 g

如果執行完 netpoll 流程後仍未獲得 g,則會嘗試從其他 p 的 lrq 中竊取半數 g 補充到當前 p 的 lrq 中:

func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool){
    // 獲取當前 p
    pp := getg().m.p.ptr()
    // ...
    // 外層循環 4 次
    const stealTries =4
    for i :=0; i < stealTries; i++{
        // ...
        // 通過隨機數以隨機起點隨機步長選取目標 p 進行竊取
        for enum:= stealOrder.start(fastrand());!enum.done();enum.next(){
            // ...
            // 獲取擬竊取的目標 p
            p2 := allp[enum.position()]
            // 如果目標 p 是當前 p,則跳過
            if pp == p2 {
                continue
            }

            // ...
            // 只要目標 p 不爲 idle 狀態,則進行竊取
            if!idlepMask.read(enum.position()){
                // 竊取目標 p,其中會嘗試將目標 p lrq 中半數 g 竊取到當前 p 的 lrq 中
                if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp !=nil{
                    return gp,false, now, pollUntil, ranTimer
                }
            }
        }
    }

    // 竊取失敗 未找到合適的目標
    return nil,false, now, pollUntil, ranTimer
}

6)回收空閒的 p 和 m

如果直到最後都沒有找到合適的 g 用於執行,則需要將 p 和 m 添加到 schedt 的 pidle 和 midle 隊列中並停止 m 的運行,避免產生資源浪費:

// 將 p 追加到 schedt pidle 隊列中
func pidleput(_p_ *p, now int64)int64{
    assertLockHeld(&sched.lock)
    // ...
    // p 指針指向原本 pidle 隊首
    _p_.link = sched.pidle
    // 將 p 設置爲 pidle 隊首
    sched.pidle.set(_p_)
    atomic.Xadd(&sched.npidle,1)
    // ...
}

// 將當前 m 添加到 schedt midle 隊列並停止 m
func stopm(){
    _g_ := getg()

    // ...
    lock(&sched.lock)
    // 將 m 添加到 schedt.mdile 
    mput(_g_.m)
    unlock(&sched.lock)
    // 停止 m
    mPark()
    // ...
}

4 讓渡設計

所謂 “讓渡”,指的是當 g 在 m 上運行時,主動讓出執行權,使得 m 的運行對象重新回到 g0,即由 g -> g0 的流轉過程.

“讓渡” 和” 調度 “一樣,也屬於第一視角下的轉換,該流轉過程是由 m 上運行的 g 主動發起的,而無需第三方角色的干預.

4.1 結束讓渡

當 g 執行結束時,會正常退出,並將執行權切換回到 g0.

首先,g 在運行結束時會調用 goexit1 方法中,並通過 mcall 指令切換至 g0,由 g0 調用 goexit0 方法,並由 g0 執行下述步驟:

// goroutine 運行結束. 此時執行方是普通 g
func goexit1(){
    // 通過 mcall,將執行方轉爲 g0,調用 goexit0 方法
    mcall(goexit0)
}

// 此時執行方爲 g0,入參 gp 爲已經運行結束的 g
func goexit0(gp *g){
    // 獲取 g0
    _g_ := getg()
    // 獲取對應的 p
    _p_ := _g_.m.p.ptr()

    // 將 gp 的狀態由 running 更新爲 dead
    casgstatus(gp,_Grunning,_Gdead)
    // ...

    // 將 gp 中的內容清空
    gp.m =nil
    // ...
    gp._defer =nil// should be true already but just in case.
    gp._panic =nil// non-nil for Goexit during panic. points at stack-allocated data.
    // ...
    // 將 g 和 p 解除關係
    dropg()
    // ...
    // 將 g 添加到 p 的 gfree 隊列中
    gfput(_p_, gp)
    // ...
    // 發起新一輪調度流程
    schedule()
}

4.2 主動讓渡

主動讓渡指的是由用戶手動調用 runtime.Gosched 方法讓出 g 所持有的執行權. 在 Gosched 方法中,會通過 mcall 指令切換至 g0,並由 g0 執行 gosched_m 方法,其中包含如下步驟:

// 主動讓渡出執行權,此時執行方還是普通 g
func Gosched() {
    // ...
    // 通過 mcall,將執行方轉爲 g0,調用 gosched_m 方法
    mcall(gosched_m)
}
// 將 gp 切換回就緒態後添加到全局隊列 grq,併發起新一輪調度
// 此時執行方爲 g0
func gosched_m(gp *g){
    // ...
    goschedImpl(gp)
}

func goschedImpl(gp *g){
    // ...
    // 將 g 狀態由 running 改爲 runnable 就緒態
    casgstatus(gp,_Grunning,_Grunnable)
    // 解除 g 和 m 的關係
    dropg()
    // 將 g 添加到全局隊列 grq
    lock(&sched.lock)
    globrunqput(gp)
    unlock(&sched.lock)
    // 發起新一輪調度
    schedule()
}

4.3 阻塞讓渡

阻塞讓渡指的是 g 在執行過程中所依賴的外部條件沒有達成,需要進入阻塞等待的狀態(waiting),直到條件達成後才能完成將狀態重新更新爲就緒態(runnable).

Golang 針對 mutex、channel 等併發工具的設計,在底層都是採用了阻塞讓渡的設計模式,具體執行的方法是位於 runtime/proc.go 的 gopark 方法:

此處需要注意,在阻塞讓渡後,g 不會進入到 lrq 或 grq 中,因爲 lrq/grq 屬於就緒隊列. 在執行 gopark 時,使用方有義務自行維護 g 的引用,並在外部條件就緒時,通過 goready 操作將其更新爲 runnable 狀態並重新添加到就緒隊列中.

// 此時執行方爲普通 g
func gopark(unlockf func(*g, unsafe.Pointer)bool,lockunsafe.Pointer, reason waitReason, traceEv byte, traceskip int){
    // 獲取 m 正在執行的 g,也就是要阻塞讓渡的 g
    gp := mp.curg
    // ...
    // 通過 mcall,將執行方由普通 g -> g0
    mcall(park_m)
}

// 此時執行方爲 g0. 入參 gp 爲需要執行 park 的普通 g
func park_m(gp *g){
    // 獲取 g0 
    _g_ := getg()

    // 將 gp 狀態由 running 變更爲 waiting
    casgstatus(gp,_Grunning,_Gwaiting)
    // 解綁 g 與 m 的關係
    dropg()

    // g0 發起新一輪調度流程
    schedule()
}

與 gopark 相對的,是用於喚醒 g 的 goready 方法,其中會通過 systemstack 壓棧切換至 g0 執行 ready 方法——將目標 g 狀態由 waiting 改爲 runnable,然後添加到就緒隊列中.

// 此時執行方爲普通 g. 入參 gp 爲需要喚醒的另一個普通 g
func goready(gp *g, traceskip int) {
    // 調用 systemstack 後,會切換至 g0 亞展調用傳入的 ready 方法. 調用結束後則會直接切換回到當前普通 g 繼續執行. 
    systemstack(func() {
        ready(gp, traceskip, true)
    })

    // 恢復成普通 g 繼續執行 ...
}
// 此時執行方爲 g0. 入參 gp 爲擬喚醒的普通 g
func ready(gp *g, traceskip int, next bool){
    // ...

    // 獲取當前 g0
    _g_ := getg()
    // ...
    // 將目標 g 狀態由 waiting 更新爲 runnable
    casgstatus(gp,_Gwaiting,_Grunnable)
    /*
        1) 優先將目標 g 添加到當前 p 的本地隊列 lrq
        2)若 lrq 滿了,則將 g 追加到全局隊列 grq
    */
    runqput(_g_.m.p.ptr(), gp,next)
    // 如果有 m 或 p 處於 idle 狀態,將其喚醒
    wakep()
    // ...
}

5 搶佔設計

最後是關於 “搶佔” 的流程介紹,搶佔和讓渡有相同之處,都表示由 g->g0 的流轉過程,但區別在於,讓渡是由 g 主動發起的(第一人稱),而搶佔則是由外力干預(sysmon thread)發起的(第三人稱).

5.1 監控線程

在 go 程序運行時,會啓動一個全局唯一的監控線程——sysmon thread,其負責定時執行監控工作,主要包括:

// The main goroutine.
func main(){
    systemstack(func(){
        newm(sysmon,nil,-1)
    })
    // ...
}

func sysmon(){
    // ..

    for{
        // 根據閒忙情況調整輪詢間隔,在空閒情況下 10 ms 輪詢一次
        usleep(delay)

        // ...
        // 執行 netpoll 
        lastpoll :=int64(atomic.Load64(&sched.lastpoll))
        if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {
            // ...
            list := netpoll(0)// non-blocking - returns list of goroutines
            // ...
        }

        // 執行搶佔工作
        retake(now)

        // ...

        // 定時檢查是否需要發起 gc
        if t :=(gcTrigger{kind: gcTriggerTime, now: now}); t.test()&& atomic.Load(&forcegc.idle)!=0{
            // ...
        }
        // ...
    }
}

執行搶佔邏輯的 retake 方法本章研究的重點,其中根據搶佔目標和狀態的不同,又可以分爲系統調用搶佔和運行超時搶佔.

5.2 系統調用

系統調用是 m(thread)粒度的,在執行期間會導致整個 m 暫時不可用,所以此時的搶佔處理思路是,將發起 syscall 的 g 和 m 綁定,但是解除 p 與 m 的綁定關係,使得此期間 p 存在和其他 m 結合的機會.

在發起系統調用時,會執行位於 runtime/proc.go 的 reentersyscall 方法,此方法核心步驟包括:

func reentersyscall(pc, sp uintptr) {
    // 獲取 g
    _g_ := getg()

    // ...
    // 保存寄存器信息
    save(pc, sp)
    // ...
    // 將 g 狀態更新爲 syscall
    casgstatus(_g_,_Grunning,_Gsyscall)
    // ...
    // 解除 p 與 m 綁定關係
    pp := _g_.m.p.ptr()
    pp.m =0
    // 將 p 設置爲 m 的 oldp
    _g_.m.oldp.set(pp)
    _g_.m.p =0
    // 將 p 狀態更新爲 syscall
    atomic.Store(&pp.status,_Psyscall)
    // ...
}

當系統系統調用完成時,會執行位於 runtime/proc.go 的 exitsyscall 方法(此時執行方還是 m 上的 g),包含如下步驟:

func exitsyscall() {
    // 獲取 g
    _g_ := getg()

    // ...

    // 如果 oldp 沒有和其他 m 結合,則直接複用 oldp
    oldp := _g_.m.oldp.ptr()
    _g_.m.oldp =0
    if exitsyscallfast(oldp){
        // ...
        // 將 g 狀態由 syscall 更新回 running
        casgstatus(_g_,_Gsyscall,_Grunning)
        // ...

        return
    }

    // 切換至 g0 調用 exitsyscall0 方法
    mcall(exitsyscall0)
    // ...
}
// 此時執行方爲 m 下的 g0
func exitsyscall0(gp *g){
    // 將 g 的狀態修改爲 runnable 就緒態
    casgstatus(gp,_Gsyscall,_Grunnable)
    // 解除 g 和 m 的綁定關係
    dropg()
    lock(&sched.lock)
    // 嘗試尋找一個空閒的 p 與當前 m 結合
    var _p_ *p
    _p_, _ = pidleget(0)
    var locked bool
    // 如果與 p 結合失敗,則將 g 添加到全局隊列中
    if _p_ ==nil{
        globrunqput(gp)
        // ...
    }
    // ...
    unlock(&sched.lock)
    // 如果與 p 結合成功,則繼續調度 g 
    if _p_ !=nil{
        acquirep(_p_)
        execute(gp,false)// Never returns.
    }
    // ...
    // 與 p 結合失敗的話,需要將當前 m 添加到 schedt 的 midle 隊列並停止 m
    stopm()
    // 如果 m 被重新啓用,則發起新一輪調度
    schedule()// Never returns.
}

我們將視角切回到 sysmon thread 中的 retake 方法,此處會遍歷每個 p,並針對正在發起系統調用的 p 執行如下檢查邏輯:

但凡上述條件滿足其一,就會執行對 p 執行搶佔操作(handoffp)——分配一個新的 m 與 p 結合,完成後續任務的調度處理.

func retake(now int64) uint32{
    n :=0
    // 加鎖
    lock(&allpLock)
    // 遍歷所有 p
    for i :=0; i <len(allp); i++{
        _p_ := allp[i]
        // ...
        s := _p_.status
        // ...
        // 對於正在執行 syscall 的 p
        if s ==_Psyscall{
            // 如果 p 本地隊列爲空且發起系統調用時間 < 10ms,則不進行搶佔
            if runqempty(_p_)&& atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle)>0&& pd.syscallwhen+10*1000*1000> now {
                continue
            }
            unlock(&allpLock)
            // 將 p 的狀態由 syscall 更新爲 idle
            if atomic.Cas(&_p_.status, s,_Pidle){
                // ...
                // 讓 p 擁有和其他 m 結合的機會
                handoffp(_p_)
            }
            // ...
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}
func handoffp(_p_ *p) {
    // 如果 p lrq 中還有 g 或者全局隊列 grq 中還有 g,則立即分配一個新 m 與該 p 結合
    if!runqempty(_p_)|| sched.runqsize !=0{
        // 分配一個 m 與 p 結合
        startm(_p_,false)
        return
    }
    // ...
    // 若系統空閒沒有 g 需要調度,則將 p 添加到 schedt 中的空閒 p 隊列 pidle 中
    pidleput(_p_,0)
    // ...
}

5.3 運行超時

除了系統調用搶佔之外,當 sysmon thread 發現某個 g 執行時間過長時,也會對其發起搶佔操作.

1)發起搶佔

在 retake 方法中,會檢測到哪些 p 中運行一個 g 的時長超過了 10 ms,然後對其發起搶佔操作(preemtone):

func retake(now int64) uint32{
    // ...
    for i :=0; i <len(allp); i++{
        _p_ := allp[i]
        // ...
        if s ==_Prunning{
            // ... 
            // 如果某個 p 下存在運行超過 10 ms 的 g ,需要對 g 進行搶佔
            if pd.schedwhen+forcePreemptNS <= now{
                   preemptone(_p_)
            }
        }
    // ...
    }
    // ...
}

在 preemtone 方法中:

// 搶佔指定 p 上正在執行的 g
func preemptone(_p_ *p)bool{
    // 獲取 p 對應 m
    mp := _p_.m.ptr()
    // 獲取 p 上正在執行的 g(搶佔目標)
    gp := mp.curg
    // ...
        /*
            啓動協作式搶佔標識
            1) 將搶佔標識 preempt 置爲 true
            2)將 g 中的 stackguard0 標識置爲 stackPreempt
            3)g 查看到搶佔標識後,會配合主動讓渡 p 調度權
        */
    gp.preempt =true
    gp.stackguard0 = stackPreempt

    // 基於信號機制實現非協作式搶佔
    if preemptMSupported && debug.asyncpreemptoff ==0{
        _p_.preempt =true
        preemptM(mp)
    }
    // ...
}

在 preemptM 方法中,會通過 tkill 指令向進程中的指定 thread 發送搶佔信號 sigPreempt,對應代碼位於 runtime/signal_unix.go:

func preemptM(mp *m) {
    // ...
    if atomic.Cas(&mp.signalPending,0,1){
        if GOOS =="darwin"|| GOOS =="ios"{
            atomic.Xadd(&pendingPreemptSignals,1)
        }

        // 向指定的 m 發送搶佔信號
        // const sigPreempt untyped int = 16
        signalM(mp, sigPreempt)
    }
    // ...
}

func signalM(mp *m, sig int){
    pthread_kill(pthread(mp.procid),uint32(sig))
}

2)協作式搶佔

對於運行中的 g,在棧空間不足時,會切換至 g0 調用 newstack 方法執行棧空間擴張操作,在該流程中預留了一個檢查樁點,當其中發現 g 已經被打上搶佔標記時,就會主動配合執行讓渡操作:

// 棧擴張. 執行方爲 g0
func newstack(){
    // ...
    // 獲取當前 m 正在執行的 g
    gp := thisg.m.curg
    // ...
    // 讀取 g 中的 stackguard0 標識位
    stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
    // 若 stackguard0 標識位被置爲 stackPreempt,則代表需要對 g 進行搶佔
    preempt := stackguard0 == stackPreempt
    // ...
    if preempt{
        // 若當前 g 不具備搶佔條件,則繼續調度,不進行搶佔
        // 當持有鎖、在進行內存分配或者顯式禁用搶佔模式時,則不允許對 g 執行搶佔操作
        if!canPreemptM(thisg.m){
            // ...
            gogo(&gp.sched)// never return
        }
    }


    if preempt {
        // ...

        // 響應搶佔意圖,完成讓渡操作
        gopreempt_m(gp)// never return
    }

    // ...
}

// gopreempt_m 方法會走到 goschedImpl 方法中,後續流程與 4.2 小節中介紹的主動讓渡類似
func gopreempt_m(gp *g){
    // ...
    goschedImpl(gp)
}

func goschedImpl(gp *g){
    // ...
    casgstatus(gp,_Grunning,_Grunnable)
    dropg()
    lock(&sched.lock)
    globrunqput(gp)
    unlock(&sched.lock)

    schedule()
}

這種通過預留檢查點,由 g 主動配合搶佔意圖完成讓渡操作的流程被稱作協作式搶佔,其存在的侷限就在於,當 g 未發生棧擴張行爲時,則沒有觸碰到檢查點的機會,也就無法響應搶佔意圖.

3)非協作式搶佔

爲了彌補協作式搶佔的不足,go 1.14 中引入了基於信號量實現的非協作式搶佔機制.

在 go 程序啓動時,main thread 會完成對各類信號量的監聽註冊,其中也包含了搶佔信號 sigPreempt(index = 16). 對應代碼位於 runtime/signal_unix.go:

func initsig(preinit bool) {
    // ...

    for i :=uint32(0); i < _NSIG; i++{
                /*
                    var sigtable = [...]sigTabT{
                    // ...
                    // 16 {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
                    // ...
                    }
                */
        t :=&sigtable[i]
        // ...
        // const _NSIG untyped int = 32
        handlingSig[i]=1
        setsig(i, abi.FuncPCABIInternal(sighandler))
    }
}

當某個 m 接收到搶佔信號後,會由 gsignal 通過 sighandler 方法完成信號處理工作,此時針對搶佔信號會進一步調用 doSigPreempt 方法:在判斷 g 具備可搶佔條件後,則會保存 g 的寄存器信息,然後修改 g 的棧程序計數器 pc 和棧頂指針 sp,往其中插入一段函數 asyncPreempt:

// 此時執行該方法的是搶佔目標 g 所在 m 下的 gsignal
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g){
    // 獲取擬搶佔 g 從屬 p 對應的 g0
    _g_ := getg()
    c :=&sigctxt{info, ctxt}
    // 獲取擬搶佔 g 從屬的 m
    mp := _g_.m

    // ...
    // 倘若接收到搶佔信號 
    if sig == sigPreempt && debug.asyncpreemptoff ==0&&!delayedSignal {
    // 對目標 g 進行搶佔
        doSigPreempt(gp, c)
    }
    // ...
}

// 此時執行該方法的是搶佔目標 g 所在 m 下的 gsignal
func doSigPreempt(gp *g, ctxt *sigctxt){
    // 判斷 g 是否需要被搶佔
    if wantAsyncPreempt(gp){
        // 判斷 g 是否滿足搶佔條件
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // 通過修改 g 寄存器的方式,往 g 的執行指令中插入 asyncPreempt 函數
            ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
        }
    }

    // ...
}

sigctxt.pushCall 方法中,通過移動棧頂指針(sp,stack pointer)、修改程序計數器(pc,program counter)的方式,強行在 g 的執行指令中插入了一段新的指令——asyncPreempt 函數.

// 此時執行該方法的是搶佔目標 g 所在 m 下的 gsignal
func (c *sigctxt) pushCall(targetPC, resumePC uintptr){
    // 獲取棧頂指針 sp
    sp :=uintptr(c.rsp())
    // sp 偏移一個指針地址
    sp -= goarch.PtrSize
    // 將原本下一條執行指令 pc 存放在棧頂位置
    *(*uintptr)(unsafe.Pointer(sp))= resumePC
    // 更新棧頂指針 sp
    c.set_rsp(uint64(sp))
    // 將傳入的指令(asyncPreemt)作爲下一跳執行執行 pc
    c.set_rip(uint64(targetPC))
}

由於 pc 被修改了,所以搶佔的目標 g 隨後會執行到 asyncPreemt2 方法,其中會通過 mcall 指令切換至 g0,並由 g0 執行 gopreempt_m 玩法,完成 g 的讓渡操作:

// 此時執行方是即將要被搶佔的 g,這段代碼是被臨時插入的邏輯
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    mcall(gopreempt_m)
    gp.asyncSafePoint = false
}

6 總結

祝賀,至此全文結束.

本篇我們一起了解了 golang 中的 gmp 整體架構與應用生態,並深入到源碼中逐幀解析了 gmp 中的核心結構與執行流程設計. 希望上述內容能對各位 go 友們有所幫助~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BR6SO7bQF4UXQoRdEjorAg