channel 的 15 條規則和底層實現

概述

下面表格中的內容是 Go 語言中 channel 數據類型的使用規則,相信讀者已經可以熟練掌握,本文主要分析 channel 的內部實現中的數據結構和算法,所以相關的基礎概念會直接跳過, 希望讀者閱讀完本文後,可以深入理解表格中的各類規則,從應用層代碼到底層實現,能夠知其然並知其所以然。

操作規則

tsUtdz

編譯規則

dOncLq

channel 的內部實現文件路徑爲 $GOROOT/src/runtime/chan.go,筆者的 Go 版本爲 go1.19 linux/amd64

幾個常量

const (
 // 內存對齊的最大值,這個等於 64 位 CPU 下的 cacheline 的大小
 maxAlign  = 8
 // 計算 unsafe.Sizeof(hchan{})  最接近的 8 的倍數
 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
 // 是否開啓 debug 模式
 debugChan = false
)

hchan 對象

hchan 對象表示運行時的 channel

對於無緩衝 channel 來說,發送隊列和接收隊列至少有一個爲空,一個無緩衝 channel 和一個阻塞在該 channel 上面的 goroutine,使用 select 語句發送和接收。

對於有緩衝 channel 來說,qcount > 0 意味着接收隊列爲空,qcount < dataqsiz 意味着發送隊列爲空。

type hchan struct {
 qcount   uint           // channel 元素數量
 dataqsiz uint           // channel 緩衝區環形隊列長度
 buf      unsafe.Pointer // 指向緩衝區的底層數組 (針對有緩衝的 channel)
 elemsize uint16         // channel 元素大小
 closed   uint32         // 是否關閉
 elemtype *_type         // channel 元素類型
 sendx    uint           // 當前已發送元素在隊列中的索引
 recvx    uint           // 當前已接收元素在隊列中的索引
 recvq    waitq          // 接收 goroutine 隊列 (數據結構是鏈表)
 sendq    waitq          // 發送 goroutine 隊列 (數據結構是鏈表)
 
 // lock 保護結構體中的所有字段,以及 sudogs 對象中被當前 channel 阻塞的幾個字段
 // 不要在持有鎖時修改另一個 goroutine 的狀態(特別是沒有進入 ready 狀態的 goroutine)
 // 因爲這會導致棧收縮而發生死鎖
 lock mutex
}

channel 運行時結構體

上面的圖片展示了一個典型的 channel 數據結構圖,其中各元素表示爲:

waitq 對象

waitq 對象表示因爲 channel 緩衝區空間不足而陷入等待的 goroutine 發送 / 接收隊列, 數據結構是雙向鏈表,其中頭節點和尾節點都是 sudog 對象,sudog 對象的字段和具體作用在之前的 GMP 調度器 - 數據結構 一文中已經講過,這裏不再贅述。

type waitq struct {
 first *sudog
 last  *sudog
}

讀者可以停下來思考一個問題: 同一個 goroutine 有可能同時出現在發送隊列和接收隊列嗎?爲什麼?

創建 channel

編譯器會將應用層代碼中的 make(chan type, N) 語句轉換爲 makechan 函數調用。

func makechan(t *chantype, size int) *hchan {
 elem := t.elem

 // 由編譯器檢查保證元素大小不能大於等於 64K
 if elem.size >= 1<<16 {
  throw("makechan: invalid channel element type")
 }
 
 // 檢測 hchan 結構體大小是否是 maxAlign 的整數倍
 // 並且元素的對齊單位不能超過最大對齊單位
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  throw("makechan: bad alignment")
 }

 // 檢測內存是否超過限制
 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
  panic(plainError("makechan: size out of range"))
 }

 // 當存儲在 buf 中的元素不包含指針時,可以消除 GC 掃描
 var c *hchan
 switch {
 case mem == 0:
  // 如果是無緩衝 channel
  // 僅爲 hchan 分配內存空間
  c = (*hchan)(mallocgc(hchanSize, nil, true))
  // data race detector 使用當前作爲檢測點進行同步
  c.buf = c.raceaddr()
 case elem.ptrdata == 0:
  // 如果 channel 中的元素不包含指針
  // 爲 hchan 結構體和 buf 字段分配一段連續的內存空間
  c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  c.buf = add(unsafe.Pointer(c), hchanSize)
 default:
  // 如果 channel 中的元素包含指針
  // 分別爲 hchan 結構體和 buf 字段單獨分配內存空間
  c = new(hchan)
  c.buf = mallocgc(mem, elem, true)
 }

 // 設置 channel 元素大小
 c.elemsize = uint16(elem.size)
 // 設置 channel 元素類型
 c.elemtype = elem
 // 設置 channel 緩衝區長度
 c.dataqsiz = uint(size)

 if debugChan {
  // 如果開啓了 debug 模式
  // 打印初始化信息
  print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
 }
 return c
}

