最全 Go select 底層原理,一文學透高頻用法

在日常開發中,select 語句被高頻使用。但目前,全網分析 select 在編譯期和運行時的完整底層原理資料,非常匱乏。本文基於 Go1.18.1 版本的源碼,講解 select 訪問 Channel 在編譯期和運行時的底層原理——select 編譯器優化用到的 src/cmd/compile/internal/walk/select.go 的 walkSelectCases() 函數和多 case 情況下運行時用到的 runtime.selectgo() 函數。希望能幫助到各位開發者。

在對 Channel 的讀寫方式上,除了我們通用的讀 i <- ch, i, ok <- ch,寫 ch <- 1 這種阻塞訪問方式,還有 select 關鍵字提供的非阻塞訪問方式。

在日常開發中,select 語句還是會經常用到的。可能是 channel 普通讀寫的使用頻率比 select 高,網上關於 Channel 源碼的分析文章很多,關於 select 用法的文章也很多,select 運行時的 selectgo 函數的分析也有一些,但是關於 select 在編譯期和運行時的完整的底層原理的分析文章並不多。

本文的分析基於 Go1.18.1 版本的源碼,主要分析 select 編譯器優化用到的 src/cmd/compile/internal/walk/select.go 的 walkSelectCases() 函數和多 case 情況下運行時用到的 runtime.selectgo() 函數。

結論先行

爲了節省各位開發者時間,本文先給出結論,若您時間不足可以先看完本節並收藏,後續再持續閱讀消化:

第一,Go select 語句採用的多路複用思想,本質上是爲了達到通過一個協程同時處理多個 IO 請求(Channel 讀寫事件)。

第二,select 的基本用法是:通過多個 case 監聽多個 Channel 的讀寫操作,任何一個 case 可以執行則選擇該 case 執行,否則執行 default。如果沒有 default,且所有的 case 均不能執行,則當前的 goroutine 阻塞。

第三,編譯器會對 select 有不同的 case 的情況進行優化以提高性能。首先,編譯器對 select 沒有 case、有單 case 和單 case+default 的情況進行單獨處理。這些處理或者直接調用運行時函數,或者直接轉成對 channel 的操作,或者以非阻塞的方式訪問 channel,多種靈活的處理方式能夠提高性能,尤其是避免對 channel 的加鎖。

第四,對最常出現的 select 有多 case 的情況,會調用 runtime.selectgo() 函數來獲取執行 case 的索引,並生成 if 語句執行該 case 的代碼。

第五,selectgo 函數的執行分爲四個步驟:首先,隨機生成一個遍歷 case 的輪詢順序 pollorder 並根據 channel 地址生成加鎖順序 lockorder,隨機順序能夠避免 channel 飢餓,保證公平性,加鎖順序能夠避免死鎖;然後,根據 pollorder 的順序查找 scases 是否有可以立即收發的 channel,如果有則獲取 case 索引進行處理;再次,如果 pollorder 順序上沒有可以直接處理的 case,則將當前 goroutine 加入各 case 的 channel 對應的收發隊列上並等待其他 goroutine 的喚醒;最後,當調度器喚醒當前 goroutine 時,會再次按照 lockorder 遍歷所有的 case,從中查找需要被處理的 case 索引進行讀寫處理,同時從所有 case 的發送接收隊列中移除掉當前 goroutine。

select 是什麼?怎麼用?

select 是 Go 在語言層面提供的 I/O 多路複用的機制,其專門用來檢測多個 channel 是否準備完畢:可讀或可寫。

1)什麼是 IO 多路複用?

我們一看到 select,就知道它原本是 Linux 操作系統中的系統調用。操作系統提供 select、poll 和 epoll 等函數構建 I/O 多路複用模型提升程序處理 IO 事件如網絡請求的性能。Go 語言的 select 與操作系統中的 select 比較相似但又不完全相同。

操作系統中 IO 多路複用中多路就是多個 TCP 連接,複用就是指複用一個或少量線程,理解起來就是多個網絡連接的 IO 事件複用一個或少量線程來處理這些連接。一句話概括就是,IO 多路複用就是複用一個線程處理多個 IO 請求

普通多線程 IO 如圖 1.1 所示,每來一個 IO 事件,比如網絡讀寫請求事件,操作系統都會起一個線程或進程進行處理。這種方式的缺點很明顯:對多個 IO 事件,系統需要創建和維護對應的多個線程或進程。 大多數時候,大部分 IO 事件是處於等待狀態,只有少部分會立即操作完成,這會導致對應的處理線程大部分時候處於等待狀態,系統爲此還需要多做很多額外的線程或者進程的管理工作。

