Go 中祕而不宣的數據結構 runq- 難怪運行時調度那麼好

首先,讓我們先來回顧 Go 運行時的 GPM 模型。這方面的介紹網上的資料都非常非常多了,但是我們也不妨回顧一下:

GPM 模型中的 G 代表 goroutine。每個 goroutine 只佔用幾 KB 的內存, 可以輕鬆創建成千上萬個。G 包含了 goroutine 的棧、指令指針和其他信息, 如阻塞 channel 的等待隊列等。

P 代表 processor, 可以理解爲一個抽象的 CPU 核心。P 的數量默認等於實際的 CPU 核心數, 但可以通過環境變量進行調整。P 維護了一個本地的 goroutine 隊列, 還負責執行 goroutine 並管理與之關聯的上下文信息。

M 代表 machine, 是操作系統線程。一個 M 必須綁定一個 P 才能執行 goroutine。當一個 M 阻塞時, 運行時會創建一個新的 M 或者複用一個空閒的 M 來保證 P 的數量總是等於 GOMAXPROCS 的值, 從而充分利用 CPU 資源。

在這個模型中, P 扮演了承上啓下的角色。它連接了 G 和 M, 實現了用戶層級的 goroutine 到操作系統線程的映射。這種設計允許 Go 在用戶空間進行調度, 避免了頻繁的系統調用, 大大提高了併發效率。

調度過程中, 當一個 goroutine 被創建時, 它會被放到 P 的本地隊列或全局隊列中。如果 P 的本地隊列已滿, 一些 goroutine 會被放到全局隊列。當 P 執行完當前的 goroutine 後, 會優先從本地隊列獲取新的 goroutine 來執行。如果本地隊列爲空, P 會嘗試從全局隊列或其他 P 的隊列中偷取 goroutine。

這種工作竊取 (work-stealing) 算法確保了負載的動態平衡。當某個 P 的本地隊列爲空時, 它可以從其他 P 的隊列中竊取一半的 goroutine, 這有效地平衡了各個 P 之間的工作負載。

Go 運行時這麼做,主要還是減少 P 之間對獲取 goroutine 之間的競爭。本地隊列 runq 主要由持有它的 P 進行讀寫,只有在 "被偷" 的情況下,纔可能有 "數據競爭" 的問題,而這種情況發生概率較少,所以它設計了一個高效的 runq 數據結構來應對這麼場景。實際看起來和上面介紹的 PoolDequeue 有異曲同工之妙。

本文還會介紹 global queue 等數據結構,但不是本文的重點。

runq

在運行時中 P 是一個複雜的數據結構,下面列出了本文關注的它的幾個字段:

// 一個goroutine的指針
type guintptr uintptr

//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }

//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }

//go:nosplit
func (gp *guintptr) cas(old, new guintptr) bool {
 return atomic.Casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
}

type p struct {
 id          int32
 status      uint32 // one of pidle/prunning/...
 link        puintptr
 schedtick   uint32     // incremented on every scheduler call
 syscalltick uint32     // incremented on every system call
 sysmontick  sysmontick // last tick observed by sysmon
 m           muintptr   // back-link to associated m (nil if idle)
 mcache      *mcache
 pcache      pageCache
 raceprocctx uintptr

 deferpool    []*_defer // pool of available defer structs (see panic.go)
 deferpoolbuf [32]*_defer

 // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
 goidcache    uint64
 goidcacheend uint64

 // 本地運行的無鎖循環隊列
 runqhead uint32
 runqtail uint32
 runq     [256]guintptr

 // 如果非nil,是一個可優先運行的G
 runnext guintptr

 ...
}

runq 是一個無鎖循環隊列,由數組實現,它的長度是 256,這個長度是固定的,不會動態調整。runqheadrunqtail 分別是隊列的頭和尾,runqhead 指向隊列的頭部,runqtail 指向隊列的尾部。runq 數組的每個元素是一個 guintptr 類型,它是一個 uintptr 類型的別名,用來存儲 g 的指針。