發送數據

chansend 方法

編譯器會將應用層代碼中的 c <- x 語句轉換爲 chansend1 函數調用。

//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
 chansend(c, elem, true, getcallerpc())
}
// 編譯器將
//
// select {
// case c <- v:
//  ... foo
// default:
//  ... bar
// }
//
// 轉換爲
//
// if selectnbsend(c, v) {
//  ... foo
// } else {
//  ... bar
// }
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
 return chansend(c, elem, false, getcallerpc())
}

chansend1selectnbsend 函數內部調用的都是 chansend 函數, chansend 函數向 channel 發送數據,並且返回是否發送成功。

chansend 函數內部的 channel 處理邏輯分爲兩種:

  1. 阻塞發送

  2. 非阻塞發送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 // channel == nil
 // 例如 
 //  var a chan int
 //  a <- 1
 if c == nil {
  if !block {
    // 非阻塞模式下直接返回
   return false
  }
  // nil channel 發送數據會永久阻塞
  // 掛起當前 goroutine
  gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }
 
 // channel 非阻塞且未關閉
 // 並且緩衝區已滿,直接返回
  if !block && c.closed == 0 && full(c) {
  return false
 }

 // 加鎖 (注意後續代碼中不同條件下的解鎖處理細節)
 lock(&c.lock)

 // channel 已經關閉,拋出 panic
 if c.closed != 0 {
  unlock(&c.lock)
  panic(plainError("send on closed channel"))
 }

 // 如果存在等待接收的 goroutine
 // 將數據發送給等待接收的 goroutine 後,直接返回
 if sg := c.recvq.dequeue(); sg != nil {
  // 將數據發送給隊列第一個 goroutine
  // 將數據直接傳遞給 goroutine,繞過 channel 緩衝區 (類似零拷貝的設計理念)
  // 詳情見: send 函數
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
 }
 
// qcount 是隊列當前元素數量
 // dataqsiz 是隊列總長度
 // 當前元素數量小於隊列總長度時,說明還有空閒空間可供使用
 if c.qcount < c.dataqsiz {
  // 緩衝區未滿,還有可用空間
  // 獲取下一個可以存放數據的地址 (緩衝區槽位)
  qp := chanbuf(c, c.sendx)
  // 將發送的數據拷貝到緩衝區
  typedmemmove(c.elemtype, qp, ep)    
  // 發送索引 + 1
  c.sendx++                           
  // 環形隊列,當發送索引等於隊列長度時,索引重置爲 0
  if c.sendx == c.dataqsiz {          
   c.sendx = 0
  }
  // 緩衝區元素數量 + 1
  c.qcount++                     
  // 解鎖
  unlock(&c.lock)                      
  return true
 }

  // 隊列沒有空閒空間可供使用
 // 直接返回
 if !block {
  unlock(&c.lock)
  return false
 }

 // --------------------------
 // 接下來的流程針對的是阻塞的情況
 // --------------------------
 
 // 獲取當前發送數據的 goroutine
 // 然後綁定到一個 sudog 結構體 (包裝爲運行時表示) 
 gp := getg()
 // 獲取 sudog 結構體
 // 並且設置相關字段 (包括當前的 channel,是否是 select 等)
 mysg := acquireSudog()
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c
 // 將 sudog 放入發送隊列
 c.sendq.enqueue(mysg) 

 // 掛起當前 goroutine, 進入休眠 (等待接收)
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
 // 確保發送的值一直處於有效狀態,直到接收方將其複製出來
 // sudog 有一個指向棧對象的指針,保持發送的數據處於活躍狀態,避免被 GC
 KeepAlive(ep)

 // 取消 sudog 和 channel 綁定關係
 mysg.c = nil        
 // 釋放 sudog
 releaseSudog(mysg)   
 if closed {         
  // goroutine 被喚醒後發現 channel 已關閉, 拋出 panic
  if c.closed == 0 {
   throw("chansend: spurious wakeup")
  }
   panic(plainError("send on closed channel"))
 }
 return true
}

send 函數