圖 1.1 普通多線程 IO

IO 多路複用的基本原理如圖 1.2 所示。通過複用可以使一個線程處理多個 IO 事件。操作系統無需對額外的多個線程或者進程進行管理,節約了資源,提升了效率。

圖 1.2 IO 多路複用

操作系統中實現 IO 多路複用的命令 select、poll、epoll,主要通過起一個線程來監聽並處理多個文件描述符代表的 TCP 鏈接,用來提高處理網絡讀寫請求的效率。而 Go 語言的 select 命令,是用來起一個 goroutine 協程監聽多個 Channel(代表多個 goroutine)的讀寫事件,提高從多個 Channel 獲取信息的效率。二者具體目標和實現不同,但本質思想都是相同的。

2)select 怎麼用?

select 命令的基本語法如下:

select {
  case <- chan1:
    // 如果 chan1 成功讀到數據,則進行該 case 處理語句
  case chan2 <- 1:
    // 如果成功向 chan2 寫入數據,則進行該 case 處理語句
  default:
    // 如果上面都沒有成功,則進入default處理流程
}

select 的結構跟 switch 有些相似,不過僅僅只是形式上相似而已,本質上大爲不同。select 中的多個 case 的表達式必須都是 Channel 的讀寫操作,不能是其他的數據類型。select 通過多個 case 監聽多個 Channel 的讀寫操作,任何一個 case 可以執行則選擇該 case 執行,否則執行 default。如果沒有 default,且所有的 case 均不能執行,則當前的 goroutine 阻塞。

Go 執行如下的代碼:

package main

func main() {
  select {
  }
}

會發生程序因爲 select 所在 goroutine 永久阻塞而失敗的現象:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:
...

對於空的 select 語句,程序會被阻塞,確切的說是當前協程被阻塞,同時 Go 自帶死鎖檢測機制,當發現當前協程再也沒有機會被喚醒時,則會發生 panic。所以上述程序會 panic。

Go 執行如下代碼:

package main
import (
  "fmt"
)

func main() {
  ch1 := make(chan int, 1)
  ch2 := make(chan int)
  select {
  case <- ch1:
      // 從有緩衝chan中讀取數據,由於緩衝區沒有數據且沒有發送者,該分支會阻塞
      fmt.Println("Received from ch")
  case i := <- ch2:
            // 從無緩衝chan中讀取數據,由於沒有發送者,該分支會阻塞
      fmt.Printf("i is: %d", i)
  }
}

程序會發生因所有 case 不滿足執行條件,且沒有 default 分支,而阻塞,由於 Go 自帶死鎖檢測機制,當發現當前協程再也沒有機會被喚醒時,則會發生 panic:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
...

 如果修改代碼如下:

package main
import (
  "fmt"
)

func main() {
  ch1 := make(chan int, 1)
  select {
  case <- ch1:
      // 從有緩衝chan中讀取數據,由於緩衝區沒有數據且沒有發送者,該分支會阻塞
      fmt.Println("Received from ch")
  default:
      fmt.Println("this is default")
  }
}

select 有一個 case 分支和 default 分支,當 case 分支不滿足執行條件時執行 default 分支:

this is default

如果有滿足的分支,則執行對應的分支:

package main

import (
  "fmt"
)

func main() {
  ch1 := make(chan int, 1)
  ch1 <- 10
  select {
  case <- ch1:
      // ch1有發送者,該分支滿足執行條件
      fmt.Println("Received from ch1")
  default:
      fmt.Println("this is default")
  }
}

程序運行後,輸出結果如下:

Received from ch1
package main
import (
  "fmt"
)
func main() {
  ch := make(chan int, 1)
  ch <- 10
  select {
  case val := <-ch:
    fmt.Println("Received from ch1, val =", val)
  case val := <-ch:
    fmt.Println("Received from ch2, val =", val)
  case val := <-ch:
    fmt.Println("Received from ch3, val =", val)
  default:
    fmt.Println("Run in default")
  }
}

程序運行後,輸出結果如下:

Received from ch2, val = 10

如果多次運行該程序,會發現,第一個 case、第二個 case 和第三個 case 都會被執行。也就是說,此時所有分支條件都滿足,則隨機選擇一個 case 執行。

select 在編譯期和運行時的執行過程

1)select 的實現原理

select 在 Go 語言的源代碼中不存在對應的結構體,只是定義了一個 runtime.scase 結構體(在 src/runtime/select.go)表示每個 case 語句 (包含 defaut):

