Go 中祕而不宣的數據結構 runq- 難怪運行時調度那麼好
首先,讓我們先來回顧 Go 運行時的 GPM 模型。這方面的介紹網上的資料都非常非常多了,但是我們也不妨回顧一下:
GPM 模型中的 G 代表 goroutine。每個 goroutine 只佔用幾 KB 的內存, 可以輕鬆創建成千上萬個。G 包含了 goroutine 的棧、指令指針和其他信息, 如阻塞 channel 的等待隊列等。
P 代表 processor, 可以理解爲一個抽象的 CPU 核心。P 的數量默認等於實際的 CPU 核心數, 但可以通過環境變量進行調整。P 維護了一個本地的 goroutine 隊列, 還負責執行 goroutine 並管理與之關聯的上下文信息。
M 代表 machine, 是操作系統線程。一個 M 必須綁定一個 P 才能執行 goroutine。當一個 M 阻塞時, 運行時會創建一個新的 M 或者複用一個空閒的 M 來保證 P 的數量總是等於 GOMAXPROCS 的值, 從而充分利用 CPU 資源。
在這個模型中, P 扮演了承上啓下的角色。它連接了 G 和 M, 實現了用戶層級的 goroutine 到操作系統線程的映射。這種設計允許 Go 在用戶空間進行調度, 避免了頻繁的系統調用, 大大提高了併發效率。
調度過程中, 當一個 goroutine 被創建時, 它會被放到 P 的本地隊列或全局隊列中。如果 P 的本地隊列已滿, 一些 goroutine 會被放到全局隊列。當 P 執行完當前的 goroutine 後, 會優先從本地隊列獲取新的 goroutine 來執行。如果本地隊列爲空, P 會嘗試從全局隊列或其他 P 的隊列中偷取 goroutine。
這種工作竊取 (work-stealing) 算法確保了負載的動態平衡。當某個 P 的本地隊列爲空時, 它可以從其他 P 的隊列中竊取一半的 goroutine, 這有效地平衡了各個 P 之間的工作負載。
Go 運行時這麼做,主要還是減少 P 之間對獲取 goroutine 之間的競爭。本地隊列 runq 主要由持有它的 P 進行讀寫,只有在 "被偷" 的情況下,纔可能有 "數據競爭" 的問題,而這種情況發生概率較少,所以它設計了一個高效的 runq 數據結構來應對這麼場景。實際看起來和上面介紹的 PoolDequeue 有異曲同工之妙。
本文還會介紹 global queue 等數據結構,但不是本文的重點。
runq
在運行時中 P 是一個複雜的數據結構,下面列出了本文關注的它的幾個字段:
// 一個goroutine的指針
type guintptr uintptr
//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }
//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
//go:nosplit
func (gp *guintptr) cas(old, new guintptr) bool {
return atomic.Casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
}
type p struct {
id int32
status uint32 // one of pidle/prunning/...
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
mcache *mcache
pcache pageCache
raceprocctx uintptr
deferpool []*_defer // pool of available defer structs (see panic.go)
deferpoolbuf [32]*_defer
// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
goidcache uint64
goidcacheend uint64
// 本地運行的無鎖循環隊列
runqhead uint32
runqtail uint32
runq [256]guintptr
// 如果非nil,是一個可優先運行的G
runnext guintptr
...
}
runq 是一個無鎖循環隊列,由數組實現,它的長度是 256,這個長度是固定的,不會動態調整。runqhead 和 runqtail 分別是隊列的頭和尾,runqhead 指向隊列的頭部,runqtail 指向隊列的尾部。runq 數組的每個元素是一個 guintptr 類型,它是一個 uintptr 類型的別名,用來存儲 g 的指針。
runq 的操作主要是 runqput、runqputslow、runqputbatch、runqget、runqdrain、runqgrab、runqsteal等方法。
接下來我們撿重點的方法看一下它是怎麼實現高效額度併發讀寫的.
runqput
runqput 方法是向 runq 中添加一個 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:
// runqput 嘗試將 g 放到本地可運行隊列上。
// 如果 next 爲 false,runqput 將 g 添加到可運行隊列的尾部。
// 如果 next 爲 true,runqput 將 g 放在 pp.runnext 位置。
// 如果可運行隊列已滿,runnext 將 g 放到全局隊列上。
// 只能由擁有 P 的所有者執行。
func runqput(pp *p, gp *g, next bool) {
if !haveSysmon && next {
// 如果沒有 sysmon,我們必須完全避免 runnext,否則會導致飢餓。
next = false
}
if randomizeScheduler && next && randn(2) == 0 {
// 如果隨機調度器打開,我們有一半的機會避免運行 runnext
next = false
}
// 如果 next 爲 true,優先處理 runnext
// 將當前的goroutine放到 runnext 中, 如果原來runnext中有goroutine, 則將其放到runq中
if next {
retryNext:
oldnext := pp.runnext
if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
// 重點來了,將goroutine放入runq中
retry:
h := atomic.LoadAcq(&pp.runqhead) // ①
t := pp.runqtail
if t-h < uint32(len(pp.runq)) { // ② 如果隊列未滿
pp.runq[t%uint32(len(pp.runq))].set(gp) // ③ 將goroutine放入隊列
atomic.StoreRel(&pp.runqtail, t+1) // ④ 更新隊尾
return
}
if runqputslow(pp, gp, h, t) { // ⑤ 如果隊列滿了,調用runqputslow 嘗試將goroutine放入全局隊列
return
}
// 如果隊列未滿,上面的操作應該已經成功返回,否則重試
goto retry
}
runqput 方法的實現非常簡單,它首先判斷是否需要優先處理 runnext,如果需要,就將 g 放到 runnext 中,然後再將 g 放到 runq 中。runq 的操作是無鎖的,它通過 atomic 包提供的原子操作來實現。這裏使用的內部的更精細化的原子操作,這個也是我後面專門有一篇文章來講解的。你現在大概把 ①、④ 理解爲Load、Store操作即可。
②、⑤ 分別處理本地隊列未滿和隊列已滿的情況,如果隊列未滿,就將 g 放到隊列中,然後更新隊尾;如果隊列已滿,就調用 runqputslow 方法,將 g 放到全局隊列中。
③ 處直接將 g 放到隊列中,這是因爲只有當前的 P 才能操作 runq,所以不會有併發問題。同時我們也可以看到,我們總是往尾部插入, t總是一直增加的, 取餘操作保證了循環隊列的特性。
runqputslow 會把本地隊列中的一半的 g 放到全局隊列中,包括當前要放入的 g。一旦涉及到全局隊列,就會有一定的競爭,Go 運行時使用了一把鎖來控制併發,所以 runqputslow 方法是一個慢路徑,是性能的瓶頸點。
runqputbatch
func runqputbatch(pp *p, q *gQueue, qsize int) 是批量往本地隊列中放入 g 的方法,比如它從其它 P 那裏偷來一批 g ,需要放到本地隊列中,就會調用這個方法。它的實現如下:
// runqputbatch 嘗試將 q 上的所有 G 放到本地可運行隊列上。
// 如果隊列已滿,它們將被放到全局隊列上;在這種情況下,這將暫時獲取調度器鎖。
// 只能由擁有 P 的所有者執行。
func runqputbatch(pp *p, q *gQueue, qsize int) {
h := atomic.LoadAcq(&pp.runqhead) // ①
t := pp.runqtail
n := uint32(0)
for !q.empty() && t-h < uint32(len(pp.runq)) { // ② 放入的批量goroutine非空, 並且本地隊列還足以放入
gp := q.pop()
pp.runq[t%uint32(len(pp.runq))].set(gp)
t++
n++
}
qsize -= int(n)
if randomizeScheduler { // ③ 隨機調度器, 隨機打亂
off := func(o uint32) uint32 {
return (pp.runqtail + o) % uint32(len(pp.runq))
}
for i := uint32(1); i < n; i++ {
j := cheaprandn(i + 1)
pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]
}
}
atomic.StoreRel(&pp.runqtail, t) // ④ 更新隊尾
if !q.empty() {
lock(&sched.lock)
globrunqputbatch(q, int32(qsize))
unlock(&sched.lock)
}
}
① 獲取隊列頭, 使用原子操作獲取隊頭。
它下面一行是獲取隊尾的值,你可以思考下爲什麼不需要使用
atomic.LoadAcq。
② 逐個的將 g 放到隊列中,直到放完或者放滿。
如果是隨機調度器,則使用混淆算法將隊列中的 g 隨機打亂。
最後如果隊列還有剩餘的 g,則調用 globrunqputbatch 方法,將剩餘的 g 放到全局隊列中。
runqget
runqget 方法是從 runq 中獲取一個 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:
// runqget 從本地可運行隊列中獲取一個 G。
// 如果 inheritTime 爲 true,gp 應該繼承當前時間片的剩餘時間。
// 否則,它應該開始一個新的時間片。
// 只能由擁有 P 的所有者執行。
func runqget(pp *p) (gp *g, inheritTime bool) {
next := pp.runnext
// 如果有 runnext,優先處理 runnext
if next != 0 && pp.runnext.cas(next, 0) { // ①
return next.ptr(), true
}
for {
h := atomic.LoadAcq(&pp.runqhead) // ② 獲取隊頭
t := pp.runqtail
if t == h { // ③ 隊列爲空
return nil, false
}
gp := pp.runq[h%uint32(len(pp.runq))].ptr() // ④ 獲取隊頭的goroutine
if atomic.CasRel(&pp.runqhead, h, h+1) { // ⑤ 更新隊頭
return gp, false
}
}
}
① 如果有 runnext,則優先處理 runnext,將 runnext 中的 g 取出來。
② 獲取隊列頭。如果 ③ 隊列爲空,直接返回。
④ 獲取隊頭的 g,這就是要讀取的 g。
⑤ 更新隊頭,這裏使用的是 atomic.CasRel 方法,它是一個原子的 Compare-And-Swap 操作,用來更新隊頭。
可以看到這裏只使用到了隊列頭runqhead。
runqdrain
runqdrain 方法是從 runq 中獲取所有的 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:
// runqdrain 從 pp 的本地可運行隊列中獲取所有的 G 並返回。
// 只能由擁有 P 的所有者執行。
func runqdrain(pp *p) (drainQ gQueue, n uint32) {
oldNext := pp.runnext
if oldNext != 0 && pp.runnext.cas(oldNext, 0) {
drainQ.pushBack(oldNext.ptr()) // ① 將 runnext 中的goroutine放入隊列
n++
}
retry:
h := atomic.LoadAcq(&pp.runqhead) // ② 獲取隊頭
t := pp.runqtail
qn := t - h
if qn == 0 {
return
}
if qn > uint32(len(pp.runq)) { // ③ 居然超出隊列的長度了?
goto retry
}
if !atomic.CasRel(&pp.runqhead, h, h+qn) { // ④ 更新隊頭
goto retry
}
// ⑤ 將隊列中的goroutine放入隊列drainQ中
for i := uint32(0); i < qn; i++ {
gp := pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
drainQ.pushBack(gp)
n++
}
return
}
runqgrab
runqgrab 方法是從 runq 中獲取一半的 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:
// runqgrab 從 pp 的本地可運行隊列中獲取一半的 G 並返回。
// Batch 是一個環形緩衝區,從 batchHead 開始。
// 返回獲取的 goroutine 數量。
// 可以由任何 P 執行。
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2 // ① 取一半的goroutine
if n == 0 {
if stealRunNextG {
// ② 如果要偷取runnext中的goroutine
if next := pp.runnext; next != 0 {
if pp.status == _Prunning {
// ② 如果要偷取runnext中的goroutine,這裏會sleep一會
if !osHasLowResTimer {
usleep(3)
} else {
osyield()
}
}
if !pp.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(pp.runq)/2) { // ③ 如果要偷取的goroutine數量超過一半, 重試
continue
}
// ④ 將隊列中至多一半的goroutine放入batch中
for i := uint32(0); i < n; i++ {
g := pp.runq[(h+i)%uint32(len(pp.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&pp.runqhead, h, h+n) { // ⑤ 更新隊頭
return n
}
}
}
① 取一半的 g,這裏是一個簡單的算法,取一半的 g。
② 如果要偷取 runnext 中的 g,則會嘗試偷取 runnext 中的 g。
③ 如果要偷取的 g 數量超過一半,則重試。
④ 將隊列中至多一半的 g 放入 batch 中。
⑤ 更新隊頭,這裏使用的是 atomic.CasRel 方法,它是一個原子的 Compare-And-Swap 操作,用來更新隊頭。
runqsteal
runqsteal 方法是從其它 P 的 runq 中偷取 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:
// runqsteal 從 p2 的本地可運行隊列中偷取一半的 G 並返回。
// 如果 stealRunNextG 爲 true,它還會嘗試偷取 runnext 中的 G。
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
t := pp.runqtail
n := runqgrab(p2, &pp.runq, t, stealRunNextG) // ① 從p2中偷取一半的goroutine
if n == 0 {
return nil
}
n--
gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr() // ② 獲取偷取的一個goroutine
if n == 0 {
return gp
}
h := atomic.LoadAcq(&pp.runqhead) // ③ 獲取隊頭
if t-h+n >= uint32(len(pp.runq)) { // ④ 如果隊列滿了,重置隊列
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&pp.runqtail, t+n) // ⑤ 更新隊尾
return gp
}
它實際使用了 runqgrab 方法來偷取 g,然後再從 runq 中取出一個 g。
以上就是runq的主要操作,它針對 Go 調度器的特點,設計了一套特定的隊列操作的函數,這些函數都是無鎖的,不會阻塞,保證了高效的併發讀寫。
gQueue 和 gList
gQueue 和 gList 是 Go 運行時中的兩個隊列,它們都是用來存儲 g 的,但是它們的實現方式不同。
gQueue是一個 G 的雙端隊列,可以從首尾增加 gp, 通過 g.schedlink 鏈接。一個 G 只能在一個 gQueue 或 gList 上。
type gQueue struct {
head guintptr
tail guintptr
}
func (q *gQueue) empty() bool {
return q.head == 0
}
// push 將gp添加到q的頭部。
func (q *gQueue) push(gp *g) {
gp.schedlink = q.head
q.head.set(gp)
if q.tail == 0 {
q.tail.set(gp)
}
}
// pushBack 增加gp到q的尾部。
func (q *gQueue) pushBack(gp *g) {
gp.schedlink = 0
if q.tail != 0 {
q.tail.ptr().schedlink.set(gp)
} else {
q.head.set(gp)
}
q.tail.set(gp)
}
// q2的所有G添加到q的尾部。之後不能再使用q2。
func (q *gQueue) pushBackAll(q2 gQueue) {
if q2.tail == 0 {
return
}
q2.tail.ptr().schedlink = 0
if q.tail != 0 {
q.tail.ptr().schedlink = q2.head
} else {
q.head = q2.head
}
q.tail = q2.tail
}
// pop 移除並返回隊列q的頭部。如果q爲空,則返回nil。
func (q *gQueue) pop() *g {
gp := q.head.ptr()
if gp != nil {
q.head = gp.schedlink
if q.head == 0 {
q.tail = 0
}
}
return gp
}
// popList 將所有的元素從隊列q中取出並返回一個gList。
func (q *gQueue) popList() gList {
stack := gList{q.head}
*q = gQueue{}
return stack
}
而gList是一個 G 的鏈表,通過 g.schedlink 鏈接。一個 G 只能在一個 gQueue 或 gList 上。
type gList struct {
head guintptr
}
func (l *gList) empty() bool {
return l.head == 0
}
// push 將gp添加到l的頭部。
func (l *gList) push(gp *g) {
gp.schedlink = l.head
l.head.set(gp)
}
// pushAll 將q中的所有G添加到l的頭部。
func (l *gList) pushAll(q gQueue) {
if !q.empty() {
q.tail.ptr().schedlink = l.head
l.head = q.head
}
}
// pop 移除並返回l的頭部。如果l爲空,則返回nil。
func (l *gList) pop() *g {
gp := l.head.ptr()
if gp != nil {
l.head = gp.schedlink
}
return gp
}
這是常規的數據結構中鏈表的實現,你可以和教科書中的介紹和實現做對比,看看書本中的內容如何應用到顯示的工程中的。
global runq
一個全局的runq用來處理太多的 goroutine, 在本地runq中的 goroutine 太少的情況下,從全局隊列中偷取 goroutine。主要用來處理 P 中 goroutine 不均的情況。
因爲它直接使用一把鎖 (sched.lock),而不是 lock-free 的數據結構,所以代碼閱讀和理解起來會相對簡單一些。這裏就不詳細介紹了
var (
sched schedt
)
type schedt struct {
...
// Global runnable queue.
runq gQueue
runqsize int32
...
}
func globrunqput(gp *g) {
assertLockHeld(&sched.lock) // 保證鎖被持有
sched.runq.pushBack(gp)
sched.runqsize++
}
func globrunqputhead(gp *g) {
assertLockHeld(&sched.lock) // 保證鎖被持有
sched.runq.push(gp)
sched.runqsize++
}
func globrunqputbatch(batch *gQueue, n int32) {
assertLockHeld(&sched.lock) // 保證鎖被持有
sched.runq.pushBackAll(*batch)
sched.runqsize += n
*batch = gQueue{}
}
func globrunqget(pp *p, max int32) *g {
assertLockHeld(&sched.lock) // 保證鎖被持有
if sched.runqsize == 0 { // 如果全局隊列爲空
return nil
}
n := sched.runqsize/gomaxprocs + 1 // 從全局隊列中獲取goroutine的數量
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max { // 如果max大於0,取最小值
n = max
}
if n > int32(len(pp.runq))/2 { // 如果要獲取的goroutine數量超過一半,只取一半,不貪婪
n = int32(len(pp.runq)) / 2
}
sched.runqsize -= n
gp := sched.runq.pop() // 從全局隊列中獲取一個goroutine
n--
for ; n > 0; n-- { // 從全局隊列中獲取n-1個goroutine
gp1 := sched.runq.pop()
runqput(pp, gp1, false) // 將goroutine放入本地隊列
}
return gp // 返回獲取的goroutine
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/C8iQsaAKSQNxpoQeSEGTWg