send 函數用於處理 channel 數據的發送操作,函數會調用 sendDirect 直接將發送方的數據複製到接收方,或將等待接收的 goroutine 喚醒。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 if sg.elem != nil {
  // 直接拷貝數據
  sendDirect(c.elemtype, sg, ep)
 }
 
 ...
 
 // 調用 goready 函數將接收方 goroutine 喚醒並標記爲可運行狀態
 // 並把其放入發送方所在處理器 P 的 runnext 字段等待執行
 // runnext 字段表示最高優先級的 goroutine (GMP 調度器一文中講過)
 goready(gp, skip+1)
}

sendDirect 函數

sendDirect 函數用於 channel 具體的發送數據操作,將發送方 goroutine 的數據直接寫入到接收方 goroutine

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  ...
}

阻塞發送

channel 阻塞發送時,將 sudog 結構體放入發送隊列:

channel 阻塞發送示例圖

非阻塞發送

channel 非阻塞發送時,分爲兩種情況:

  1. 緩衝區未滿,直接將數據存入緩衝區

  2. 緩衝區已滿,將 sudog 結構體放入發送隊列

channel 非阻塞發送情況一示例圖

channel 非阻塞發送情況二示例圖

小結

channel 發送數據的條件分支:

  1. 如果 channel == nil, 非阻塞模式直接返回,阻塞模式,休眠當前 goroutine

  2. 如果 channel 爲非阻塞模式並且 channel 未關閉,同時緩衝區已滿,直接返回

  3. 如果 channel 已經關閉,發生 panic

  4. 如果 channel 接收隊列不爲空, 出隊第一個元素作爲接收方 goroutine,將數據發送給接收方 goroutine 後,直接返回

  5. 如果 channel 緩衝區未滿,將數據存入緩衝區,直接返回

  6. 如果以上條件都不滿足,就獲取一個新的 sudog 結構體並放入 channel 的發送隊列,同時掛起當前發送數據的 goroutine, 進入休眠 (等待接收方接收數據)

接收數據

編譯器會將應用層代碼中的 <- ch 語句轉換爲 chanrecv1 函數調用。

//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
 chanrecv(c, elem, true)
}

編譯器會將應用層代碼中的 x, ok <- ch 語句轉換爲 chanrecv2 函數調用。

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
  _, received = chanrecv(c, elem, true)
  return
}
// 編譯器將
//
// select {
// case v, ok = <-c:
//  ... foo
// default:
//  ... bar
// }
//
// 轉換爲
//
// if selected, ok = selectnbrecv(&v, c); selected {
//  ... foo
// } else {
//  ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
 return chanrecv(c, elem, false)
}

chanrecv1chanrecv2 以及 selectnbrecv 函數內部調用的都是 chanrecv 函數。

chanrecv 函數用於在 channel 上接收數據並將接收到的數據寫入參數 ep (ep 可以設置爲 nil, 這種情況下接收到的數據將會被忽略),並有兩個返回值:

根據參數的不同返回不同的值:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
      if !block {
        // 非阻塞的情況下,直接返回
        // 非阻塞出現在 select{} + default 場景
        return
      }
      // 在 nil channel 上進行接收操作,會永久阻塞
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable") // 疑問:這行代碼能執行到嗎?
 }
 
 // 非阻塞模式並且接收數據操作會阻塞
 // empty 函數返回 true 的情況:
 //    1. 無緩衝 channel 並且沒有發送方正在阻塞
 //    2. 有緩衝 channel 並且緩衝區沒有數據
  if !block && empty(c) {
    // 接下來再判斷 channel 是否已經關閉
      if atomic.Load(&c.closed) == 0 {
        // 如果是未關閉的 channel, 非阻塞且沒有可接收數據的情況下,直接返回
        // 因爲 channel 關閉後就無法再打開
        // 所以只要 channel 未關閉,上述方法都是原子操作 (看到的結果都是一樣的)
        return
      }

      // 執行到這裏,說明 channel 已經關閉
      // channel 關閉後就無法再打開
      // 重新檢查 channel 是否存在等待接收的數據
      if empty(c) {
        // 沒有任何等待接收的數據
        if ep != nil {
          typedmemclr(c.elemtype, ep) // 清理 ep 指針中的數據
        }
        return true, false
      }
 }

 ...

  
  // 加鎖 (注意後續代碼中不同條件下的解鎖處理細節)
 lock(&c.lock)
 
 if c.closed != 0 {    // channel 已經關閉
  if c.qcount == 0 {  // 緩衝區也沒有數據了
   unlock(&c.lock) // 解鎖
   if ep != nil {
    typedmemclr(c.elemtype, ep) // 清理 ep 指針中的數據
   }
   return true, false
  }
 } else {
  // 先檢測發送的隊列是否不爲空
  // 不爲空說明有阻塞在等待發送的 goroutine
  if sg := c.sendq.dequeue(); sg != nil {
   // 出隊發送隊列第一個 goroutine
   // 如果緩衝區還有剩餘的可用空間,直接從發送 goroutine 接收數據
   // 否則,從接收隊列頭部的 goroutine 開始接收數據,並將數據添加到發送隊列尾部的 goroutine
   recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
   return true, true
  }
 }

  // 如果 channel 緩衝區還有數據
 if c.qcount > 0 {               
  // 獲取 channel 接收地址
  qp := chanbuf(c, c.recvx)   

  if ep != nil {
    // 直接拷貝數據到接收地址
   typedmemmove(c.elemtype, ep, qp)
  }

  // 清除緩衝區數據
  typedmemclr(c.elemtype, qp) 
  // 接收索引 + 1
  c.recvx++
  // 環形隊列,當索引等於隊列長度時,索引重置爲 0
  if c.recvx == c.dataqsiz {  
   c.recvx = 0
  }
  // 緩衝區元素數量 - 1
  c.qcount--
  // 解鎖
  unlock(&c.lock)             
  return true, true
 }