type scase struct {
  c    *hchan         // case中使用的chan
  elem unsafe.Pointer // 指向case包含數據的指針
}

因爲所有的非 default 的 case 基本都要求是對 Channel 的讀寫操作,所以 runtime.scase 結構體中也包含一個 runtime.hchan 類型的字段存儲 case 中使用的 Channel,另一個字段 elem 指向 case 條件包含的數據的指針,如 case ch1 <- 1,則 elem 指向常量 1。

select 語句在編譯期間會被轉換成 ir.OSELECT 類型的節點,見 src/cmd/compile/internal/walk/stmt.go 的 walkStmt() 函數:

func walkStmt(n ir.Node) ir.Node {
        ......
        switch n.Op() {
        ......
   case ir.OSELECT:
    n := n.(*ir.SelectStmt)
    walkSelect(n)
    return n        
        ......
        }
        ......
}

處理 OSELECT 類型節點的函數是 src/cmd/compile/internal/walk/select.go 的 walkSelect() 函數:

func walkSelect(sel *ir.SelectStmt) {
  lno := ir.SetPos(sel)
  if sel.Walked() {
    base.Fatalf("double walkSelect")
  }
  sel.SetWalked(true)

  init := ir.TakeInit(sel)
        // 編譯器在中間代碼生成期間會根據select中case的不同對控制語句進行優化
  init = append(init, walkSelectCases(sel.Cases)...)
  sel.Cases = nil

  sel.Compiled = init
  walkStmtList(sel.Compiled)

  base.Pos = lno
}

編譯器在中間代碼生成期間會根據 select 中 case 的不同對控制語句進行優化,這一過程都發生在 src/cmd/compile/internal/walk/select.go 的 walkSelectCases() 函數中。

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // 編譯器優化: select 沒有case時
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // 編譯器優化: select只有一個case時
  if ncas == 1 {
        ......
        }
        ......
}

下面主要是分多種情況分析 walkSelectCases() 函數對不同 case 分支條件的處理,不同的情況會調用不同的運行時函數。如圖 2.1 所示,是編譯器對不同的 case 情況的處理,在運行時會調用不同的函數

圖 2.1   編譯器對不同的 case 情況在運行時調用不同的函數

2)當 select 沒有 case

從 1.2.2 小節的事例,我們可以知道,當 select 沒有 case 時,select 所在的 goroutine 會永久阻塞,程序會直接 panic。

select{
}

從 walkSelectCases() 函數對無 case 的處理邏輯,可以看到,該種情況會直接調用 runtime.block() 函數:

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // 編譯器優化: select沒有case時
  if ncas == 0 {
       return []ir.Node{mkcallstmt("block")}
  }
        ......
}

runtime.block() 函數會調用 gopark() 函數以 waitReasonSelectNoCases 的原因掛起當前協程,並且永遠無法被喚醒,Go 程序檢測到這種情況,直接 panic:

// src/runtime/select.go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // forever
}

3)當 select 只有一個非 default 的 case

select 只有一個非 default 的 case 時,只有一個 channel,實際會被編譯器轉換爲對該 channel 的讀寫操作,和實際調用 data := <- ch 或 ch <- data 並沒有什麼區別:

ch := make(chan struct{})
select {
case data <- ch:
    fmt.Printf("ch data: %v\n", data)
}

該段代碼的 select 語句,會被編譯器轉換爲:

data := <- ch
fmt.Printf("ch data: %v\n", data)

讀取 ch 成功後,才能執行該分支的語句,否則程序一直會阻塞。具體的實現原理在 walkSelectCases() 函數中:

// src/cmd/compile/internal/walk/select.go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ......
  // 編譯器優化: select只有一個case時
  if ncas == 1 {
    cas := cases[0]         // 獲取第一個也是唯一的一個case
    ir.SetPos(cas)
    l := cas.Init()
    if cas.Comm != nil {    // case類型不是default:
      n := cas.Comm   // 獲取case的條件語句
      l = append(l, ir.TakeInit(n)...)
      switch n.Op() { // 檢查case對channel的操作類型:讀或寫
      default:        // 如果case既不是讀,也不是寫channel,則直接報錯
        base.Fatalf("select %v", n.Op())

      case ir.OSEND:
        // 如果對chan操作是寫入類型,編譯器無須做任何轉換,直接是 chan <- data

      case ir.OSELRECV2:
                                // 如果對chan操作是接收類型, 完整形式爲:data, ok := <- chan
        r := n.(*ir.AssignListStmt)
                                // 如果具體是<- chan這種形式,即接收字段 data和ok爲空,則直接轉成 <- chan
        if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
          n = r.Rhs[0]
          break
        }
                                // 否則,是 data, ok := <- chan 這種形式
        r.SetOp(ir.OAS2RECV)
      }
                        // 把編譯器處理後的case語句條件加入待執行語句列表
      l = append(l, n)
    }
                // 把case條件後要執行的語句體加入待執行語句列表
    l = append(l, cas.Body...)
                // 默認加入break類型語句,跳出select-case語句體
    l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
    return l
  }
        ......
}