runq 的操作主要是 runqputrunqputslowrunqputbatchrunqgetrunqdrainrunqgrabrunqsteal等方法。

接下來我們撿重點的方法看一下它是怎麼實現高效額度併發讀寫的.

runqput

runqput 方法是向 runq 中添加一個 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:

// runqput 嘗試將 g 放到本地可運行隊列上。
// 如果 next 爲 false,runqput 將 g 添加到可運行隊列的尾部。
// 如果 next 爲 true,runqput 將 g 放在 pp.runnext 位置。
// 如果可運行隊列已滿,runnext 將 g 放到全局隊列上。
// 只能由擁有 P 的所有者執行。
func runqput(pp *p, gp *g, next bool) {
 if !haveSysmon && next {
        // 如果沒有 sysmon,我們必須完全避免 runnext,否則會導致飢餓。
  next = false
 }
 if randomizeScheduler && next && randn(2) == 0 {
        // 如果隨機調度器打開,我們有一半的機會避免運行 runnext
  next = false
 }

    // 如果 next 爲 true,優先處理 runnext
    // 將當前的goroutine放到 runnext 中, 如果原來runnext中有goroutine, 則將其放到runq中
 if next {
 retryNext:
  oldnext := pp.runnext
  if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
   goto retryNext
  }
  if oldnext == 0 {
   return
  }
  // Kick the old runnext out to the regular run queue.
  gp = oldnext.ptr()
 }

    // 重點來了,將goroutine放入runq中
retry:
 h := atomic.LoadAcq(&pp.runqhead) // ①
 t := pp.runqtail
 if t-h < uint32(len(pp.runq)) { // ② 如果隊列未滿
  pp.runq[t%uint32(len(pp.runq))].set(gp) // ③ 將goroutine放入隊列
  atomic.StoreRel(&pp.runqtail, t+1) // ④ 更新隊尾
  return
 }
 if runqputslow(pp, gp, h, t) { // ⑤ 如果隊列滿了,調用runqputslow 嘗試將goroutine放入全局隊列
  return
 }
 // 如果隊列未滿,上面的操作應該已經成功返回,否則重試
 goto retry
}

runqput 方法的實現非常簡單,它首先判斷是否需要優先處理 runnext,如果需要,就將 g 放到 runnext 中,然後再將 g 放到 runq 中。runq 的操作是無鎖的,它通過 atomic 包提供的原子操作來實現。這裏使用的內部的更精細化的原子操作,這個也是我後面專門有一篇文章來講解的。你現在大概把 ①、④ 理解爲LoadStore操作即可。

②、⑤ 分別處理本地隊列未滿和隊列已滿的情況,如果隊列未滿,就將 g 放到隊列中,然後更新隊尾;如果隊列已滿,就調用 runqputslow 方法,將 g 放到全局隊列中。

③ 處直接將 g 放到隊列中,這是因爲只有當前的 P 才能操作 runq,所以不會有併發問題。同時我們也可以看到,我們總是往尾部插入, t總是一直增加的, 取餘操作保證了循環隊列的特性。

runqputslow 會把本地隊列中的一半的 g 放到全局隊列中,包括當前要放入的 g。一旦涉及到全局隊列,就會有一定的競爭,Go 運行時使用了一把鎖來控制併發,所以 runqputslow 方法是一個慢路徑,是性能的瓶頸點。

runqputbatch

func runqputbatch(pp *p, q *gQueue, qsize int) 是批量往本地隊列中放入 g 的方法,比如它從其它 P 那裏偷來一批 g ,需要放到本地隊列中,就會調用這個方法。它的實現如下:

// runqputbatch 嘗試將 q 上的所有 G 放到本地可運行隊列上。
// 如果隊列已滿,它們將被放到全局隊列上;在這種情況下,這將暫時獲取調度器鎖。
// 只能由擁有 P 的所有者執行。
func runqputbatch(pp *p, q *gQueue, qsize int) {
 h := atomic.LoadAcq(&pp.runqhead) // ①
 t := pp.runqtail
 n := uint32(0)
 for !q.empty() && t-h < uint32(len(pp.runq)) { // ② 放入的批量goroutine非空, 並且本地隊列還足以放入
  gp := q.pop()
  pp.runq[t%uint32(len(pp.runq))].set(gp)
  t++
  n++
 }
 qsize -= int(n)

 if randomizeScheduler { // ③ 隨機調度器, 隨機打亂
  off := func(o uint32) uint32 {
   return (pp.runqtail + o) % uint32(len(pp.runq))
  }
  for i := uint32(1); i < n; i++ {
   j := cheaprandn(i + 1)
   pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]
  }
 }

 atomic.StoreRel(&pp.runqtail, t) // ④ 更新隊尾
 if !q.empty() {
  lock(&sched.lock)
  globrunqputbatch(q, int32(qsize))
  unlock(&sched.lock)
 }
}

① 獲取隊列頭, 使用原子操作獲取隊頭。

它下面一行是獲取隊尾的值,你可以思考下爲什麼不需要使用atomic.LoadAcq

② 逐個的將 g 放到隊列中,直到放完或者放滿。

如果是隨機調度器,則使用混淆算法將隊列中的 g 隨機打亂。

最後如果隊列還有剩餘的 g,則調用 globrunqputbatch 方法,將剩餘的 g 放到全局隊列中。

runqget

runqget 方法是從 runq 中獲取一個 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:

// runqget 從本地可運行隊列中獲取一個 G。
// 如果 inheritTime 爲 true,gp 應該繼承當前時間片的剩餘時間。
// 否則,它應該開始一個新的時間片。
// 只能由擁有 P 的所有者執行。
func runqget(pp *p) (gp *g, inheritTime bool) {
 next := pp.runnext
    // 如果有 runnext,優先處理 runnext
 if next != 0 && pp.runnext.cas(next, 0) { // ①
  return next.ptr(), true
 }

 for {
  h := atomic.LoadAcq(&pp.runqhead) // ② 獲取隊頭
  t := pp.runqtail
  if t == h { // ③ 隊列爲空
   return nil, false
  }
  gp := pp.runq[h%uint32(len(pp.runq))].ptr() // ④ 獲取隊頭的goroutine
  if atomic.CasRel(&pp.runqhead, h, h+1) { // ⑤ 更新隊頭
   return gp, false
  }
 }
}

① 如果有 runnext,則優先處理 runnext,將 runnext 中的 g 取出來。

② 獲取隊列頭。如果 ③ 隊列爲空,直接返回。

④ 獲取隊頭的 g,這就是要讀取的 g

⑤ 更新隊頭,這裏使用的是 atomic.CasRel 方法,它是一個原子的 Compare-And-Swap 操作,用來更新隊頭。

可以看到這裏只使用到了隊列頭runqhead

runqdrain

runqdrain 方法是從 runq 中獲取所有的 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:

// runqdrain 從 pp 的本地可運行隊列中獲取所有的 G 並返回。
// 只能由擁有 P 的所有者執行。
func runqdrain(pp *p) (drainQ gQueue, n uint32) {
 oldNext := pp.runnext
 if oldNext != 0 && pp.runnext.cas(oldNext, 0) {
  drainQ.pushBack(oldNext.ptr()) // ① 將 runnext 中的goroutine放入隊列
  n++
 }

retry:
 h := atomic.LoadAcq(&pp.runqhead) // ② 獲取隊頭
 t := pp.runqtail
 qn := t - h
 if qn == 0 {
  return
 }
 if qn > uint32(len(pp.runq)) { // ③ 居然超出隊列的長度了?
  goto retry
 }

 if !atomic.CasRel(&pp.runqhead, h, h+qn) { // ④ 更新隊頭
  goto retry
 }

    // ⑤ 將隊列中的goroutine放入隊列drainQ中
 for i := uint32(0); i < qn; i++ {
  gp := pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
  drainQ.pushBack(gp)
  n++
 }
 return
}

runqgrab

runqgrab 方法是從 runq 中獲取一半的 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:

// runqgrab 從 pp 的本地可運行隊列中獲取一半的 G 並返回。
// Batch 是一個環形緩衝區,從 batchHead 開始。
// 返回獲取的 goroutine 數量。
// 可以由任何 P 執行。
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
 for {
  h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
  t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
  n := t - h
  n = n - n/2 // ① 取一半的goroutine
  if n == 0 {
   if stealRunNextG {
                // ② 如果要偷取runnext中的goroutine
    if next := pp.runnext; next != 0 {
     if pp.status == _Prunning {
                        // ② 如果要偷取runnext中的goroutine,這裏會sleep一會
      if !osHasLowResTimer {
       usleep(3)
      } else {
       osyield()
      }
     }
     if !pp.runnext.cas(next, 0) {
      continue
     }
     batch[batchHead%uint32(len(batch))] = next
     return 1
    }
   }
   return 0
  }
  if n > uint32(len(pp.runq)/2) { // ③ 如果要偷取的goroutine數量超過一半, 重試
   continue
  }

        // ④ 將隊列中至多一半的goroutine放入batch中
  for i := uint32(0); i < n; i++ {
   g := pp.runq[(h+i)%uint32(len(pp.runq))]
   batch[(batchHead+i)%uint32(len(batch))] = g
  }
  if atomic.CasRel(&pp.runqhead, h, h+n) { // ⑤ 更新隊頭
   return n
  }
 }
}

① 取一半的 g,這裏是一個簡單的算法,取一半的 g

② 如果要偷取 runnext 中的 g,則會嘗試偷取 runnext 中的 g

③ 如果要偷取的 g 數量超過一半,則重試。

④ 將隊列中至多一半的 g 放入 batch 中。

⑤ 更新隊頭,這裏使用的是 atomic.CasRel 方法,它是一個原子的 Compare-And-Swap 操作,用來更新隊頭。

runqsteal

runqsteal 方法是從其它 Prunq 中偷取 g 的方法,它是一個無鎖的操作,不會阻塞。它的實現如下:

// runqsteal 從 p2 的本地可運行隊列中偷取一半的 G 並返回。
// 如果 stealRunNextG 爲 true,它還會嘗試偷取 runnext 中的 G。
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
 t := pp.runqtail
 n := runqgrab(p2, &pp.runq, t, stealRunNextG) // ① 從p2中偷取一半的goroutine
 if n == 0 {
  return nil
 }
 n--
 gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr() // ② 獲取偷取的一個goroutine
 if n == 0 {
  return gp
 }
 h := atomic.LoadAcq(&pp.runqhead) // ③ 獲取隊頭
 if t-h+n >= uint32(len(pp.runq)) { // ④ 如果隊列滿了,重置隊列
  throw("runqsteal: runq overflow")
 }
 atomic.StoreRel(&pp.runqtail, t+n) // ⑤ 更新隊尾
 return gp
}

它實際使用了 runqgrab 方法來偷取 g,然後再從 runq 中取出一個 g

以上就是runq的主要操作,它針對 Go 調度器的特點,設計了一套特定的隊列操作的函數,這些函數都是無鎖的,不會阻塞,保證了高效的併發讀寫。

gQueuegList

gQueuegList 是 Go 運行時中的兩個隊列,它們都是用來存儲 g 的,但是它們的實現方式不同。

gQueue是一個 G 的雙端隊列,可以從首尾增加 gp, 通過 g.schedlink 鏈接。一個 G 只能在一個 gQueue 或 gList 上。

type gQueue struct {
 head guintptr
 tail guintptr
}
func (q *gQueue) empty() bool {
 return q.head == 0
}

// push 將gp添加到q的頭部。
func (q *gQueue) push(gp *g) {
 gp.schedlink = q.head
 q.head.set(gp)
 if q.tail == 0 {
  q.tail.set(gp)
 }
}

// pushBack 增加gp到q的尾部。
func (q *gQueue) pushBack(gp *g) {
 gp.schedlink = 0
 if q.tail != 0 {
  q.tail.ptr().schedlink.set(gp)
 } else {
  q.head.set(gp)
 }
 q.tail.set(gp)
}