// 如果是非阻塞並且無數據可接收
 // 直接返回
 if !block {
  unlock(&c.lock)
  return false, false
 }

 // --------------------------
 // 接下來的流程針對的是阻塞的情況
 // --------------------------

 // 獲取當前發送數據的 goroutine
 // 然後綁定到一個 sudog 結構體 (包裝爲運行時表示)
 gp := getg()
 // 獲取 sudog 結構體
 // 並且設置相關數據 (包括當前的 channel,是否是 select 等)
 mysg := acquireSudog()
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c

 // 將 sudog 放入接收隊列
 c.recvq.enqueue(mysg)   
 // 掛起當前 goroutine, 進入休眠 (等待發送方發送數據)
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

 // 取消 sudog 和 channel 綁定關係
 mysg.c = nil
 // 釋放 sudog
 releaseSudog(mysg)  
 return true, success
}

recv 函數

recv 函數用於處理 channel 的數據接收操作。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 // 無緩衝 channel
 if c.dataqsiz == 0 {    
  if ep != nil {
   // 直接從發送方拷貝數據
   recvDirect(c.elemtype, sg, ep)
  }
 } else {
  // 獲取緩衝區首元素
  qp := chanbuf(c, c.recvx)               
  if ep != nil {
    // 從緩衝區拷貝數據到接收方
   typedmemmove(c.elemtype, ep, qp)    
  }
  // 從發送方拷貝數據到緩衝區
  typedmemmove(c.elemtype, qp, sg.elem)   
  // 接收索引 + 1
  c.recvx++
  // 環形隊列,當索引等於隊列長度時,索引重置爲 0
  if c.recvx == c.dataqsiz {              
   c.recvx = 0
  }
  // 除了更新接收索引外,還要更新發送索引 (賦值爲更新後的接收索引值)
  // 這樣下次寫入發送數據時,才能保證寫入位置正確
  c.sendx = c.recvx
 }
 
 ...

 // 調用 goready 函數將接收方 goroutine 喚醒並標記爲可運行狀態
 // 並把其放入接收方所在處理器 P 的 runnext 字段等待執行
 goready(gp, skip+1)
}

recvDirect 函數

recvDirect 函數和 sendDirect 函數作用一致,這裏不再贅述。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
  ...
}

阻塞接收

channel 阻塞接收示例圖

非阻塞接收

channel 非阻塞接收時,分爲兩種情況:

  1. 緩衝區不爲空,直接從緩衝區讀取數據

  2. 緩衝區爲空,將 sudog 結構體放入接收隊列

channel 非阻塞發送情況一示例圖

channel 非阻塞發送情況二示例圖

小結

channel 接收數據的條件分支:

  1. 如果 channel == nil, 非阻塞模式直接返回,阻塞模式,休眠當前 goroutine

  2. 如果 channel 已經關閉或者緩衝區沒有等待接收的數據,直接返回

  3. 如果 channel 發送隊列不爲空, 出隊第一個元素作爲發送方 goroutine,將數據發送給接收方 goroutine 後,直接返回

  4. 如果 channel 緩衝區有數據,直接從緩衝區讀取數據

  5. 如果以上條件都不滿足,就獲取一個新的 sudog 結構體並放入 channel 的接收隊列,同時掛起當前發送數據的 goroutine, 進入休眠 (等待發送方發送數據)