從註釋中可以看出,在 select 只有一個 case 並且這個 case 不是 default 時,select 對 case 的處理就是對普通 channel 的讀寫操作。

4)當 select 有一個 channel 的 case + 一個 default 的 case

在很多講 Channel 的文章中,打印下面代碼的彙編,會看到 select 只有一個操作 channel 的 case 和一個 default 時,會調用編譯器的 runtime.selectnbrecv() 函數和 runtime.selectnbsend() 函數。

package main
import (
  "fmt"
)
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    fmt.Println("run case 1")
  default:
    fmt.Println("run default")
  }
}

編譯器會將其改寫爲:

if selectnbsend(ch, 1) {
    fmt.Println("run case 1")
} else {
    fmt.Println("run default")
}

檢查 walkSelectCases() 函數:

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ......
  // 編譯器優化: case 有兩個case,一個是普通的channel操作,一個是default
  if ncas == 2 && dflt != nil {
                // 獲取非default的case
    cas := cases[0] 
    if cas == dflt {
      cas = cases[1]
    }

    n := cas.Comm
    ir.SetPos(n)
    r := ir.NewIfStmt(base.Pos, nil, nil, nil)
    r.SetInit(cas.Init())
    var cond ir.Node
    switch n.Op() {
    default:
      base.Fatalf("select %v", n.Op())

    case ir.OSEND:
      // 如果該case是對channel的寫入操作,則調用運行時的selectnbsend 函數
      n := n.(*ir.SendStmt)
      ch := n.Chan
      cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)

    case ir.OSELRECV2:
                        // 如果該case是對channel的讀取操作,會調用運行時的selectnbrecv 函數
      n := n.(*ir.AssignListStmt)
      recv := n.Rhs[0].(*ir.UnaryExpr)
      ch := recv.X
      elem := n.Lhs[0]
      if ir.IsBlank(elem) {
        elem = typecheck.NodNil()
      }
      cond = typecheck.Temp(types.Types[types.TBOOL])
      fn := chanfn("selectnbrecv", 2, ch.Type())
      call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
      as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
      r.PtrInit().Append(typecheck.Stmt(as))
    }

    r.Cond = typecheck.Expr(cond)
    r.Body = cas.Body
                // 將default語句放入if語句的else分支
    r.Else = append(dflt.Init(), dflt.Body...)
    return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
  }
        ......
}

runtime.selectnbrecv() 函數和 runtime.selectnbsend() 函數會分別調用 runtime.cahnrecv() 函數和 runtime.chansend() 函數,我們可以看到傳入這兩個函數的第三個參數都是 false,該參數是 block,爲 false 代表非阻塞,即每次嘗試從 channel 讀寫值,如果不成功則直接返回,不會阻塞。

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

5)當select有多個channel的case

如果對如下代碼打印彙編,會發現執行 select 動作實際是調用的 runtime.selectgo() 函數:

package main
import (
  "fmt"
)
func main() {
  ch1 := make(chan int)
  ch2 := make(chan int)
  select {
  case ch1 <- 1:
    fmt.Println("run case 1")
  case data := <- ch2:
    fmt.Printf("run case 2, data is: %d", data)
  }
}

