Golang 非協作式搶佔的實現

從 Go 1.14 開始,通過使用信號,Go 語言實現了調度和 GC 過程中的真 “搶佔 “。

搶佔流程由搶佔的發起方向被搶佔線程發送 SIGURG 信號。

當被搶佔線程收到信號後,進入 SIGURG 的處理流程,將 asyncPreempt 的調用強制插入到用戶當前執行的代碼位置。

本節會對該過程進行詳盡分析。

搶佔發起的時機

搶佔會在下列時機發生:

如上圖,除了棧掃描流程,所有觸發搶佔最終都會去執行 preemptone 函數。

棧掃描流程比較特殊:

從這些流程裏,我們挑出三個來一探究竟。

STW 搶佔

GC 的多個階段

上圖是現在 Go 語言的 GC 流程圖,在兩個 STW 階段都需要將正在執行的線程上的 running 狀態的 goroutine 停下來。

func stopTheWorldWithSema() {
 .....
 preemptall()
 .....
 // 等待剩餘的 P 主動停下
 if wait {
  for {
   // wait for 100us, then try to re-preempt in case of any races
   // 等待 100us,然後重新嘗試搶佔
   if notetsleep(&sched.stopnote, 100*1000) {
    noteclear(&sched.stopnote)
    break
   }
   preemptall()
  }
 }

GC 棧掃描

goroutine 的棧是 GC 掃描期間的根,所有 markroot 中需要將用戶的 goroutine 停下來,主要是 running 狀態:

func markroot(gcw *gcWork, i uint32) {
 // Note: if you add a case here, please also update heapdump.go:dumproots.
 switch {
 ......
 default:
  // the rest is scanning goroutine stacks
  var gp *g
  ......

  // scanstack must be done on the system stack in case
  // we're trying to scan our own stack.
  systemstack(func() {
   stopped := suspendG(gp)
   scanstack(gp, gcw)
   resumeG(stopped)
  })
 }
}

suspendG 中會調用 preemptM -> signalM 對正在執行的 goroutine 所在的線程發送搶佔信號。

sysmon 後臺監控

func sysmon() {
 idle := 0 // how many cycles in succession we had not wokeup somebody
 for {
  ......
  // retake P's blocked in syscalls
  // and preempt long running G's
  if retake(now) != 0 {
   idle = 0
  } else {
   idle++
  }
 }
}

執行 syscall 太久的,需要將 P 從 M 上剝離;運行用戶代碼太久的,需要搶佔停止該 goroutine 執行。這裏我們只看搶佔 goroutine 的部分:

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
 ......
 for i := 0; i < len(allp); i++ {
  _p_ := allp[i]
  s := _p_.status
  if s == _Prunning || s == _Psyscall {
   // Preempt G if it's running for too long.
   t := int64(_p_.schedtick)
   if int64(pd.schedtick) != t {
    pd.schedtick = uint32(t)
    pd.schedwhen = now
   } else if pd.schedwhen+forcePreemptNS <= now {
    preemptone(_p_)
   }
  }
  ......
 }
 ......
}

協作式搶佔原理

cooperative preemption 關鍵在於 cooperative,搶佔的時機在各個版本實現差異不大,我們重點來看看這個協作過程。

函數頭、函數尾插入的棧擴容檢查

在 Go 語言中發生函數調用時,如果函數的 framesize > 0,說明在調用該函數時可能會發生 goroutine 的棧擴張,這時會在函數頭、函數尾分別插入一段彙編碼:

package main

func main() {
 add(1, 2)
}

//go:noinline
func add(x, y int) (int, bool) {
 var z = x + y
 println(z)
 return x + y, true
}

add 函數在使用 go tool compile -S 後會生成下面的結果:

// 這裏的彙編代碼使用 go1.14 生成
// 在 go1.17 之後,函數的調用規約發生變化
// 1.17 與以往版本頭部的彙編代碼也會有所不同,但邏輯保持一致
"".add STEXT size=103 args=0x20 locals=0x18
 0x0000 00000 (add.go:8) TEXT "".add(SB), ABIInternal, $24-32
 0x0000 00000 (add.go:8) MOVQ (TLS), CX
 0x0009 00009 (add.go:8) CMPQ SP, 16(CX)
 0x000d 00013 (add.go:8) JLS 96
 ...... func body
 0x005f 00095 (add.go:11) RET
 0x0060 00096 (add.go:11) NOP
 0x0060 00096 (add.go:8) CALL runtime.morestack_noctxt(SB)
 0x0065 00101 (add.go:8) JMP 0

TLS 中存儲的是 G 的指針,偏移 16 字節即是 G 的結構體中的 stackguard0。由於 goroutine 的棧也是從高地址向低地址增長,因此這裏檢查當前 SP < stackguard0 的話,說明需要對棧進行擴容了。

morestack 中的調度邏輯

// morestack_noctxt 是個簡單的彙編方法
// 直接跳轉到 morestack
TEXT runtime·morestack_noctxt(SB),NOSPLIT|NOFRAME,$0-0
 MOV ZERO, CTXT
 JMP runtime·morestack(SB)

TEXT runtime·morestack(SB),NOSPLIT,$0-0
 ......
 // 前面會切換將執行現場保存到 goroutine 的 gobuf 中
 // 並將執行棧切換到 g0
 // Call newstack on m->g0's stack.
 MOVQ m_g0(BX), BX
 MOVQ BX, g(CX)
 MOVQ (g_sched+gobuf_sp)(BX), SP
 CALL runtime·newstack(SB)
 ......
 RET

morestack 會將 goroutine 的現場保存在當前 goroutine 的 gobuf 中,並將執行棧切換到 g0,然後在 g0 上執行 runtime.newstack。

在未實現信號搶佔之前,用戶的 g 到底啥時候能停下來,負責 GC 棧掃描的 goroutine 也不知道,所以 scanstack 也就只能設置一下 preemptscan 的標誌位,最終棧掃描要 newstack[1] 來配合,下面的 newstack 是 Go 1.13 版本的實現:

func newstack() {
 thisg := getg()
 gp := thisg.m.curg
 preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
 if preempt {
  if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
   gp.stackguard0 = gp.stack.lo + _StackGuard
   gogo(&gp.sched) // never return
  }
 }

 if preempt {
  // 要和 scang 過程配合
  // 老版本的 newstack 和 gc scan 過程是有較重的耦合的
  casgstatus(gp, _Grunning, _Gwaiting)
  if gp.preemptscan {
   for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
   }
   if !gp.gcscandone {
    gcw := &gp.m.p.ptr().gcw
    // 注意這裏,偶合了 GC 的 scanstack 邏輯代碼
    scanstack(gp, gcw)
    gp.gcscandone = true
   }
   gp.preemptscan = false
   gp.preempt = false
   casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
   casgstatus(gp, _Gwaiting, _Grunning)
   gp.stackguard0 = gp.stack.lo + _StackGuard
   gogo(&gp.sched) // never return
  }

  casgstatus(gp, _Gwaiting, _Grunning)
  gopreempt_m(gp) // never return
 }
 ......
}

搶佔成功後,當前的 goroutine 會被放在全局隊列中:

func gopreempt_m(gp *g) {
 goschedImpl(gp)
}

func goschedImpl(gp *g) {
 status := readgstatus(gp)
 ......

 casgstatus(gp, _Grunning, _Grunnable)
 dropg()
 lock(&sched.lock)
 globrunqput(gp) // 將當前 goroutine 放進全局隊列
 unlock(&sched.lock)

 schedule() // 當前線程重新進入調度循環
}

信號式搶佔實現後的 newstack

在實現了信號式搶佔之後,對於用戶的 goroutine 何時中止有了一些預期,所以 newstack 就不需要耦合 scanstack 的邏輯了,新版的 newstack[2] 實現如下:

func newstack() {
 thisg := getg()
 gp := thisg.m.curg
 preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

 if preempt {
  if !canPreemptM(thisg.m) {
   // 讓 goroutine 繼續執行
   // 下次再搶佔它
   gp.stackguard0 = gp.stack.lo + _StackGuard
   gogo(&gp.sched) // never return
  }
 }

 if preempt {
  // 當 GC 需要發起 goroutine 的棧掃描時
  // 會設置這個 preemptStop 爲 true
  // 這時候需要 goroutine 自己去 gopark
  if gp.preemptStop {
   preemptPark(gp) // never returns
  }

  // 除了 GC 棧掃描以外的其它搶佔場景走這個分支
  // 看起來就像 goroutine 自己調用了 runtime.Gosched 一樣
  gopreempt_m(gp) // never return
 }
 ...... 後面就是正常的棧擴展邏輯了
}

newstack 中會使用 canPreemptM[3] 判斷哪些場景適合搶佔,哪些不適合。如果當前 goroutine 正在執行 (即 status == running),並且滿足下列任意其一:

便不應該進行搶佔,會在下一次進入到 newstack 時再進行判斷。

非協作式搶佔

非協作式搶佔,就是通過信號處理來實現的。所以我們只要關注 SIGURG 的處理流程即可。

信號處理初始化

當 m0(即程序啓動時的第一個線程) 初始化時,會進行信號處理的初始化工作:

// mstartm0 implements part of mstart1 that only runs on the m0.
func mstartm0() {
 initsig(false)
}

// Initialize signals.
func initsig(preinit bool) {
 for i := uint32(0); i < _NSIG; i++ {
  setsig(i, funcPC(sighandler))
 }
}

var sigtable = [...]sigTabT{
 ......
 /* 23 */ {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
 ......
}

最後都是執行 sigaction:

TEXT runtime·rt_sigaction(SB),NOSPLIT,$0-36
 MOVQ sig+0(FP), DI
 MOVQ new+8(FP), SI
 MOVQ old+16(FP), DX
 MOVQ size+24(FP), R10
 MOVL $SYS_rt_sigaction, AX
 SYSCALL
 MOVL AX, ret+32(FP)
 RET

與一般的 syscall 區別不大。

信號處理初始化的流程比較簡單,就是給所有已知的需要處理的信號綁上 sighandler。

發送信號

func preemptone(_p_ *p) bool {
 mp := _p_.m.ptr()
 gp := mp.curg
 gp.preempt = true
 gp.stackguard0 = stackPreempt

 // 向該線程發送 SIGURG 信號
 if preemptMSupported && debug.asyncpreemptoff == 0 {
  _p_.preempt = true
  preemptM(mp)
 }

 return true
}

preemptM 的流程較爲線性:

func preemptM(mp *m) {
 if atomic.Cas(&mp.signalPending, 0, 1) {
  signalM(mp, sigPreempt)
 }
}

func signalM(mp *m, sig int) {
 tgkill(getpid(), int(mp.procid), sig)
}

最後使用 tgkill 這個 syscall 將信號發送給指定 id 的線程:

TEXT ·tgkill(SB),NOSPLIT,$0
 MOVQ tgid+0(FP), DI
 MOVQ tid+8(FP), SI
 MOVQ sig+16(FP), DX
 MOVL $SYS_tgkill, AX
 SYSCALL
 RET

接收信號後的處理

當線程 m 接收到信號後,會從用戶棧 g 切換到 gsignal 執行信號處理邏輯,即 sighandler 流程:

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
 _g_ := getg()
 c := &sigctxt{info, ctxt}

 ......
 if sig == sigPreempt && debug.asyncpreemptoff == 0 {
  doSigPreempt(gp, c)
 }
 ......
}

如果收到的是搶佔信號,那麼執行 doSigPreempt 邏輯:

func doSigPreempt(gp *g, ctxt *sigctxt) {
 // 檢查當前 G 被搶佔是否安全
 if wantAsyncPreempt(gp) {
  if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
   // Adjust the PC and inject a call to asyncPreempt.
   ctxt.pushCall(funcPC(asyncPreempt), newpc)
  }
 }
 ......
}

isAsyncSafePoint 中會把一些不應該搶佔的場景過濾掉,具體包括:

doSigPreempt 代碼中的 pushCall 是關鍵步驟:

func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
 // Make it look like we called target at resumePC.
 sp := uintptr(c.rsp())
 sp -= sys.PtrSize
 *(*uintptr)(unsafe.Pointer(sp)) = resumePC
 c.set_rsp(uint64(sp))
 c.set_rip(uint64(targetPC))
}

pushCall 相當於將用戶將要執行的下一條代碼的地址直接 push 到棧上,並 jmp 到指定的 target 地址去執行代碼:

before

-----                     PC = 0x123
local var 1
-----
local var 2
----- <---- SP

after

-----                  PC = targetPC
local var 1
-----
local var 2
-----
prev PC = 0x123
----- <---- SP

這裏的 target 就是 asyncPreempt。

asyncPreempt 執行流程分析

asyncPreempt 分爲上半部分和下半部分,中間被 asyncPreempt2 隔開。上半部分負責將 goroutine 當前執行現場的所有寄存器都保存到當前的運行棧上。

下半部分負責在 asyncPreempt2 返回後將這些現場恢復出來。

TEXT ·asyncPreempt<ABIInternal>(SB),NOSPLIT|NOFRAME,$0-0
 PUSHQ BP
 MOVQ SP, BP
 ...... 保存現場 1
 MOVQ AX, 0(SP)
 MOVQ CX, 8(SP)
 MOVQ DX, 16(SP)
 MOVQ BX, 24(SP)
 MOVQ SI, 32(SP)
 ...... 保存現場 2
 MOVQ R15, 104(SP)
 MOVUPS X0, 112(SP)
 MOVUPS X1, 128(SP)
 ......
 MOVUPS X15, 352(SP)

 CALL ·asyncPreempt2(SB)

 MOVUPS 352(SP), X15
 ...... 恢復現場 2
 MOVUPS 112(SP), X0
 MOVQ 104(SP), R15
 ...... 恢復現場 1
 MOVQ 8(SP), CX
 MOVQ 0(SP), AX
 ......
 RET

asyncPreempt2 中有兩個分支:

func asyncPreempt2() {
 gp := getg()
 gp.asyncSafePoint = true
 if gp.preemptStop { // 這個 preemptStop 是在 GC 的棧掃描中才會設置爲 true
  mcall(preemptPark)
 } else { // 除了棧掃描,其它搶佔全部走這條分支
  mcall(gopreempt_m)
 }
 gp.asyncSafePoint = false
}

GC 棧掃描走 if 分支,除棧掃描以外所有情況均走 else 分支。

棧掃描搶佔流程

suspendG -> preemptM -> signalM 發信號。

sighandler -> asyncPreempt -> 保存執行現場 -> asyncPreempt2 -> preemptPark

preemptPark 和 gopark 類似,掛起當前正在執行的 goroutine,該 goroutine 之前綁定的線程就可以繼續執行調度循環了。

scanstack 執行完之後:

resumeG -> ready -> runqput 會讓之前被停下來的 goroutine 進當前 P 的隊列或全局隊列。

其它流程

preemptone -> preemptM - signalM 發信號。

sighandler -> asyncPreempt -> 保存執行現場 -> asyncPreempt2 -> gopreempt_m

gopreempt_m 會直接將被搶佔的 goroutine 放進全局隊列。

無論是棧掃描流程還是其它流程,當 goroutine 程序被調度到時,都是從彙編中的 CALL ·asyncPreempt2(SB) 的下一條指令開始執行的,即 asyncPreempt 彙編函數的下半部分。

公衆號

這部分會將之前 goroutine 的現場完全恢復,就和搶佔從來沒有發生過一樣。

[1]

newstack: https://github.com/golang/go/blob/2bc8d90fa21e9547aeb0f0ae775107dc8e05dc0a/src/runtime/stack.go#L917

[2]

newstack: https://github.com/golang/go/blob/c3b47cb598e1ecdbbec110325d9d1979553351fc/src/runtime/stack.go#L948

[3]

canPreemptM: https://github.com/golang/go/blob/287c5e8066396e953254d7980a80ec082edf11bd/src/runtime/preempt.go#L287

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/EfDmwKilzVLAR-gH_7yzDQ