關閉 channel

編譯器會將應用層代碼中的 clsoe(channel name) 語句轉換爲 closechan 函數調用。

func closechan(c *hchan) {
 // 關閉一個 nil channel, 拋出 panic
 if c == nil {
  panic(plainError("close of nil channel"))
 }

 // 加鎖,這個鎖的粒度比較大
 // 會持續到釋放完所有的 sudog 才解鎖
 lock(&c.lock)
 
 // 關閉一個已經關閉的 channel, 拋出 panic
 if c.closed != 0 {
  unlock(&c.lock)
  panic(plainError("close of closed channel"))
 }

 ...

 // 設置 channel 狀態爲已關閉
 c.closed = 1    

 // goroutine 列表
 // 用於存放發送+接收隊列中的所有 goroutine
 var glist gList

 // 將接收隊列中所有 goroutine 加入 gList 列表
 for {
  sg := c.recvq.dequeue()
  // 出隊的 sudog 爲 nil
  // 說明接收隊列爲空,直接跳出循環
  if sg == nil {
   break
  }
  // 將 sg 對應的 goroutine 添加到 glist 列表
  glist.push(gp)
 }

 // 將發送隊列中所有 goroutine 加入 gList 列表
 // 當然,因爲 channel 已經關閉,所以這些 goroutine 被喚醒後發生數據時會直接 panic
 for {
  sg := c.sendq.dequeue()
  // 出隊的 sudog 爲 nil
  // 說明發送隊列爲空,直接跳出循環
  if sg == nil {
   break
  }
  
   // 將 sg 對應的 goroutine 添加到 glist 列表
   glist.push(gp)
 }

 // 解鎖
 unlock(&c.lock) 

 // 將出隊的所有 goroutine 設置爲可運行狀態
 for !glist.empty() {
  gp := glist.pop()
  gp.schedlink = 0
  goready(gp, 3)
 }
}

示意圖

這裏需要注意的是: gList 是一個棧數據結構 (後進先出),所以調用 glist.pop 方法時,首先出隊的是發送隊列的最後一個 goroutine, 最後出隊的是接收隊列的第一個 goroutine (圖中兩條青色的線條),雖然順序相反,但是數據的發送 / 接收不會受到影響。

channel 關閉

輔助函數

empty

empty 函數檢測從 channel 讀取數據是否會阻塞 (也就是檢測 channel 緩衝區是否爲空),主要分爲兩種情況:

  1. 如果 channel 沒有緩衝區,查看是否存在發送數據 goroutine

  2. 如果 channel 有緩衝區,檢查元素數量是否等於 0

func empty(c *hchan) bool {
    if c.dataqsiz == 0 {
      return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
    }
    return atomic.Loaduint(&c.qcount) == 0
}

chanbuf

chanbuf 函數用於獲取緩衝區下一個地址 (緩衝區槽位),chanbuf(c, i) 表示指向緩衝區中第 i 個槽位的指針。

func chanbuf(c *hchan, i uint) unsafe.Pointer {
   return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

full

full 函數檢測 channel 緩衝區是否已滿,主要分爲兩種情況:

  1. 如果 channel 沒有緩衝區,查看是否存在接收者

  2. 如果 channel 有緩衝區, 比較元素數量和緩衝區長度是否一致

func full(c *hchan) bool {
    if c.dataqsiz == 0 {
      return c.recvq.first == nil
    }
    return c.qcount == c.dataqsiz
}

enqueue

enqueue 方法用於將 goroutine 放入 channel 的發送 / 接收隊列 (入隊操作),內部實現就是鏈表操作。

func (q *waitq) enqueue(sgp *sudog) {
   ...
}

dequeue

dequeue 方法用於出隊 channel 的發送 / 接收隊列的一個元素 (出隊操作),內部實現就是鏈表操作。

func (q *waitq) dequeue() *sudog {
   ...
}

小結

本文着重介紹了 channel 的運行時數據結構和常見的三個操作 (發送數據、接收數據、關閉 channel) 對應的底層算法實現,標準庫中 channel 文件源代碼有將近 900 行, 但是核心在於 hchan 結構體以及圍繞該結構體實現的各個函數方法,重點是 hchan 結構體中的 環形隊列發送/接收索引, 發送/接收鏈表 字段, 理解了這 3 個字段對應的數據結構和算法,channel 的設計與實現也就完全理解了。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QHE6Dvf36frADRrTz18W9g