繼續分析 walkSelectCases() 函數,處理多 case 的代碼邏輯如下:

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
       ......
       // 從這裏開始是多case的情況
       // ncas是select的全部分支的個數,如果有default分支,ncas個數減一 
       if dflt != nil {
    ncas--
  }
        //定義casorder爲ncas大小的case語句的數組
  casorder := make([]*ir.CommClause, ncas)
        // 分別定義nsends爲發送channel的case個數,nrecvs爲接收channel的case個數
  nsends, nrecvs := 0, 0
        // 定義init爲多case編譯後待執行的語句列表
  var init []ir.Node

  base.Pos = sellineno
        // 定義selv爲長度爲ncas的scase類型的數組,scasetype()函數返回的就是scase結構體,包含chan和elem兩個字段
  selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
  init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))

  // 定義order爲2倍的ncas長度的TUINT16類型的數組
        // 注意:selv和order作爲runtime.selectgo()函數的入參,前者存放scase列表內存地址,後者用來做scase排序使用,排序是爲了便於挑選出待執行的case
  order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))

  ......
        // 第一個階段:遍歷case生成scase對象放到selv中
  for _, cas := range cases {
    ir.SetPos(cas)

    init = append(init, ir.TakeInit(cas)...)

    n := cas.Comm
    if n == nil { // 如果是default分支,先跳過
      continue
    }

    var i int
    var c, elem ir.Node
                // 根據case分別是發送或接收類型,獲取chan, elem的值
    switch n.Op() {
    default:
      base.Fatalf("select %v", n.Op())
    case ir.OSEND:
      n := n.(*ir.SendStmt)
      i = nsends         // 對發送channel類型的case,i從0開始遞增
      nsends++
      c = n.Chan
      elem = n.Value
    case ir.OSELRECV2:
      n := n.(*ir.AssignListStmt)
      nrecvs++
      i = ncas - nrecvs   // 對接收channel類型的case,i從ncas開始遞減
      recv := n.Rhs[0].(*ir.UnaryExpr)
      c = recv.X
      elem = n.Lhs[0]
    }
                // 編譯器對多個case排列後,發送chan的case在左邊,接收chan的case在右邊,在selv中也是如此
    casorder[i] = cas
                // 定義一個函數,寫入chan或elem到selv數組
    setField := func(f string, val ir.Node) {
      r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
      init = append(init, typecheck.Stmt(r))
    }
                // 將c代表的chan寫入selv
    c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
    setField("c", c)
                // 將elem寫入selv
    if !ir.IsBlank(elem) {
      elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
      setField("elem", elem)
    }

    ......
  }
        // 如果發送chan和接收chan的個數不等於ncas,說明代碼有錯誤,直接報錯
  if nsends+nrecvs != ncas {
    base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
  }

  // 從這裏開始執行select動作
  base.Pos = sellineno
        // 定義chosen, recvOK作爲selectgo()函數的兩個返回值,chosen 表示被選中的case的索引,recvOK表示對於接收操作,是否成功接收
  chosen := typecheck.Temp(types.Types[types.TINT])
  recvOK := typecheck.Temp(types.Types[types.TBOOL])
  r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
  r.Lhs = []ir.Node{chosen, recvOK}
        // 調用runtime.selectgo()函數作爲運行時實際執行多case的select動作的函數
  fn := typecheck.LookupRuntime("selectgo")
  var fnInit ir.Nodes
  r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
  init = append(init, fnInit...)
  init = append(init, typecheck.Stmt(r))

  // 執行完selectgo()函數後,銷燬selv和order數組.
  init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
  init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
  ......

  // 定義一個函數,根據chosen確定的case分支生成if語句,執行該分支的語句
  dispatch := func(cond ir.Node, cas *ir.CommClause) {
    cond = typecheck.Expr(cond)
    cond = typecheck.DefaultLit(cond, nil)

    r := ir.NewIfStmt(base.Pos, cond, nil, nil)

    if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
      n := n.(*ir.AssignListStmt)
      if !ir.IsBlank(n.Lhs[1]) {
        x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
        r.Body.Append(typecheck.Stmt(x))
      }
    }

    r.Body.Append(cas.Body.Take()...)
    r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
    init = append(init, r)
  }
        // 如果多case中有default分支,並且chosen小於0,執行該default分支
  if dflt != nil {
    ir.SetPos(dflt)
    dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
  }
        // 如果有chosen選中的case分支,即chosen等於i,則執行該分支
  for i, cas := range casorder {
    ir.SetPos(cas)
    dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
  }

  return init
}

從對多 case 的編譯器處理邏輯,可以看到分爲三個階段:

第一階段,生成 scase 對象數組,定義 selv 和 order 數組,selv 存放 scase 數組內存地址,order 用來做 scase 排序使用,對 scase 數組排序是爲了以某種機制選出待執行的 case;

第二階段,編譯器生成調用 runtime.selectgo() 的邏輯,selv 和 order 數組作爲入參傳入 selectgo() 函數,同時定義該函數的返回值,chosen 和 recvOK,chosen 表示被選中的 case 的索引,recvOK 表示對於接收操作,是否成功接收;

第三階段,根據 selectgo 返回值 chosen 來生成 if 語句來執行相應索引的 case。

6)select 在多 case 下調用的運行時 selectgo 函數怎樣實現多 channel 的選擇?

