溫故知新——Golang GMP 萬字洗髓經
0 前言
我在 23 年初曾發佈過一篇——golang gmp 原理解析,當時剛開始接觸 go 底層源碼,視野廣度和理解深度都有所不足,對一些核心環節的思考和挖掘有所欠缺,對其中某些局部細節又過分拘泥,整體內容質量上還是有所不足.
隨着近期嘗試接觸了 golang 以外的語言,通過橫向對比後,對於 golang 中 gmp 的精妙設計也產生了一些新的感悟. 於是就藉着這個契機開啓一個重置篇,對老版內容進行查缺補漏,力求能夠溫故而知新.
本文會分爲兩大部分,共 5 章內容:
第一部分偏於宏觀,對基礎知識和整體架構加以介紹:
-
• 第一章——基礎概念:簡述線程、協程及 goroutine 的基礎概念,並進一步引出 gmp 架構
-
• 第二章——gmp 詳設:步入到源碼中,對 gmp 底層數據結構設計進行一探究竟
第二部分則着眼於 goroutine 的生命週期變化過程:
-
• 第三章——調度原理:以第一人稱的正向視角,觀察一個 g 是如何誕生以及被調度執行的
-
• 第四章——讓渡設計:以第一人稱的逆向視角,觀察一個 g 如何從運行狀態讓渡出執行權
-
• 第五章——搶佔設計:以第三人稱視角,觀察監控線程如何通過外力干預對 g 實施搶佔處理
提前做個聲明,本文涉及大量對 golang runtime 標準庫源碼的閱讀環節,其中展示的源碼版本統一爲 v1.19 版本.
另外,在學習過程中也和作爲同事兼戰友的龍俊一起針對 gmp 技術話題有過很多次交流探討,這裏要特別緻敬一下龍哥.
1 基礎概念
1.1 從線程到協程
線程(Thread)與協程(Coroutine)是併發編程中的經典概念:
-
• 線程是操作系統內核視角下的最小調度單元,其創建、銷燬、切換、調度都需要由內核參與;
-
• 協程又稱爲用戶態線程,是用戶程序對對線程概念的二次封裝,和線程爲多對一關係,在邏輯意義上屬於更細粒度的調度單元,其調度過程由用戶態閉環完成,無需內核介入
總結來說,線程更加簡單直觀,天然契合操作系統調度模型;協程是用戶態下二次加工的產物,需要引入額外的複雜度,但是相對於線程而言有着更輕的粒度和更小的開銷.
1.2 從協程到 goroutine
golang 是一門天然支持協程的語言,goroutine 是其對協程的本土化實現,並且在原生協程的基礎上做了很大的優化改進.
當我們聊到 goroutine ,需要明白這不是一個能被單獨拆解的概念,其本身是強依附於 gmp(goroutine-machine-processor)體系而生的,通過 gmp 架構的建設,使得 goroutine 相比於原生協程具備着如下核心優勢:
-
• g 與 p、m 之間可以動態結合,整個調度過程有着很高的靈活性
-
• g 棧空間大小可以動態擴縮,既能做到使用方便,也儘可能地節約了資源
此外,golang 中完全屏蔽了線程的概念,圍繞着 gmp 打造的一系列併發工具都以 g 爲併發粒度,可以說是完全統一了 golang 併發世界的秩序,做到了類似 “書同文、車同軌” 的效果
1.3 gmp 架構
gmp = goroutine + machine + processor. 下面我們對這三個核心組件展開介紹:
1)g
-
• g,即 goroutine,是 golang 中對協程的抽象;
-
• g 有自己的運行棧、生命週期狀態、以及執行的任務函數(用戶通過 go func 指定);
-
• g 需要綁定在 m 上執行,在 g 視角中,可以將 m 理解爲它的 cpu
我們可以把 gmp 理解爲一個任務調度系統,那麼 g 就是這個系統中所謂的 “任務”,是一種需要被分配和執行的 “資源”.
2)m
-
• m 即 machine,是 golang 中對線程的抽象;
-
• m 需要和 p 進行結合,從而進入到 gmp 調度體系之中
-
• m 的運行目標始終在 g0 和 g 之間進行切換——當運行 g0 時執行的是 m 的調度流程,負責尋找合適的 “任務”,也就是 g;當運行 g 時,執行的是 m 獲取到的” 任務“,也就是用戶通過 go func 啓動的 goroutine
當我們把 gmp 理解爲一個任務調度系統,那麼 m 就是這個系統中的”引擎 “. 當 m 和 p 結合後,就限定了” 引擎 “的運行是圍繞着 gmp 這條軌道進行的,使得” 引擎“運行着兩個週而復始、不斷交替的步驟——尋找任務(執行 g0);執行任務(執行 g)
3) p
-
• p 即 processor,是 golang 中的調度器;
-
• p 可以理解爲 m 的執行代理,m 需要與 p 綁定後,纔會進入到 gmp 調度模式當中;因此 p 的數量決定了 g 最大並行數量(可由用戶通過 GOMAXPROCS 進行設定,在超過 CPU 核數時無意義)
-
• p 是 g 的存儲容器,其自帶一個本地 g 隊列(local run queue,簡稱 lrq),承載着一系列等待被調度的 g
當我們把 gmp 理解爲一個任務調度系統,那麼 p 就是這個系統中的”中樞 “,當其和作爲” 引擎 “ 的 m 結合後,纔會引導“引擎” 進入 gmp 的運行模式;同時 p 也是這個系統中存儲 “任務” 的“容器”,爲 “引擎” 提供了用於執行的任務資源.
結合上圖可以看到,承載 g 的容器分爲兩個部分:
- • p 的本地隊列 lrq(local run queue):這是每個 p 私有的 g 隊列,通常由 p 自行訪問,併發競爭情況較少,因此設計爲無鎖化結構,通過 CAS(compare-and-swap)操作訪問
當 m 與 p 結合後,不論是創建 g 還是獲取 g,都優先從私有的 lrq 中獲取,從而儘可能減少併發競爭行爲;這裏聊到併發情況較少,但並非完全沒有,是因爲還可能存在來自其他 p 的竊取行爲(stealwork)
- • 全局隊列 grq(global run queue):是全局調度模塊 schedt 中的全局共享 g 隊列,作爲當某個 lrq 不滿足條件時的備用容器,因爲不同的 m 都可能訪問 grq,因此併發競爭比較激烈,訪問前需要加全局鎖
介紹完了 g 的存儲容器設計後,接下來聊聊將 g 放入容器和取出容器的流程設計:
-
• put g:當某個 g 中通過 go func(){...} 操作創建子 g 時,會先嚐試將子 g 添加到當前所在 p 的 lrq 中(無鎖化);如果 lrq 滿了,則會將 g 追加到 grq 中(全局鎖). 此處採取的思路是 “就近原則”
-
• get g:gmp 調度流程中,m 和 p 結合後,運行的 g0 會不斷尋找合適的 g 用於執行,此時會採取 “負載均衡” 的思路,遵循如下實施步驟:
-
• 優先從當前 p 的 lrq 中獲取 g(無鎖化 - CAS)
-
• 從全局的 grq 中獲取 g(全局鎖)
-
• 取 io 就緒的 g(netpoll 機制)
-
• 從其他 p 的 lrq 中竊取 g(無鎖化 - CAS)
在 get g 流程中,還有一個細節需要注意,就是在 g0 每經過 61 次調度循環後,下一次會在處理 lrq 前優先處理一次 grq,避免因 lrq 過於忙碌而致使 grq 陷入饑荒狀態
1.4 gmp 生態
在 golang 中已經完全屏蔽了線程的概念,將 goroutine 統一爲整個語言層面的併發粒度,並遵循着 gmp 的秩序進行運作. 如果把 golang 程序比做一個人的話,那麼 gmp 就是這個人的骨架,支持着他的直立與行走;而在此基礎之上,緊密圍繞着 gmp 理念打造設計的一系列工具、模塊則像是在骨架之上填充的血肉,是依附於這套框架而存在的. 下面我們來看其中幾個經典的案例:
(1)內存管理
golang 的內存管理模塊主要繼承自 TCMalloc(Thread-Caching-Malloc)的設計思路,其中由契合 gmp 模型做了因地制宜的適配改造,爲每個 p 準備了一份私有的高速緩存——mcache,能夠無鎖化地完成一部分 p 本地的內存分配操作.
更多有關 golang 內存管理與垃圾回收的內容,可以閱讀我此前發表的系列專題:1)Golang 內存模型與分配機制;2)Golang 垃圾回收原理分析;3)Golang 垃圾回收源碼分析
(2)併發工具
在 golang 中的併發工具(例如鎖 mutex、通道 channel 等)均契合 gmp 作了適配改造,保證在執行阻塞操作時,會將阻塞粒度限制在 g(goroutine)而非 m(thread)的粒度,使得阻塞與喚醒操作都屬於用戶態行爲,無需內核的介入,同時一個 g 的阻塞也完全不會影響 m 下其他 g 的運行.
有關 mutex 和 channel 底層實現機制,可以閱讀我此前發表的文章:1)Golang 單機鎖實現原理;2)Golang channel 實現原理.
上面這項結論看似理所當然,但實際上是一項非常重要的特性,這一點隨着我近期在學習 c++ 過程中才產生了更深的感悟——我在近期嘗試着使用 c++ 效仿 gmp 實現一套協程調度體系,雖然還原出了其中大部分功能,但在使用上還是存在一個很大的缺陷,就是 c++ 標準庫中的併發工具(如 lock、semaphore 等)對應的阻塞粒度都是 thread 級別的,這就導致一個協程(coroutine)的阻塞會上升到線程(thread)級別,並導致其他 coroutine 也喪失被執行的機會.
這一點如果要解決,就需要針對所有併發工具做一層適配於協程粒度的改造,實現成本無疑是巨大的. 這也從側面印證了 golang 的併發優越性,這種適配性在語言層面就已經天然支持了.
(3)io 多路複用
在設計 io 模型時,golang 採用了 linux 系統提供的 epoll 多路複用技術,然而爲了因爲 epoll_wait 操作而引起 m(thread)粒度的阻塞,golang 專門設計一套 netpoll 機制,使用用戶態的 gopark 指令實現阻塞操作,使用非阻塞 epoll_wait 結合用戶態的 goready 指令實現喚醒操作,從而將 io 行爲也控制在 g 粒度,很好地契合了 gmp 調度體系.
如果對這部分內容感興趣的話,可以閱讀我近期剛發表的文章——萬字解析 golang netpoll 底層原理
類似上述的例子在 golang 世界中是無法窮盡的. gmp 是 golang 知識體系的基石,如果想要深入學習理解 golang,那麼 gmp 無疑是一個絕佳的學習起點.
2 gmp 詳設
文字性的理論描述難免過於空洞,g、m、p 並不是抽象的概念,事實上三者在源碼中都有着具體的實現,定義代碼均位於 runtime/runtime2.go. 下面就從具體的源碼中尋求原理內容的支撐和佐證.
2.1 g 詳設
g (goroutine)的類型聲明如下,其中包含如下核心成員字段:
-
• stack:g 的棧空間
-
• stackguard0:棧空間保護區邊界. 同時也承擔了傳遞搶佔標識的作用(5.3 小節中會進行呼應)
-
• panic:g 運行函數中發生的 panic
-
• defer:g 運行函數中創建的 defer 操作(以 LIFO 次序組織)
-
• m:正在執行 g 的 m(若 g 不爲 running 狀態,則此字段爲空)
-
• atomicstatus:g 的生命週期狀態(具體流轉規則參見上圖)
// 一個 goroutine 的具象類
type g struct{
// g 的執行棧空間
stack stack
/*
棧空間保護區邊界,用於探測是否執行棧擴容
在 g 超時搶佔過程中,用於傳遞搶佔標識
*/
stackguard0 uintptr
// ...
// 記錄 g 執行過程中遇到的異常
_panic *_panic
// g 中掛載的 defer 函數,是一個 LIFO 的鏈表結構
_defer *_defer
// g 從屬的 m
m *m
// ...
/*
g 的狀態
// g 實例剛被分配還未完成初始化
_Gidle = iota // 0
// g 處於就緒態. 可以被調度
_Grunnable // 1
// g 正在被調度運行過程中
_Grunning // 2
// g 正在執行系統調用
_Gsyscall // 3
// g 處於阻塞態,需要等待其他外部條件達成後,才能重新恢復成就緒態
_Gwaiting // 4
// 生死本是一個輪迴. 當 g 調度結束生命終結,或者剛被初始化準備迎接新生前,都會處於此狀態
_Gdead // 6
*/
atomicstatus uint32
// ...
// 進入全局隊列 grq 時指向相鄰 g 的 next 指針
schedlink guintptr
// ...
}
2.2 m 詳設
m(machine)是 go 對 thread 的抽象,其類定義代碼中包含如下核心成員:
-
• g0:執行調度流程的特殊 g(不由用戶創建,是與 m 一對一伴生的特殊 g,爲 m 尋找合適的普通 g 用於執行)
-
• gsignal:執行信號處理的特殊 g(不由用戶創建,是與 m 一對一伴生的特殊 g,處理分配給 m 的 signal)
-
• curg:m 上正在執行的普通 g(由用戶通過 go func(){...} 操作創建)
-
• p:當前與 m 結合的 p
type m struct{
// 用於調度普通 g 的特殊 g,與每個 m 一一對應
g0 *g
// ...
// m 的唯一 id
procid uint64
// 用於處理信號的特殊 g,與每個 m 一一對應
gsignal *g
// ...
// m 上正在運行的 g
curg *g
// m 關聯的 p
p puintptr
// ...
// 進入 schedt midle 鏈表時指向相鄰 m 的 next 指針
schedlink muintptr
// ...
}
此處暫時將 gsignal 按下不表,我們可以將 m 的運行目標劃分爲 g0 和 g ,兩者是始終交替進行的:g0 就類似於引擎中的調度邏輯,檢索任務列表尋找需要執行的任務;g 就是由 g0 找到並分配給 m 執行的一個具體任務.
2.3 p 詳設
p (processor)是 gmp 中的調度器,其類定義代碼中包含如下核心成員字段:
-
• status:p 生命週期狀態
-
• m:當前與 p 結合的 m
-
• runq:p 私有的 g 隊列——local run queue,簡稱 lrq
-
• runqhead:lrq 中隊首節點的索引
-
• runqtail:lrq 中隊尾節點的索引
-
• runnext:lrq 中的特定席,指向下一個即將執行的 g
type p struct{
id int32
/*
p 的狀態
// p 因缺少 g 而進入空閒模式,此時會被添加到全局的 idle p 隊列中
_Pidle = iota // 0
// p 正在運行中,被 m 所持有,可能在運行普通 g,也可能在運行 g0
_Prunning // 1
// p 所關聯的 m 正在執行系統調用. 此時 p 可能被竊取並與其他 m 關聯
_Psyscall // 2
// p 已被終止
_Pdead // 4
*/
status uint32// one of pidle/prunning/...
// 進入 schedt pidle 鏈表時指向相鄰 p 的 next 指針
link puintptr
// ...
// p 所關聯的 m. 若 p 爲 idle 狀態,可能爲 nil
m muintptr // back-link to associated m (nil if idle)
// lrq 的隊首
runqhead uint32
// lrq 的隊尾
runqtail uint32
// q 的本地 g 隊列——lrq
runq [256]guintptr
// 下一個調度的 g. 可以理解爲 lrq 中的特等席
runnext guintptr
// ...
}
2.4 schedt 詳設
schedt 是全局共享的資源模塊,在訪問前需要加全局鎖:
-
• lock:全局維度的互斥鎖
-
• midle:空閒 m 隊列
-
• pidle:空閒 p 隊列
-
• runq:全局 g 隊列——global run queue,簡稱 grq
-
• runqsize:grq 中存在的 g 個數
// 全局調度模塊
type schedt struct{
// ...
// 互斥鎖
lock mutex
// 空閒 m 隊列
midle muintptr // idle m's waiting for work
// ...
// 空閒 p 隊列
pidle puintptr // idle p's
// ...
// 全局 g 隊列——grq
runq gQueue
// grq 中存量 g 的個數
runqsize int32
// ...
}
之所以存在 midle 和 pidle 的設計,就是爲了避免 p 和 m 因缺少 g 而導致 cpu 空轉. 對於空閒的 p 和 m,會被集成到空閒隊列中,並且會暫停 m 的運行
3 調度原理
本章要和大家聊的流程是 “調度”. 所謂調度,指的是一個由用戶通過 go func(){...} 操作創建的 g,是如何被 m 上的 g0 獲取並執行的,所以簡單來說,就是由 g0 -> g 的流轉過程.
我習慣於將 “調度” 稱爲第一視角下的轉換,因爲該流轉過程是由 m 上運行的 g0 主動發起的,而無需第三方角色的干預.
3.1 main 函數與 g
1)main 函數
main 函數作爲整個 go 程序的入口是比較特殊的存在,它是由 go 程序全局唯一的 m0(main thread)執行的,對應源碼位於 runtime.proc.go:
//go:linkname main_main main.main
func main_main()
// The main goroutine.
func main(){
// ...
// 獲取用戶聲明的 main 函數
fn := main_main
// 執行用戶聲明的 main 函數
fn()
// ...
}
2)g
除了 main 函數這個特例之外,所有用戶通過 go func(){...} 操作啓動的 goroutine,都會以 g 的形式進入到 gmp 架構當中.
func handle() {
// 異步啓動 goroutine
go func(){
// do something ...
}()
}
在上述代碼中,我們會創建出一個 g 實例的創建,將其置爲就緒狀態,並添加到就緒隊列中:
-
• 如果當前 p 對應本地隊列 lrq 沒有滿,則添加到 lrq 中;
-
• 如果 lrq 滿了,則加鎖並添加到全局隊列 grq 中.
上述流程對應代碼爲 runtime/proc.go 的 newproc 方法中:
// 創建一個新的 g,本將其投遞入隊列. 入參 fn 爲用戶指定的函數.
// 當前執行方還是某個普通 g
func newproc(fn *funcval){
// 獲取當前正在執行的普通 g 及其程序計數器(program counter)
gp := getg()
pc := getcallerpc()
// 執行 systemstack 時,會臨時切換至 g0,並在完成其中閉包函數調用後,切換回到原本的普通 g
systemstack(func(){
// 此時執行方爲 g0
// 構造一個新的 g 實例
newg := newproc1(fn, gp, pc)
// 獲取當前 p
_p_ := getg().m.p.ptr()
/*
將 newg 添加到隊列中:
1)優先添加到 p 的本地隊列 lrq
2)若 lrq 滿了,則添加到全局隊列 grq
*/
runqput(_p_, newg,true)
// 如果存在因過度空閒而被 block 的 p 和 m,則需要對其進行喚醒
if mainStarted {
wakep()
}
})
// 切換回到原本的普通 g 繼續執行
// ...
}
其中,將 g 添加到就緒隊列的方法爲 runqput,展示如下:
// 嘗試將 g 添加到指定 p 的 lrq 中. 若 lrq 滿了,則將 g 添加到 grqrq 中
func runqput(_p_ *p, gp *g, next bool){
// ...
// 當 next 爲 true 時,會優先將 gp 以 cas 操作放置到 p 的 runnext 位置
// 如果原因 runnext 位置還有 g,則再嘗試將它追加到 lrq 的尾部
if next{
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))){
goto retryNext
}
// 如果 runnext 位置原本不存在 g 直接返回
if oldnext ==0{
return
}
// gp 指向 runnext 中被置換出來的 g
gp = oldnext.ptr()
}
retry:
// 獲取 lrq 頭節點的索引
h := atomic.LoadAcq(&_p_.runqhead)// load-acquire, synchronize with consumers
// 獲取 lrq 尾節點的索引
t := _p_.runqtail
// 如果 lrq 沒有滿,則將 g 追加到尾節點的位置,並且遞增尾節點的索引
if t-h <uint32(len(_p_.runq)){
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1)// store-release, makes the item available for consumption
return
}
// runqputslow 方法中會將 g 以及 lrq 中半數的 g 放置到全局隊列 grq 中
if runqputslow(_p_, gp, h, t){
return
}
// ...
}
3.2 g0 與 g
在每個 m 中會有一個與之伴生的 g0,其任務就是不斷尋找可執行的 g. 所以對一個 m 來說,其運行週期就是處在 g0 與 g 之間輪換交替的過程中.
type m struct {
// 用於尋找並調度普通 g 的特殊 g,與每個 m 一一對應
g0 *g
// ...
// m 上正在運行的普通 g
curg *g
// ...
}
在 m 運行中,能夠通過幾個樁方法實現 g0 與 g 之間執行權的切換:
-
• g -> g0:mcall、systemstack
-
• g0 -> g:gogo
對應方法聲明於 runtime/stubs.go 文件中:
// 從 g 切換至 g0 執行. 只允許在 g 中調用
func mcall(fn func(*g))
// 在普通 g 中調用時,會切換至 g0 壓棧執行 fn,執行完成後切回到 g
func systemstack(fn func())
// 從 g0 切換至 g 執行. gobuf 包含 g 運行上下文信息
func gogo(buf *gobuf)
而從 g0 視角出發來看,其在先後經歷了兩個核心方法後,完成了 g0 -> g 的切換:
-
• schedule:調用 findRunnable 方法,獲取到可執行的 g
-
• execute:更新 g 的上下文信息,調用 gogo 方法,將 m 的執行權由 g0 切換到 g
上述方法均實現於 runtime/proc.go 文件中:
// 執行方爲 g0
func schedule(){
// 獲取當前 g0
_g_ := getg()
// ...
top:
// 獲取當前 p
pp := _g_.m.p.ptr()
// ...
/*
核心方法:獲取需要調度的 g
- 按照優先級,依次取本地隊列 lrq、取全局隊列 grq、執行 netpoll、竊取其他 p lrq
- 若沒有合適 g,則將 p 和 m block 住並添加到空閒隊列中
*/
gp, inheritTime, tryWakeP := findRunnable()// blocks until work is available
// ...
// 執行 g,該方法中會將執行權由 g0 -> g
execute(gp, inheritTime)
}
// 執行給定的 g. 當前執行方還是 g0,但會通過 gogo 方法切換至 gp
func execute(gp *g, inheritTime bool){
// 獲取 g0
_g_ := getg()
// ...
/*
建立 m 和 gp 的關係
1)將 m 中的 curg 字段指向 gp
2)將 gp 的 m 字段指向當前 m
*/
_g_.m.curg = gp
gp.m = _g_.m
// 更新 gp 狀態 runnable -> running
casgstatus(gp,_Grunnable,_Grunning)
// ...
// 設置 gp 的棧空間保護區邊界
gp.stackguard0 = gp.stack.lo +_StackGuard
// ...
// 執行 gogo 方法,m 執行權會切換至 gp
gogo(&gp.sched)
}
3.3 find g
在調度流程中,最核心的步驟就在於,findRunnable 方法中如何按照指定的策略獲取到可執行的 g.
1)主流程
findRunnable 方法聲明於 runtime/proc.go 中,其核心步驟包括::
-
• 每經歷 61 次調度後,需要先處理一次全局隊列 grq(globrunqget——加鎖),避免產生飢餓;
-
• 嘗試從本地隊列 lrq 中獲取 g(runqget——CAS 無鎖)
-
• 嘗試從全局隊列 grq 獲取 g(globrunqget——加鎖)
-
• 嘗試獲取 io 就緒的 g(netpoll——非阻塞模式)
-
• 嘗試從其他 p 的 lrq 竊取 g(stealwork)
-
• double check 一次 grq(globrunqget——加鎖)
-
• 若沒找到 g,將 p 置爲 idle 狀態,添加到 schedt pidle 隊列(動態縮容)
-
• 確保留守一個 m,監聽處理 io 就緒的 g(netpoll——阻塞模式)
-
• 若 m 仍無事可做,則將其添加到 schedt midle 隊列(動態縮容)
-
• 暫停 m(回收資源)
// 獲取可用於執行的 g. 如果該方法返回了,則一定已經找到了目標 g.
func findRunnable()(gp *g, inheritTime, tryWakeP bool){
// 獲取當前執行 p 下的 g0
_g_ := getg()
// ...
top:
// 獲取 p
_p_ := _g_.m.p.ptr()
// ...
// 每 61 次調度,需要嘗試處理一次全局隊列 (防止飢餓)
if _p_.schedtick%61==0&& sched.runqsize >0{
lock(&sched.lock)
gp = globrunqget(_p_,1)
unlock(&sched.lock)
if gp !=nil{
return gp,false,false
}
}
// ...
// 嘗試從本地隊列 lrq 中獲取 g
if gp, inheritTime := runqget(_p_); gp !=nil{
return gp, inheritTime,false
}
// 嘗試從全局隊列 grq 中獲取 g
if sched.runqsize !=0{
lock(&sched.lock)
gp := globrunqget(_p_,0)
unlock(&sched.lock)
if gp !=nil{
return gp,false,false
}
}
// 執行 netpoll 流程,嘗試批量喚醒 io 就緒的 g 並獲取首個用以調度
if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{
if list := netpoll(0);!list.empty(){// non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp,_Gwaiting,_Grunnable)
// ...
return gp,false,false
}
}
// ...
// 從其他 p 的 lrq 中竊取 g
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp !=nil{
return gp, inheritTime,false
}
// 若存在 gc 併發標記任務,則以 idle 模式參與協作,好過直接回收 p
// ...
// 加全局鎖,並 double check 全局隊列是否有 g
lock(&sched.lock)
// ...
if sched.runqsize !=0{
gp := globrunqget(_p_,0)
unlock(&sched.lock)
return gp,false,false
}
// ...
// 確認當前 p 無事可做,則將 p 和 m 解綁,並將其添加到全局調度模塊 schedt 中的空閒 p 隊列 pidle 中
// 解除 m 和 p 的關係
releasep()
// 將 p 添加到 schedt.pidle 中
now = pidleput(_p_, now)
unlock(&sched.lock)
// ...
// 在 block 當前 m 之前,保證全局存在一個 m 留守下來,以阻塞模式執行 netpoll,保證有 io 就緒事件發生時,能被第一時間處理
if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{
atomic.Store64(&sched.pollUntil,uint64(pollUntil))
// ...
// 以阻塞模式執行 netpoll 流程
delay :=int64(-1)
// ...
list := netpoll(delay)// block until new work is available
// 恢復 lastpoll 標識
atomic.Store64(&sched.lastpoll,uint64(now))
// ...
lock(&sched.lock)
// 從 schedt 的空閒 p 隊列 pidle 中獲取一個空閒 p
_p_, _ = pidleget(now)
unlock(&sched.lock)
// 若沒有獲取到 p,則將就緒的 g 都添加到全局隊列 grq 中
if _p_ ==nil{
injectglist(&list)
}else{
// m 與 p 結合
acquirep(_p_)
// 將首個 g 直接用於調度,其餘的添加到全局隊列 grq
if!list.empty(){
gp := list.pop()
injectglist(&list)
casgstatus(gp,_Gwaiting,_Grunnable)
// ...
return gp,false,false
}
// ...
goto top
}
}
// ...
// 走到此處仍然未找到合適的 g 用於調度,則需要將 m block 住,添加到 schedt 的 midle 中
stopm()
goto top
}
2)從 lrq 獲取 g
runqget 方法用於從某個 p 的 lrq 中獲取 g:
-
• 以 CAS 操作取 runnext 位置的 g,獲取成功則返回
-
• 以 CAS 操作移動 lrq 的頭節點索引,然後返回頭節點對應 g
// [無鎖化]從某個 p 的本地隊列 lrq 中獲取 g
func runqget(_p_ *p)(gp *g, inheritTime bool){
// 首先嚐試獲取特定席位 runnext 中的 g,使用 cas 操作
next:= _p_.runnext
if next!=0&& _p_.runnext.cas(next,0){
return next.ptr(),true
}
// 嘗試基於 cas 操作,獲取本地隊列頭節點中的 g
for{
// 獲取頭節點索引
h := atomic.LoadAcq(&_p_.runqhead)// load-acquire, synchronize with other consumers
// 獲取尾節點索引
t := _p_.runqtail
// 頭尾節點重合,說明 lrq 爲空
if t == h {
return nil,false
}
// 根據索引從 lrq 中取出頭節點對應的 g
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
// 通過 cas 操作更新頭節點索引
if atomic.CasRel(&_p_.runqhead, h, h+1){// cas-release, commits consume
return gp,false
}
}
}
3)從全局隊列獲取 g
globrunqget 方法用於從全局的 grq 中獲取 g. 調用時需要確保持有 schedt 的全局鎖:
// 從全局隊列 grq 中獲取 g. 調用此方法前必須持有 schedt 中的互斥鎖 lock
func globrunqget(_p_ *p, max int32)*g {
// 斷言確保持有鎖
assertLockHeld(&sched.lock)
// 隊列爲空,直接返回
if sched.runqsize ==0{
return nil
}
// ...
// 此外還有一些邏輯是根據傳入的 max 值嘗試獲取 grq 中的半數 g 填充到 p 的 lrq 中. 此處不展開
// ...
// 從全局隊列的隊首彈出一個 g
gp := sched.runq.pop()
// ...
return gp
}
4)獲取 io 就緒的 g
在 gmp 調度流程中,如果 lrq 和 grq 都爲空,則會執行 netpoll 流程,嘗試以非阻塞模式下的 epoll_wait 操作獲取 io 就緒的 g. 該方法位於 runtime/netpoll_epoll.go:
func netpoll(delay int64) gList {
// ...
// 調用 epoll_wait 獲取就緒的 io event
var events [128]epollevent
n := epollwait(epfd,&events[0],int32(len(events)), waitms)
// ...
var toRun gList
for i :=int32(0); i < n; i++{
ev :=&events[i]
// 將就緒 event 對應 g 追加到的 glist 中
netpollready(...)
}
return toRun
}
5)從其他 p 竊取 g
如果執行完 netpoll 流程後仍未獲得 g,則會嘗試從其他 p 的 lrq 中竊取半數 g 補充到當前 p 的 lrq 中:
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool){
// 獲取當前 p
pp := getg().m.p.ptr()
// ...
// 外層循環 4 次
const stealTries =4
for i :=0; i < stealTries; i++{
// ...
// 通過隨機數以隨機起點隨機步長選取目標 p 進行竊取
for enum:= stealOrder.start(fastrand());!enum.done();enum.next(){
// ...
// 獲取擬竊取的目標 p
p2 := allp[enum.position()]
// 如果目標 p 是當前 p,則跳過
if pp == p2 {
continue
}
// ...
// 只要目標 p 不爲 idle 狀態,則進行竊取
if!idlepMask.read(enum.position()){
// 竊取目標 p,其中會嘗試將目標 p lrq 中半數 g 竊取到當前 p 的 lrq 中
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp !=nil{
return gp,false, now, pollUntil, ranTimer
}
}
}
}
// 竊取失敗 未找到合適的目標
return nil,false, now, pollUntil, ranTimer
}
6)回收空閒的 p 和 m
如果直到最後都沒有找到合適的 g 用於執行,則需要將 p 和 m 添加到 schedt 的 pidle 和 midle 隊列中並停止 m 的運行,避免產生資源浪費:
// 將 p 追加到 schedt pidle 隊列中
func pidleput(_p_ *p, now int64)int64{
assertLockHeld(&sched.lock)
// ...
// p 指針指向原本 pidle 隊首
_p_.link = sched.pidle
// 將 p 設置爲 pidle 隊首
sched.pidle.set(_p_)
atomic.Xadd(&sched.npidle,1)
// ...
}
// 將當前 m 添加到 schedt midle 隊列並停止 m
func stopm(){
_g_ := getg()
// ...
lock(&sched.lock)
// 將 m 添加到 schedt.mdile
mput(_g_.m)
unlock(&sched.lock)
// 停止 m
mPark()
// ...
}
4 讓渡設計
所謂 “讓渡”,指的是當 g 在 m 上運行時,主動讓出執行權,使得 m 的運行對象重新回到 g0,即由 g -> g0 的流轉過程.
“讓渡” 和” 調度 “一樣,也屬於第一視角下的轉換,該流轉過程是由 m 上運行的 g 主動發起的,而無需第三方角色的干預.
4.1 結束讓渡
當 g 執行結束時,會正常退出,並將執行權切換回到 g0.
首先,g 在運行結束時會調用 goexit1 方法中,並通過 mcall 指令切換至 g0,由 g0 調用 goexit0 方法,並由 g0 執行下述步驟:
-
• 將 g 狀態由 running 更新爲 dead
-
• 清空 g 中的數據
-
• 解除 g 和 m 的關係
-
• 將 g 添加到 p 的 gfree 隊列以供複用
-
• 調用 schedule 方法發起新一輪調度
// goroutine 運行結束. 此時執行方是普通 g
func goexit1(){
// 通過 mcall,將執行方轉爲 g0,調用 goexit0 方法
mcall(goexit0)
}
// 此時執行方爲 g0,入參 gp 爲已經運行結束的 g
func goexit0(gp *g){
// 獲取 g0
_g_ := getg()
// 獲取對應的 p
_p_ := _g_.m.p.ptr()
// 將 gp 的狀態由 running 更新爲 dead
casgstatus(gp,_Grunning,_Gdead)
// ...
// 將 gp 中的內容清空
gp.m =nil
// ...
gp._defer =nil// should be true already but just in case.
gp._panic =nil// non-nil for Goexit during panic. points at stack-allocated data.
// ...
// 將 g 和 p 解除關係
dropg()
// ...
// 將 g 添加到 p 的 gfree 隊列中
gfput(_p_, gp)
// ...
// 發起新一輪調度流程
schedule()
}
4.2 主動讓渡
主動讓渡指的是由用戶手動調用 runtime.Gosched 方法讓出 g 所持有的執行權. 在 Gosched 方法中,會通過 mcall 指令切換至 g0,並由 g0 執行 gosched_m 方法,其中包含如下步驟:
-
• 將 g 由 running 改爲 runnable 狀態
-
• 解除 g 和 m 的關係
-
• 將 g 直接添加到全局隊列 grq 中
-
• 調用 schedule 方法發起新一輪調度
// 主動讓渡出執行權,此時執行方還是普通 g
func Gosched() {
// ...
// 通過 mcall,將執行方轉爲 g0,調用 gosched_m 方法
mcall(gosched_m)
}
// 將 gp 切換回就緒態後添加到全局隊列 grq,併發起新一輪調度
// 此時執行方爲 g0
func gosched_m(gp *g){
// ...
goschedImpl(gp)
}
func goschedImpl(gp *g){
// ...
// 將 g 狀態由 running 改爲 runnable 就緒態
casgstatus(gp,_Grunning,_Grunnable)
// 解除 g 和 m 的關係
dropg()
// 將 g 添加到全局隊列 grq
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
// 發起新一輪調度
schedule()
}
4.3 阻塞讓渡
阻塞讓渡指的是 g 在執行過程中所依賴的外部條件沒有達成,需要進入阻塞等待的狀態(waiting),直到條件達成後才能完成將狀態重新更新爲就緒態(runnable).
Golang 針對 mutex、channel 等併發工具的設計,在底層都是採用了阻塞讓渡的設計模式,具體執行的方法是位於 runtime/proc.go 的 gopark 方法:
-
• 通過 mcall 從 g 切換至 g0,並由 g0 執行 park_m 方法
-
• g0 將 g 由 running 更新爲 waiting 狀態,然後發起新一輪調度
此處需要注意,在阻塞讓渡後,g 不會進入到 lrq 或 grq 中,因爲 lrq/grq 屬於就緒隊列. 在執行 gopark 時,使用方有義務自行維護 g 的引用,並在外部條件就緒時,通過 goready 操作將其更新爲 runnable 狀態並重新添加到就緒隊列中.
// 此時執行方爲普通 g
func gopark(unlockf func(*g, unsafe.Pointer)bool,lockunsafe.Pointer, reason waitReason, traceEv byte, traceskip int){
// 獲取 m 正在執行的 g,也就是要阻塞讓渡的 g
gp := mp.curg
// ...
// 通過 mcall,將執行方由普通 g -> g0
mcall(park_m)
}
// 此時執行方爲 g0. 入參 gp 爲需要執行 park 的普通 g
func park_m(gp *g){
// 獲取 g0
_g_ := getg()
// 將 gp 狀態由 running 變更爲 waiting
casgstatus(gp,_Grunning,_Gwaiting)
// 解綁 g 與 m 的關係
dropg()
// g0 發起新一輪調度流程
schedule()
}
與 gopark 相對的,是用於喚醒 g 的 goready 方法,其中會通過 systemstack 壓棧切換至 g0 執行 ready 方法——將目標 g 狀態由 waiting 改爲 runnable,然後添加到就緒隊列中.
// 此時執行方爲普通 g. 入參 gp 爲需要喚醒的另一個普通 g
func goready(gp *g, traceskip int) {
// 調用 systemstack 後,會切換至 g0 亞展調用傳入的 ready 方法. 調用結束後則會直接切換回到當前普通 g 繼續執行.
systemstack(func() {
ready(gp, traceskip, true)
})
// 恢復成普通 g 繼續執行 ...
}
// 此時執行方爲 g0. 入參 gp 爲擬喚醒的普通 g
func ready(gp *g, traceskip int, next bool){
// ...
// 獲取當前 g0
_g_ := getg()
// ...
// 將目標 g 狀態由 waiting 更新爲 runnable
casgstatus(gp,_Gwaiting,_Grunnable)
/*
1) 優先將目標 g 添加到當前 p 的本地隊列 lrq
2)若 lrq 滿了,則將 g 追加到全局隊列 grq
*/
runqput(_g_.m.p.ptr(), gp,next)
// 如果有 m 或 p 處於 idle 狀態,將其喚醒
wakep()
// ...
}
5 搶佔設計
最後是關於 “搶佔” 的流程介紹,搶佔和讓渡有相同之處,都表示由 g->g0 的流轉過程,但區別在於,讓渡是由 g 主動發起的(第一人稱),而搶佔則是由外力干預(sysmon thread)發起的(第三人稱).
5.1 監控線程
在 go 程序運行時,會啓動一個全局唯一的監控線程——sysmon thread,其負責定時執行監控工作,主要包括:
-
• 執行 netpoll 操作,喚醒 io 就緒的 g
-
• 執行 retake 操作,對運行時間過長的 g 執行搶佔操作
-
• 執行 gcTrigger 操作,探測是否需要發起新的 gc 輪次
// The main goroutine.
func main(){
systemstack(func(){
newm(sysmon,nil,-1)
})
// ...
}
func sysmon(){
// ..
for{
// 根據閒忙情況調整輪詢間隔,在空閒情況下 10 ms 輪詢一次
usleep(delay)
// ...
// 執行 netpoll
lastpoll :=int64(atomic.Load64(&sched.lastpoll))
if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {
// ...
list := netpoll(0)// non-blocking - returns list of goroutines
// ...
}
// 執行搶佔工作
retake(now)
// ...
// 定時檢查是否需要發起 gc
if t :=(gcTrigger{kind: gcTriggerTime, now: now}); t.test()&& atomic.Load(&forcegc.idle)!=0{
// ...
}
// ...
}
}
執行搶佔邏輯的 retake 方法本章研究的重點,其中根據搶佔目標和狀態的不同,又可以分爲系統調用搶佔和運行超時搶佔.
5.2 系統調用
系統調用是 m(thread)粒度的,在執行期間會導致整個 m 暫時不可用,所以此時的搶佔處理思路是,將發起 syscall 的 g 和 m 綁定,但是解除 p 與 m 的綁定關係,使得此期間 p 存在和其他 m 結合的機會.
在發起系統調用時,會執行位於 runtime/proc.go 的 reentersyscall 方法,此方法核心步驟包括:
-
• 將 g 和 p 的狀態更新爲 syscall
-
• 解除 p 和 m 的綁定
-
• 將 p 設置爲 m.oldp,保留 p 與 m 之間的弱聯繫(使得 m syscall 結束後,還有一次嘗試複用 p 的機會)
func reentersyscall(pc, sp uintptr) {
// 獲取 g
_g_ := getg()
// ...
// 保存寄存器信息
save(pc, sp)
// ...
// 將 g 狀態更新爲 syscall
casgstatus(_g_,_Grunning,_Gsyscall)
// ...
// 解除 p 與 m 綁定關係
pp := _g_.m.p.ptr()
pp.m =0
// 將 p 設置爲 m 的 oldp
_g_.m.oldp.set(pp)
_g_.m.p =0
// 將 p 狀態更新爲 syscall
atomic.Store(&pp.status,_Psyscall)
// ...
}
當系統系統調用完成時,會執行位於 runtime/proc.go 的 exitsyscall 方法(此時執行方還是 m 上的 g),包含如下步驟:
-
• 檢查 syscall 期間,p 是否未和其他 m 結合,如果是的話,直接複用 p,繼續執行 g
-
• 通過 mcall 操作切換至 g0 執行 exitsyscall0 方法——嘗試爲當前 m 結合一個新的 p,如果結合成功,則繼續執行 g,否則將 g 添加到 grq 後暫停 m
func exitsyscall() {
// 獲取 g
_g_ := getg()
// ...
// 如果 oldp 沒有和其他 m 結合,則直接複用 oldp
oldp := _g_.m.oldp.ptr()
_g_.m.oldp =0
if exitsyscallfast(oldp){
// ...
// 將 g 狀態由 syscall 更新回 running
casgstatus(_g_,_Gsyscall,_Grunning)
// ...
return
}
// 切換至 g0 調用 exitsyscall0 方法
mcall(exitsyscall0)
// ...
}
// 此時執行方爲 m 下的 g0
func exitsyscall0(gp *g){
// 將 g 的狀態修改爲 runnable 就緒態
casgstatus(gp,_Gsyscall,_Grunnable)
// 解除 g 和 m 的綁定關係
dropg()
lock(&sched.lock)
// 嘗試尋找一個空閒的 p 與當前 m 結合
var _p_ *p
_p_, _ = pidleget(0)
var locked bool
// 如果與 p 結合失敗,則將 g 添加到全局隊列中
if _p_ ==nil{
globrunqput(gp)
// ...
}
// ...
unlock(&sched.lock)
// 如果與 p 結合成功,則繼續調度 g
if _p_ !=nil{
acquirep(_p_)
execute(gp,false)// Never returns.
}
// ...
// 與 p 結合失敗的話,需要將當前 m 添加到 schedt 的 midle 隊列並停止 m
stopm()
// 如果 m 被重新啓用,則發起新一輪調度
schedule()// Never returns.
}
我們將視角切回到 sysmon thread 中的 retake 方法,此處會遍歷每個 p,並針對正在發起系統調用的 p 執行如下檢查邏輯:
-
• 檢查 p 的 lrq 中是否存在等待執行的 g
-
• 檢查 p 的 syscall 時長是否 >= 10ms
但凡上述條件滿足其一,就會執行對 p 執行搶佔操作(handoffp)——分配一個新的 m 與 p 結合,完成後續任務的調度處理.
func retake(now int64) uint32{
n :=0
// 加鎖
lock(&allpLock)
// 遍歷所有 p
for i :=0; i <len(allp); i++{
_p_ := allp[i]
// ...
s := _p_.status
// ...
// 對於正在執行 syscall 的 p
if s ==_Psyscall{
// 如果 p 本地隊列爲空且發起系統調用時間 < 10ms,則不進行搶佔
if runqempty(_p_)&& atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle)>0&& pd.syscallwhen+10*1000*1000> now {
continue
}
unlock(&allpLock)
// 將 p 的狀態由 syscall 更新爲 idle
if atomic.Cas(&_p_.status, s,_Pidle){
// ...
// 讓 p 擁有和其他 m 結合的機會
handoffp(_p_)
}
// ...
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
func handoffp(_p_ *p) {
// 如果 p lrq 中還有 g 或者全局隊列 grq 中還有 g,則立即分配一個新 m 與該 p 結合
if!runqempty(_p_)|| sched.runqsize !=0{
// 分配一個 m 與 p 結合
startm(_p_,false)
return
}
// ...
// 若系統空閒沒有 g 需要調度,則將 p 添加到 schedt 中的空閒 p 隊列 pidle 中
pidleput(_p_,0)
// ...
}
5.3 運行超時
除了系統調用搶佔之外,當 sysmon thread 發現某個 g 執行時間過長時,也會對其發起搶佔操作.
1)發起搶佔
在 retake 方法中,會檢測到哪些 p 中運行一個 g 的時長超過了 10 ms,然後對其發起搶佔操作(preemtone):
func retake(now int64) uint32{
// ...
for i :=0; i <len(allp); i++{
_p_ := allp[i]
// ...
if s ==_Prunning{
// ...
// 如果某個 p 下存在運行超過 10 ms 的 g ,需要對 g 進行搶佔
if pd.schedwhen+forcePreemptNS <= now{
preemptone(_p_)
}
}
// ...
}
// ...
}
在 preemtone 方法中:
-
• 會對目標 g 設置搶佔標識(將 stackguard0 標識設置爲 stackPreempt),這樣當 g 運行到檢查點時,就會配合搶佔意圖,自覺完成讓渡操作
-
• 會對目標 g 所在的 m 發送搶佔信號 sigPreempt,通過改寫 g 程序計數器(pc,program counter)的方式將 g 逼停
// 搶佔指定 p 上正在執行的 g
func preemptone(_p_ *p)bool{
// 獲取 p 對應 m
mp := _p_.m.ptr()
// 獲取 p 上正在執行的 g(搶佔目標)
gp := mp.curg
// ...
/*
啓動協作式搶佔標識
1) 將搶佔標識 preempt 置爲 true
2)將 g 中的 stackguard0 標識置爲 stackPreempt
3)g 查看到搶佔標識後,會配合主動讓渡 p 調度權
*/
gp.preempt =true
gp.stackguard0 = stackPreempt
// 基於信號機制實現非協作式搶佔
if preemptMSupported && debug.asyncpreemptoff ==0{
_p_.preempt =true
preemptM(mp)
}
// ...
}
在 preemptM 方法中,會通過 tkill 指令向進程中的指定 thread 發送搶佔信號 sigPreempt,對應代碼位於 runtime/signal_unix.go:
func preemptM(mp *m) {
// ...
if atomic.Cas(&mp.signalPending,0,1){
if GOOS =="darwin"|| GOOS =="ios"{
atomic.Xadd(&pendingPreemptSignals,1)
}
// 向指定的 m 發送搶佔信號
// const sigPreempt untyped int = 16
signalM(mp, sigPreempt)
}
// ...
}
func signalM(mp *m, sig int){
pthread_kill(pthread(mp.procid),uint32(sig))
}
2)協作式搶佔
對於運行中的 g,在棧空間不足時,會切換至 g0 調用 newstack 方法執行棧空間擴張操作,在該流程中預留了一個檢查樁點,當其中發現 g 已經被打上搶佔標記時,就會主動配合執行讓渡操作:
// 棧擴張. 執行方爲 g0
func newstack(){
// ...
// 獲取當前 m 正在執行的 g
gp := thisg.m.curg
// ...
// 讀取 g 中的 stackguard0 標識位
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
// 若 stackguard0 標識位被置爲 stackPreempt,則代表需要對 g 進行搶佔
preempt := stackguard0 == stackPreempt
// ...
if preempt{
// 若當前 g 不具備搶佔條件,則繼續調度,不進行搶佔
// 當持有鎖、在進行內存分配或者顯式禁用搶佔模式時,則不允許對 g 執行搶佔操作
if!canPreemptM(thisg.m){
// ...
gogo(&gp.sched)// never return
}
}
if preempt {
// ...
// 響應搶佔意圖,完成讓渡操作
gopreempt_m(gp)// never return
}
// ...
}
// gopreempt_m 方法會走到 goschedImpl 方法中,後續流程與 4.2 小節中介紹的主動讓渡類似
func gopreempt_m(gp *g){
// ...
goschedImpl(gp)
}
func goschedImpl(gp *g){
// ...
casgstatus(gp,_Grunning,_Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()
}
這種通過預留檢查點,由 g 主動配合搶佔意圖完成讓渡操作的流程被稱作協作式搶佔,其存在的侷限就在於,當 g 未發生棧擴張行爲時,則沒有觸碰到檢查點的機會,也就無法響應搶佔意圖.
3)非協作式搶佔
爲了彌補協作式搶佔的不足,go 1.14 中引入了基於信號量實現的非協作式搶佔機制.
在 go 程序啓動時,main thread 會完成對各類信號量的監聽註冊,其中也包含了搶佔信號 sigPreempt(index = 16). 對應代碼位於 runtime/signal_unix.go:
func initsig(preinit bool) {
// ...
for i :=uint32(0); i < _NSIG; i++{
/*
var sigtable = [...]sigTabT{
// ...
// 16 {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
// ...
}
*/
t :=&sigtable[i]
// ...
// const _NSIG untyped int = 32
handlingSig[i]=1
setsig(i, abi.FuncPCABIInternal(sighandler))
}
}
當某個 m 接收到搶佔信號後,會由 gsignal 通過 sighandler 方法完成信號處理工作,此時針對搶佔信號會進一步調用 doSigPreempt 方法:在判斷 g 具備可搶佔條件後,則會保存 g 的寄存器信息,然後修改 g 的棧程序計數器 pc 和棧頂指針 sp,往其中插入一段函數 asyncPreempt:
// 此時執行該方法的是搶佔目標 g 所在 m 下的 gsignal
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g){
// 獲取擬搶佔 g 從屬 p 對應的 g0
_g_ := getg()
c :=&sigctxt{info, ctxt}
// 獲取擬搶佔 g 從屬的 m
mp := _g_.m
// ...
// 倘若接收到搶佔信號
if sig == sigPreempt && debug.asyncpreemptoff ==0&&!delayedSignal {
// 對目標 g 進行搶佔
doSigPreempt(gp, c)
}
// ...
}
// 此時執行該方法的是搶佔目標 g 所在 m 下的 gsignal
func doSigPreempt(gp *g, ctxt *sigctxt){
// 判斷 g 是否需要被搶佔
if wantAsyncPreempt(gp){
// 判斷 g 是否滿足搶佔條件
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// 通過修改 g 寄存器的方式,往 g 的執行指令中插入 asyncPreempt 函數
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
// ...
}
sigctxt.pushCall 方法中,通過移動棧頂指針(sp,stack pointer)、修改程序計數器(pc,program counter)的方式,強行在 g 的執行指令中插入了一段新的指令——asyncPreempt 函數.
// 此時執行該方法的是搶佔目標 g 所在 m 下的 gsignal
func (c *sigctxt) pushCall(targetPC, resumePC uintptr){
// 獲取棧頂指針 sp
sp :=uintptr(c.rsp())
// sp 偏移一個指針地址
sp -= goarch.PtrSize
// 將原本下一條執行指令 pc 存放在棧頂位置
*(*uintptr)(unsafe.Pointer(sp))= resumePC
// 更新棧頂指針 sp
c.set_rsp(uint64(sp))
// 將傳入的指令(asyncPreemt)作爲下一跳執行執行 pc
c.set_rip(uint64(targetPC))
}
由於 pc 被修改了,所以搶佔的目標 g 隨後會執行到 asyncPreemt2 方法,其中會通過 mcall 指令切換至 g0,並由 g0 執行 gopreempt_m 玩法,完成 g 的讓渡操作:
// 此時執行方是即將要被搶佔的 g,這段代碼是被臨時插入的邏輯
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
mcall(gopreempt_m)
gp.asyncSafePoint = false
}
6 總結
祝賀,至此全文結束.
本篇我們一起了解了 golang 中的 gmp 整體架構與應用生態,並深入到源碼中逐幀解析了 gmp 中的核心結構與執行流程設計. 希望上述內容能對各位 go 友們有所幫助~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/BR6SO7bQF4UXQoRdEjorAg