Golang 非協作式搶佔的實現
從 Go 1.14 開始,通過使用信號,Go 語言實現了調度和 GC 過程中的真 “搶佔 “。
搶佔流程由搶佔的發起方向被搶佔線程發送 SIGURG 信號。
當被搶佔線程收到信號後,進入 SIGURG 的處理流程,將 asyncPreempt 的調用強制插入到用戶當前執行的代碼位置。
本節會對該過程進行詳盡分析。
搶佔發起的時機
搶佔會在下列時機發生:
-
STW 期間
-
在 P 上執行 safe point 函數期間
-
sysmon 後臺監控期間
-
gc pacer 分配新的 dedicated worker 期間
-
panic 崩潰期間
如上圖,除了棧掃描流程,所有觸發搶佔最終都會去執行 preemptone 函數。
棧掃描流程比較特殊:
從這些流程裏,我們挑出三個來一探究竟。
STW 搶佔
GC 的多個階段
上圖是現在 Go 語言的 GC 流程圖,在兩個 STW 階段都需要將正在執行的線程上的 running 狀態的 goroutine 停下來。
func stopTheWorldWithSema() {
.....
preemptall()
.....
// 等待剩餘的 P 主動停下
if wait {
for {
// wait for 100us, then try to re-preempt in case of any races
// 等待 100us,然後重新嘗試搶佔
if notetsleep(&sched.stopnote, 100*1000) {
noteclear(&sched.stopnote)
break
}
preemptall()
}
}
GC 棧掃描
goroutine 的棧是 GC 掃描期間的根,所有 markroot 中需要將用戶的 goroutine 停下來,主要是 running 狀態:
func markroot(gcw *gcWork, i uint32) {
// Note: if you add a case here, please also update heapdump.go:dumproots.
switch {
......
default:
// the rest is scanning goroutine stacks
var gp *g
......
// scanstack must be done on the system stack in case
// we're trying to scan our own stack.
systemstack(func() {
stopped := suspendG(gp)
scanstack(gp, gcw)
resumeG(stopped)
})
}
}
suspendG 中會調用 preemptM -> signalM 對正在執行的 goroutine 所在的線程發送搶佔信號。
sysmon 後臺監控
func sysmon() {
idle := 0 // how many cycles in succession we had not wokeup somebody
for {
......
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
}
}
執行 syscall 太久的,需要將 P 從 M 上剝離;運行用戶代碼太久的,需要搶佔停止該 goroutine 執行。這裏我們只看搶佔 goroutine 的部分:
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
......
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
s := _p_.status
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_)
}
}
......
}
......
}
協作式搶佔原理
cooperative preemption 關鍵在於 cooperative,搶佔的時機在各個版本實現差異不大,我們重點來看看這個協作過程。
函數頭、函數尾插入的棧擴容檢查
在 Go 語言中發生函數調用時,如果函數的 framesize > 0,說明在調用該函數時可能會發生 goroutine 的棧擴張,這時會在函數頭、函數尾分別插入一段彙編碼:
package main
func main() {
add(1, 2)
}
//go:noinline
func add(x, y int) (int, bool) {
var z = x + y
println(z)
return x + y, true
}
add 函數在使用 go tool compile -S 後會生成下面的結果:
// 這裏的彙編代碼使用 go1.14 生成
// 在 go1.17 之後,函數的調用規約發生變化
// 1.17 與以往版本頭部的彙編代碼也會有所不同,但邏輯保持一致
"".add STEXT size=103 args=0x20 locals=0x18
0x0000 00000 (add.go:8) TEXT "".add(SB), ABIInternal, $24-32
0x0000 00000 (add.go:8) MOVQ (TLS), CX
0x0009 00009 (add.go:8) CMPQ SP, 16(CX)
0x000d 00013 (add.go:8) JLS 96
...... func body
0x005f 00095 (add.go:11) RET
0x0060 00096 (add.go:11) NOP
0x0060 00096 (add.go:8) CALL runtime.morestack_noctxt(SB)
0x0065 00101 (add.go:8) JMP 0
TLS 中存儲的是 G 的指針,偏移 16 字節即是 G 的結構體中的 stackguard0。由於 goroutine 的棧也是從高地址向低地址增長,因此這裏檢查當前 SP < stackguard0 的話,說明需要對棧進行擴容了。
morestack 中的調度邏輯
// morestack_noctxt 是個簡單的彙編方法
// 直接跳轉到 morestack
TEXT runtime·morestack_noctxt(SB),NOSPLIT|NOFRAME,$0-0
MOV ZERO, CTXT
JMP runtime·morestack(SB)
TEXT runtime·morestack(SB),NOSPLIT,$0-0
......
// 前面會切換將執行現場保存到 goroutine 的 gobuf 中
// 並將執行棧切換到 g0
// Call newstack on m->g0's stack.
MOVQ m_g0(BX), BX
MOVQ BX, g(CX)
MOVQ (g_sched+gobuf_sp)(BX), SP
CALL runtime·newstack(SB)
......
RET
morestack 會將 goroutine 的現場保存在當前 goroutine 的 gobuf 中,並將執行棧切換到 g0,然後在 g0 上執行 runtime.newstack。
在未實現信號搶佔之前,用戶的 g 到底啥時候能停下來,負責 GC 棧掃描的 goroutine 也不知道,所以 scanstack 也就只能設置一下 preemptscan 的標誌位,最終棧掃描要 newstack[1] 來配合,下面的 newstack 是 Go 1.13 版本的實現:
func newstack() {
thisg := getg()
gp := thisg.m.curg
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
if preempt {
// 要和 scang 過程配合
// 老版本的 newstack 和 gc scan 過程是有較重的耦合的
casgstatus(gp, _Grunning, _Gwaiting)
if gp.preemptscan {
for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
}
if !gp.gcscandone {
gcw := &gp.m.p.ptr().gcw
// 注意這裏,偶合了 GC 的 scanstack 邏輯代碼
scanstack(gp, gcw)
gp.gcscandone = true
}
gp.preemptscan = false
gp.preempt = false
casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
casgstatus(gp, _Gwaiting, _Grunning)
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
casgstatus(gp, _Gwaiting, _Grunning)
gopreempt_m(gp) // never return
}
......
}
搶佔成功後,當前的 goroutine 會被放在全局隊列中:
func gopreempt_m(gp *g) {
goschedImpl(gp)
}
func goschedImpl(gp *g) {
status := readgstatus(gp)
......
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp) // 將當前 goroutine 放進全局隊列
unlock(&sched.lock)
schedule() // 當前線程重新進入調度循環
}
信號式搶佔實現後的 newstack
在實現了信號式搶佔之後,對於用戶的 goroutine 何時中止有了一些預期,所以 newstack 就不需要耦合 scanstack 的邏輯了,新版的 newstack[2] 實現如下:
func newstack() {
thisg := getg()
gp := thisg.m.curg
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
if preempt {
if !canPreemptM(thisg.m) {
// 讓 goroutine 繼續執行
// 下次再搶佔它
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
if preempt {
// 當 GC 需要發起 goroutine 的棧掃描時
// 會設置這個 preemptStop 爲 true
// 這時候需要 goroutine 自己去 gopark
if gp.preemptStop {
preemptPark(gp) // never returns
}
// 除了 GC 棧掃描以外的其它搶佔場景走這個分支
// 看起來就像 goroutine 自己調用了 runtime.Gosched 一樣
gopreempt_m(gp) // never return
}
...... 後面就是正常的棧擴展邏輯了
}
newstack 中會使用 canPreemptM[3] 判斷哪些場景適合搶佔,哪些不適合。如果當前 goroutine 正在執行 (即 status == running),並且滿足下列任意其一:
-
持有鎖 (主要是寫鎖,讀鎖其實判斷不出來);
-
正在進行內存分配
-
preemptoff 非空
便不應該進行搶佔,會在下一次進入到 newstack 時再進行判斷。
非協作式搶佔
非協作式搶佔,就是通過信號處理來實現的。所以我們只要關注 SIGURG 的處理流程即可。
信號處理初始化
當 m0(即程序啓動時的第一個線程) 初始化時,會進行信號處理的初始化工作:
// mstartm0 implements part of mstart1 that only runs on the m0.
func mstartm0() {
initsig(false)
}
// Initialize signals.
func initsig(preinit bool) {
for i := uint32(0); i < _NSIG; i++ {
setsig(i, funcPC(sighandler))
}
}
var sigtable = [...]sigTabT{
......
/* 23 */ {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
......
}
最後都是執行 sigaction:
TEXT runtime·rt_sigaction(SB),NOSPLIT,$0-36
MOVQ sig+0(FP), DI
MOVQ new+8(FP), SI
MOVQ old+16(FP), DX
MOVQ size+24(FP), R10
MOVL $SYS_rt_sigaction, AX
SYSCALL
MOVL AX, ret+32(FP)
RET
與一般的 syscall 區別不大。
信號處理初始化的流程比較簡單,就是給所有已知的需要處理的信號綁上 sighandler。
發送信號
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
gp := mp.curg
gp.preempt = true
gp.stackguard0 = stackPreempt
// 向該線程發送 SIGURG 信號
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
preemptM 的流程較爲線性:
func preemptM(mp *m) {
if atomic.Cas(&mp.signalPending, 0, 1) {
signalM(mp, sigPreempt)
}
}
func signalM(mp *m, sig int) {
tgkill(getpid(), int(mp.procid), sig)
}
最後使用 tgkill 這個 syscall 將信號發送給指定 id 的線程:
TEXT ·tgkill(SB),NOSPLIT,$0
MOVQ tgid+0(FP), DI
MOVQ tid+8(FP), SI
MOVQ sig+16(FP), DX
MOVL $SYS_tgkill, AX
SYSCALL
RET
接收信號後的處理
當線程 m 接收到信號後,會從用戶棧 g 切換到 gsignal 執行信號處理邏輯,即 sighandler 流程:
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
_g_ := getg()
c := &sigctxt{info, ctxt}
......
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
doSigPreempt(gp, c)
}
......
}
如果收到的是搶佔信號,那麼執行 doSigPreempt 邏輯:
func doSigPreempt(gp *g, ctxt *sigctxt) {
// 檢查當前 G 被搶佔是否安全
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(funcPC(asyncPreempt), newpc)
}
}
......
}
isAsyncSafePoint 中會把一些不應該搶佔的場景過濾掉,具體包括:
-
當前代碼在彙編編寫的函數中執行
-
代碼在 runtime,runtime/internal 或者 reflect 包中執行
doSigPreempt 代碼中的 pushCall 是關鍵步驟:
func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
// Make it look like we called target at resumePC.
sp := uintptr(c.rsp())
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = resumePC
c.set_rsp(uint64(sp))
c.set_rip(uint64(targetPC))
}
pushCall 相當於將用戶將要執行的下一條代碼的地址直接 push 到棧上,並 jmp 到指定的 target 地址去執行代碼:
before
----- PC = 0x123
local var 1
-----
local var 2
----- <---- SP
after
----- PC = targetPC
local var 1
-----
local var 2
-----
prev PC = 0x123
----- <---- SP
這裏的 target 就是 asyncPreempt。
asyncPreempt 執行流程分析
asyncPreempt 分爲上半部分和下半部分,中間被 asyncPreempt2 隔開。上半部分負責將 goroutine 當前執行現場的所有寄存器都保存到當前的運行棧上。
下半部分負責在 asyncPreempt2 返回後將這些現場恢復出來。
TEXT ·asyncPreempt<ABIInternal>(SB),NOSPLIT|NOFRAME,$0-0
PUSHQ BP
MOVQ SP, BP
...... 保存現場 1
MOVQ AX, 0(SP)
MOVQ CX, 8(SP)
MOVQ DX, 16(SP)
MOVQ BX, 24(SP)
MOVQ SI, 32(SP)
...... 保存現場 2
MOVQ R15, 104(SP)
MOVUPS X0, 112(SP)
MOVUPS X1, 128(SP)
......
MOVUPS X15, 352(SP)
CALL ·asyncPreempt2(SB)
MOVUPS 352(SP), X15
...... 恢復現場 2
MOVUPS 112(SP), X0
MOVQ 104(SP), R15
...... 恢復現場 1
MOVQ 8(SP), CX
MOVQ 0(SP), AX
......
RET
asyncPreempt2 中有兩個分支:
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop { // 這個 preemptStop 是在 GC 的棧掃描中才會設置爲 true
mcall(preemptPark)
} else { // 除了棧掃描,其它搶佔全部走這條分支
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
GC 棧掃描走 if 分支,除棧掃描以外所有情況均走 else 分支。
棧掃描搶佔流程
suspendG -> preemptM -> signalM 發信號。
sighandler -> asyncPreempt -> 保存執行現場 -> asyncPreempt2 -> preemptPark
preemptPark 和 gopark 類似,掛起當前正在執行的 goroutine,該 goroutine 之前綁定的線程就可以繼續執行調度循環了。
scanstack 執行完之後:
resumeG -> ready -> runqput 會讓之前被停下來的 goroutine 進當前 P 的隊列或全局隊列。
其它流程
preemptone -> preemptM - signalM 發信號。
sighandler -> asyncPreempt -> 保存執行現場 -> asyncPreempt2 -> gopreempt_m
gopreempt_m 會直接將被搶佔的 goroutine 放進全局隊列。
無論是棧掃描流程還是其它流程,當 goroutine 程序被調度到時,都是從彙編中的 CALL ·asyncPreempt2(SB)
的下一條指令開始執行的,即 asyncPreempt 彙編函數的下半部分。
公衆號
這部分會將之前 goroutine 的現場完全恢復,就和搶佔從來沒有發生過一樣。
[1]
newstack: https://github.com/golang/go/blob/2bc8d90fa21e9547aeb0f0ae775107dc8e05dc0a/src/runtime/stack.go#L917
[2]
newstack: https://github.com/golang/go/blob/c3b47cb598e1ecdbbec110325d9d1979553351fc/src/runtime/stack.go#L948
[3]
canPreemptM: https://github.com/golang/go/blob/287c5e8066396e953254d7980a80ec082edf11bd/src/runtime/preempt.go#L287
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/EfDmwKilzVLAR-gH_7yzDQ