下面開始分析 runtime.selectgo() 函數的主要邏輯,邏輯流程圖如圖所示。

selectgo 函數處理主邏輯  

selectgo 函數首先會執行必要的初始化操作,並生成處理 case 的兩種順序:輪詢順序 polIorder 和加鎖順序 lockorder。

// cas0 指向一個類型爲 [ncases]scase 的數組
// order0 是一個指向[2*ncases]uint16,數組中的值都是 0
// 返回值有兩個, chosen 和 recvOK,分別表示選中的case的序號,和對接收操作是否接收成功的布爾值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
  ......
  // 爲了將scase分配到棧上,這裏直接給cas1分配了64KB大小的數組,同理, 給order1分配了128KB大小的數組
  cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
  order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

        // ncases個數是發送chan個數nsends加上接收chan個數nrecvs
  ncases := nsends + nrecvs
        // scases切片是上面分配cas1數組的前ncases個元素
  scases := cas1[:ncases:ncases]
        // 順序列表pollorder是order1數組的前ncases個元素
  pollorder := order1[:ncases:ncases]
        // 加鎖列表lockorder是order1數組的第二批ncase個元素
  lockorder := order1[ncases:][:ncases:ncases]
  ......

  // 生成排列順序
  norder := 0
  for i := range scases {
    cas := &scases[i]

    // 處理case中channel爲空的情況
    if cas.c == nil {
      cas.elem = nil // 將elem置空,便於GC
      continue
    }
                // 通過fastrandn函數引入隨機性,確定pollorder列表中case的隨機順序索引
    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
  }
  pollorder = pollorder[:norder]
  lockorder = lockorder[:norder]

  // 根據chan地址確定lockorder加鎖排序列表的順序
  // 通過簡單的堆排序,以nlogn時間複雜度完成排序
  for i := range lockorder {
    j := i
    // Start with the pollorder to permute cases on the same channel.
    c := scases[pollorder[i]].c
    for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
      k := (j - 1) / 2
      lockorder[j] = lockorder[k]
      j = k
    }
    lockorder[j] = pollorder[i]
  }
  for i := len(lockorder) - 1; i >= 0; i-- {
    o := lockorder[i]
    c := scases[o].c
    lockorder[i] = lockorder[0]
    j := 0
    for {
      k := j*2 + 1
      if k >= i {
        break
      }
      if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
        k++
      }
      if c.sortkey() < scases[lockorder[k]].c.sortkey() {
        lockorder[j] = lockorder[k]
        j = k
        continue
      }
      break
    }
    lockorder[j] = o
  }
        ......
}

輪詢順序 pollorder 是通過 runtime.fastrandn 函數引入隨機性;隨機的輪詢順序可以避免 channel 的飢餓問題,保證公平性。加鎖順序 lockorder 是按照 channel 的地址排序後確定的加鎖順序,這樣能夠避免死鎖的發生。

加鎖和解鎖調用的是 runtime.sellock() 函數和 runtime.selunlock() 函數。 從下面的代碼邏輯中可以看到,兩個函數分別是按 lockorder 順序對 channel 加鎖,以及按 lockorder 逆序釋放鎖。

func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}
func selunlock(scases []scase, lockorder []uint16) {
  for i := len(lockorder) - 1; i >= 0; i-- {
    c := scases[lockorder[i]].c
    if i > 0 && c == scases[lockorder[i-1]].c {
      continue 
    }
    unlock(&c.lock)
  }
}

接下來,是 selectgo() 函數的主處理邏輯,它會分三個階段查找或等待某個 channel 準備就緒: 首先,根據 pollorder 的順序查找 scases 是否有可以立即收發的 channel;其次,將當前 goroutine 加入各 case 的 channel 對應的收發隊列上並等待其他 goroutine 的喚醒;最後,當前 goroutine 被喚醒之後找到滿足條件的 channel 並進行處理;

需要說明的是,runtime.selectgo 函數會根據不同情況通過 goto 語句跳轉到函數內部的不同標籤執行相應的邏輯。 其中包括:bufrecv:可以從 channel 緩衝區讀取數據;bufsend:可以向 channel 緩衝區寫入數據;recv:可以從休眠的發送方獲取數據;send:可以向休眠的接收方發送數據;rclose:可以從關閉的 channel 讀取 EOF;sclose:向關閉的 channel 發送數據;retc:結束調用並返回;