// q2的所有G添加到q的尾部。之後不能再使用q2。
func (q *gQueue) pushBackAll(q2 gQueue) {
 if q2.tail == 0 {
  return
 }
 q2.tail.ptr().schedlink = 0
 if q.tail != 0 {
  q.tail.ptr().schedlink = q2.head
 } else {
  q.head = q2.head
 }
 q.tail = q2.tail
}

// pop 移除並返回隊列q的頭部。如果q爲空,則返回nil。
func (q *gQueue) pop() *g {
 gp := q.head.ptr()
 if gp != nil {
  q.head = gp.schedlink
  if q.head == 0 {
   q.tail = 0
  }
 }
 return gp
}

// popList 將所有的元素從隊列q中取出並返回一個gList。
func (q *gQueue) popList() gList {
 stack := gList{q.head}
 *q = gQueue{}
 return stack
}

gList是一個 G 的鏈表,通過 g.schedlink 鏈接。一個 G 只能在一個 gQueue 或 gList 上。

type gList struct {
 head guintptr
}
func (l *gList) empty() bool {
 return l.head == 0
}

// push 將gp添加到l的頭部。
func (l *gList) push(gp *g) {
 gp.schedlink = l.head
 l.head.set(gp)
}

// pushAll 將q中的所有G添加到l的頭部。
func (l *gList) pushAll(q gQueue) {
 if !q.empty() {
  q.tail.ptr().schedlink = l.head
  l.head = q.head
 }
}

// pop 移除並返回l的頭部。如果l爲空,則返回nil。
func (l *gList) pop() *g {
 gp := l.head.ptr()
 if gp != nil {
  l.head = gp.schedlink
 }
 return gp
}

這是常規的數據結構中鏈表的實現,你可以和教科書中的介紹和實現做對比,看看書本中的內容如何應用到顯示的工程中的。

global runq

一個全局的runq用來處理太多的 goroutine, 在本地runq中的 goroutine 太少的情況下,從全局隊列中偷取 goroutine。主要用來處理 P 中 goroutine 不均的情況。

因爲它直接使用一把鎖 (sched.lock),而不是 lock-free 的數據結構,所以代碼閱讀和理解起來會相對簡單一些。這裏就不詳細介紹了

var (
 sched      schedt
)

type schedt struct {
 ...
 // Global runnable queue.
 runq     gQueue
 runqsize int32
    ...
}
func globrunqput(gp *g) {
 assertLockHeld(&sched.lock) // 保證鎖被持有

 sched.runq.pushBack(gp)
 sched.runqsize++
}


func globrunqputhead(gp *g) {
 assertLockHeld(&sched.lock) // 保證鎖被持有

 sched.runq.push(gp)
 sched.runqsize++
}

func globrunqputbatch(batch *gQueue, n int32) {
 assertLockHeld(&sched.lock) // 保證鎖被持有

 sched.runq.pushBackAll(*batch)
 sched.runqsize += n
 *batch = gQueue{}
}

func globrunqget(pp *p, max int32) *g {
 assertLockHeld(&sched.lock) // 保證鎖被持有

 if sched.runqsize == 0 { // 如果全局隊列爲空
  return nil
 }

 n := sched.runqsize/gomaxprocs + 1 // 從全局隊列中獲取goroutine的數量
 if n > sched.runqsize {
  n = sched.runqsize
 }
 if max > 0 && n > max { // 如果max大於0,取最小值
  n = max
 }
 if n > int32(len(pp.runq))/2 { // 如果要獲取的goroutine數量超過一半,只取一半,不貪婪
  n = int32(len(pp.runq)) / 2
 }

 sched.runqsize -= n

 gp := sched.runq.pop() // 從全局隊列中獲取一個goroutine
 n--
 for ; n > 0; n-- { // 從全局隊列中獲取n-1個goroutine
  gp1 := sched.runq.pop()
  runqput(pp, gp1, false) // 將goroutine放入本地隊列
 }
 return gp // 返回獲取的goroutine
}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/C8iQsaAKSQNxpoQeSEGTWg