先看主處理邏輯的第一個階段,根據 pollorder 的順序查找 scases 是否有可以立即收發的 channel:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
        ......
  sellock(scases, lockorder)
        ......
        // 階段一: 查找可以處理的channel
  var casi int
  var cas *scase
  var caseSuccess bool
  var caseReleaseTime int64 = -1
  var recvOK bool
  for _, casei := range pollorder {
    casi = int(casei)      // case的索引
    cas = &scases[casi]    // 當前的case
    c = cas.c

    if casi >= nsends { // 處理接收channel的case
      sg = c.sendq.dequeue()
      if sg != nil {  // 如果當前channel的sendq上有等待的goroutine,就會跳到 recv標籤並從緩衝區讀取數據後將等待goroutine中的數據放入到緩衝區中相同的位置;
        goto recv
      }
      if c.qcount > 0 { //如果當前channel的緩衝區不爲空,就會跳到bufrecv標籤處從緩衝區獲取數據;
        goto bufrecv
      }
      if c.closed != 0 {  //如果當前channel已經被關閉,就會跳到rclose做一些清除的收尾工作;
        goto rclose
      }
    } else {                      // 處理發送channel的case
      ......
      if c.closed != 0 { // 如果當前channel已經被關閉就會直接跳到sclose標籤,觸發 panic 嘗試中止程序;
        goto sclose
      }
      sg = c.recvq.dequeue()
      if sg != nil {  // 如果當前channel的recvq上有等待的goroutine,就會跳到 send標籤向channel發送數據;
        goto send
      }
      if c.qcount < c.dataqsiz { // 如果當前channel的緩衝區存在空閒位置,就會將待發送的數據存入緩衝區;
        goto bufsend
      }
    }
  }
        if !block {  // 如果是非阻塞,即包含default分支,會解鎖所有 Channel 並返回
             selunlock(scases, lockorder)
             casi = -1
             goto retc
        }
        ......
}

主要處理邏輯是:

當 case 會從 channel 中接收數據時,如果當前 channel 的 sendq 上有等待的 goroutine,就會跳到 recv 標籤並從緩衝區讀取數據後將等待 goroutine 中的數據放入到緩衝區中相同的位置;如果當前 channel 的緩衝區不爲空,就會跳到 bufrecv 標籤處從緩衝區獲取數據;如果當前 channel 已經被關閉,就會跳到 rclose 做一些清除的收尾工作。

當 case 會向 channel 發送數據時,如果當前 channel 已經被關閉,就會直接跳到 sclose 標籤,觸發 panic 嘗試中止程序;如果當前 channel 的 recvq 上有等待的 goroutine,就會跳到 send 標籤向 channel 發送數據;如果當前 channel 的緩衝區存在空閒位置,就會將待發送的數據存入緩衝區。

當 select 語句中包含 default 即 block 爲 false 時;表示前面的所有 case 都沒有被執行,這裏會解鎖所有 channel 並返回,意味着當前 select 結構中的收發都是非阻塞的。

如果沒有可以立即處理的 channel,則進入主邏輯的下一個階段,根據需要將當前 goroutine 加入 channel 對應的收發隊列上並等待其他 goroutine 的喚醒。

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
        ......
        // 階段2: 將當前goroutine根據需要掛在chan的sendq和recvq上
  gp = getg()
  if gp.waiting != nil {
    throw("gp.waiting != nil")
  }
  nextp = &gp.waiting
  for _, casei := range lockorder {  
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
                // 獲取sudog,將當前goroutine綁定到sudog上
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    if t0 != 0 {
      sg.releasetime = -1
    }
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink
                // 加入相應等待隊列
    if casi < nsends {
      c.sendq.enqueue(sg)
    } else {
      c.recvq.enqueue(sg)
    }
  }
        ......
        // 被喚醒後會根據 param 來判斷是否是由 close 操作喚醒的,所以先置爲 nil
        gp.param = nil
        ......
        // 掛起當前goroutine
        gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
        ......
}

等到 select 中的一些 channel 準備就緒之後,當前 goroutine 就會被調度器喚醒。這時會繼續執行 runtime.selectgo 函數的第三部分:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
        ......
        // 加鎖所有的channel
        sellock(scases, lockorder)

  gp.selectDone = 0
        // param 存放喚醒 goroutine 的 sudog,如果是關閉操作喚醒的,那麼就爲 nil
  sg = (*sudog)(gp.param)
  gp.param = nil

  casi = -1
  cas = nil
  caseSuccess = false
        // 當前goroutine 的 waiting 鏈表按照lockorder順序存放着case的sudog
  sglist = gp.waiting
  // 在從 gp.waiting 取消case的sudog鏈接之前清除所有元素,便於GC
  for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
  }
        // 清楚當前goroutine的waiting鏈表,因爲被sg代表的協程喚醒了
  gp.waiting = nil

  for _, casei := range lockorder {
    k = &scases[casei]
                // 如果相等說明,goroutine是被當前case的channel收發操作喚醒的
    if sg == sglist {
      // sg喚醒了當前goroutine, 則當前G已經從sg的隊列中出隊,這裏不需要再次出隊
      casi = int(casei)
      cas = k
      caseSuccess = sglist.success
      if sglist.releasetime > 0 {
        caseReleaseTime = sglist.releasetime
      }
    } else {
                        // 不是此case喚醒當前goroutine, 將goroutine從此case的發送隊列或接收隊列出隊
      c = k.c
      if int(casei) < nsends {
        c.sendq.dequeueSudoG(sglist)
      } else {
        c.recvq.dequeueSudoG(sglist)
      }
    }
                // 釋放當前case的sudog,然後處理下一個case的sudog
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
  }
        ......
}

這裏主要是:首先,先釋放當前 goroutine 的等待隊列,因爲已經被某個 case 的 sudog 喚醒了;其次,遍歷全部的 case 的 sudog,找到喚醒當前 goroutine 的 case 的索引並返回,後面會根據它做 channel 的收發操作;最後,剩下的不是喚醒當前 goroutine 的 case,需要將當前 goroutine 從這些 case 的發送隊列或接收隊列出隊,並釋放這些 case 的 sudog;

selectgo() 函數的最後一些代碼,是循環第一階段用到的跳轉標籤代碼段;

bufsend 和 bufrecv 兩個代碼段,這兩段代碼的執行過程都很簡單,它們是向 channel 的緩衝區中發送數據或者從緩衝區中獲取數據;

兩個直接收發 channel 的情況 recv、send,會調用運行時函數 runtime.send 和 runtime.recv,這兩個函數會與處於休眠狀態的 goroutine 打交道;

向關閉的 channel 發送數據或者從關閉的 channel 中接收數據分別是 sclose 和 rclose 階段;sclose,向一個關閉的 channel 發送數據就會直接 panic 造成程序崩潰;rclose,從一個關閉 channel 中接收數據會直接清除 Channel 中的相關內容;retc 階段,退出程序。

bufrecv:
  ......
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

bufsend:
  ......
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

recv:
  // 可以直接從休眠的goroutine獲取數據
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  ......
  recvOK = true
  goto retc

rclose:
  //從一個關閉 channel 中接收數據會直接清除 Channel 中的相關內容;
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  ......
  goto retc

send:
  ......
        // 可以直接從休眠的goroutine獲取數據
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  if debugSelect {
    print("syncsend: cas0=", cas0, " c=", c, "\n")
  }
  goto retc

retc:
        // 退出selectgo()函數
  if caseReleaseTime > 0 {
    blockevent(caseReleaseTime-t0, 1)
  }
  return casi, recvOK

sclose:
  // 向一個關閉的 channel 發送數據就會直接 panic 造成程序崩潰;
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

總結

綜合上面的分析,總結如下:

編譯器會對 select 有不同的 case 的情況進行優化以提高性能。首先,編譯器對 select 沒有 case、有單 case 和單 case+default 的情況進行單獨處理,這些處理或者直接調用運行時函數,或者直接轉成對 channel 的操作,或者以非阻塞的方式訪問 channel,多種靈活的處理方式能夠提高性能,尤其是避免對 channel 的加鎖。

對最常出現的 select 有多 case 的情況,會調用 runtime.selectgo() 函數來獲取執行 case 的索引,並生成 if 語句執行該 case 的代碼。

selectgo 函數的執行分爲四個步驟:首先,隨機生成一個遍歷 case 的輪詢順序 pollorder 並根據 channel 地址生成加鎖順序 lockorder,隨機順序能夠避免 channel 飢餓,保證公平性,加鎖順序能夠避免死鎖和重複加鎖;然後,根據 pollorder 的順序查找 scases 是否有可以立即收發的 channel,如果有則獲取 case 索引進行處理;再次,如果 pollorder 順序上沒有可以直接處理的 case,則將當前 goroutine 加入各 case 的 channel 對應的收發隊列上並等待其他 goroutine 的喚醒;最後,當調度器喚醒當前 goroutine 時,會再次按照 lockorder 遍歷所有的 case,從中查找需要被處理的 case 索引進行讀寫處理,同時從所有 case 的發送接收隊列中移除掉當前 goroutine。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/0oPA4THIcvWjwMGrudKJuQ