channel 的 15 條規則和底層實現
概述
下面表格中的內容是 Go 語言中 channel
數據類型的使用規則,相信讀者已經可以熟練掌握,本文主要分析 channel
的內部實現中的數據結構和算法,所以相關的基礎概念會直接跳過, 希望讀者閱讀完本文後,可以深入理解表格中的各類規則,從應用層代碼到底層實現,能夠知其然並知其所以然。
操作規則
編譯規則
channel
的內部實現文件路徑爲 $GOROOT/src/runtime/chan.go
,筆者的 Go 版本爲 go1.19 linux/amd64
。
幾個常量
const (
// 內存對齊的最大值,這個等於 64 位 CPU 下的 cacheline 的大小
maxAlign = 8
// 計算 unsafe.Sizeof(hchan{}) 最接近的 8 的倍數
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
// 是否開啓 debug 模式
debugChan = false
)
hchan 對象
hchan
對象表示運行時的 channel
。
對於無緩衝 channel
來說,發送隊列和接收隊列至少有一個爲空,一個無緩衝 channel
和一個阻塞在該 channel
上面的 goroutine
,使用 select
語句發送和接收。
對於有緩衝 channel
來說,qcount > 0
意味着接收隊列爲空,qcount < dataqsiz
意味着發送隊列爲空。
type hchan struct {
qcount uint // channel 元素數量
dataqsiz uint // channel 緩衝區環形隊列長度
buf unsafe.Pointer // 指向緩衝區的底層數組 (針對有緩衝的 channel)
elemsize uint16 // channel 元素大小
closed uint32 // 是否關閉
elemtype *_type // channel 元素類型
sendx uint // 當前已發送元素在隊列中的索引
recvx uint // 當前已接收元素在隊列中的索引
recvq waitq // 接收 goroutine 隊列 (數據結構是鏈表)
sendq waitq // 發送 goroutine 隊列 (數據結構是鏈表)
// lock 保護結構體中的所有字段,以及 sudogs 對象中被當前 channel 阻塞的幾個字段
// 不要在持有鎖時修改另一個 goroutine 的狀態(特別是沒有進入 ready 狀態的 goroutine)
// 因爲這會導致棧收縮而發生死鎖
lock mutex
}
channel 運行時結構體
上面的圖片展示了一個典型的 channel
數據結構圖,其中各元素表示爲:
-
緩衝區環形隊列長度爲 8, 也就是最多可以存放 8 個數據
-
發送索引指向 7,接收索引指向 0,說明當前緩存隊列已滿,無法再放入數據了,此時新的發送 / 接收
goroutine
會進入發送 / 接收隊列 -
發送隊列中有 3 個
goroutine
等待發送 -
接收隊列中有 5 個
goroutine
等待接收
waitq 對象
waitq
對象表示因爲 channel
緩衝區空間不足而陷入等待的 goroutine
發送 / 接收隊列, 數據結構是雙向鏈表,其中頭節點和尾節點都是 sudog
對象,sudog
對象的字段和具體作用在之前的 GMP 調度器 - 數據結構
一文中已經講過,這裏不再贅述。
type waitq struct {
first *sudog
last *sudog
}
讀者可以停下來思考一個問題: 同一個 goroutine 有可能同時出現在發送隊列和接收隊列嗎?爲什麼?
創建 channel
編譯器會將應用層代碼中的 make(chan type, N)
語句轉換爲 makechan
函數調用。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 由編譯器檢查保證元素大小不能大於等於 64K
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 檢測 hchan 結構體大小是否是 maxAlign 的整數倍
// 並且元素的對齊單位不能超過最大對齊單位
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 檢測內存是否超過限制
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 當存儲在 buf 中的元素不包含指針時,可以消除 GC 掃描
var c *hchan
switch {
case mem == 0:
// 如果是無緩衝 channel
// 僅爲 hchan 分配內存空間
c = (*hchan)(mallocgc(hchanSize, nil, true))
// data race detector 使用當前作爲檢測點進行同步
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 如果 channel 中的元素不包含指針
// 爲 hchan 結構體和 buf 字段分配一段連續的內存空間
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 如果 channel 中的元素包含指針
// 分別爲 hchan 結構體和 buf 字段單獨分配內存空間
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 設置 channel 元素大小
c.elemsize = uint16(elem.size)
// 設置 channel 元素類型
c.elemtype = elem
// 設置 channel 緩衝區長度
c.dataqsiz = uint(size)
if debugChan {
// 如果開啓了 debug 模式
// 打印初始化信息
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
發送數據
chansend 方法
編譯器會將應用層代碼中的 c <- x
語句轉換爲 chansend1
函數調用。
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
// 編譯器將
//
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 轉換爲
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
chansend1
和 selectnbsend
函數內部調用的都是 chansend
函數, chansend
函數向 channel
發送數據,並且返回是否發送成功。
chansend
函數內部的 channel
處理邏輯分爲兩種:
-
阻塞發送
-
非阻塞發送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel == nil
// 例如
// var a chan int
// a <- 1
if c == nil {
if !block {
// 非阻塞模式下直接返回
return false
}
// nil channel 發送數據會永久阻塞
// 掛起當前 goroutine
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// channel 非阻塞且未關閉
// 並且緩衝區已滿,直接返回
if !block && c.closed == 0 && full(c) {
return false
}
// 加鎖 (注意後續代碼中不同條件下的解鎖處理細節)
lock(&c.lock)
// channel 已經關閉,拋出 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果存在等待接收的 goroutine
// 將數據發送給等待接收的 goroutine 後,直接返回
if sg := c.recvq.dequeue(); sg != nil {
// 將數據發送給隊列第一個 goroutine
// 將數據直接傳遞給 goroutine,繞過 channel 緩衝區 (類似零拷貝的設計理念)
// 詳情見: send 函數
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// qcount 是隊列當前元素數量
// dataqsiz 是隊列總長度
// 當前元素數量小於隊列總長度時,說明還有空閒空間可供使用
if c.qcount < c.dataqsiz {
// 緩衝區未滿,還有可用空間
// 獲取下一個可以存放數據的地址 (緩衝區槽位)
qp := chanbuf(c, c.sendx)
// 將發送的數據拷貝到緩衝區
typedmemmove(c.elemtype, qp, ep)
// 發送索引 + 1
c.sendx++
// 環形隊列,當發送索引等於隊列長度時,索引重置爲 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩衝區元素數量 + 1
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 隊列沒有空閒空間可供使用
// 直接返回
if !block {
unlock(&c.lock)
return false
}
// --------------------------
// 接下來的流程針對的是阻塞的情況
// --------------------------
// 獲取當前發送數據的 goroutine
// 然後綁定到一個 sudog 結構體 (包裝爲運行時表示)
gp := getg()
// 獲取 sudog 結構體
// 並且設置相關字段 (包括當前的 channel,是否是 select 等)
mysg := acquireSudog()
mysg.g = gp
mysg.isSelect = false
mysg.c = c
// 將 sudog 放入發送隊列
c.sendq.enqueue(mysg)
// 掛起當前 goroutine, 進入休眠 (等待接收)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 確保發送的值一直處於有效狀態,直到接收方將其複製出來
// sudog 有一個指向棧對象的指針,保持發送的數據處於活躍狀態,避免被 GC
KeepAlive(ep)
// 取消 sudog 和 channel 綁定關係
mysg.c = nil
// 釋放 sudog
releaseSudog(mysg)
if closed {
// goroutine 被喚醒後發現 channel 已關閉, 拋出 panic
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
send 函數
send
函數用於處理 channel
數據的發送操作,函數會調用 sendDirect
直接將發送方的數據複製到接收方,或將等待接收的 goroutine
喚醒。
-
參數
sg
表示接收方goroutine
-
參數
ep
表示要發送的數據
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
// 直接拷貝數據
sendDirect(c.elemtype, sg, ep)
}
...
// 調用 goready 函數將接收方 goroutine 喚醒並標記爲可運行狀態
// 並把其放入發送方所在處理器 P 的 runnext 字段等待執行
// runnext 字段表示最高優先級的 goroutine (GMP 調度器一文中講過)
goready(gp, skip+1)
}
sendDirect 函數
sendDirect
函數用於 channel
具體的發送數據操作,將發送方 goroutine
的數據直接寫入到接收方 goroutine
。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
...
}
阻塞發送
channel
阻塞發送時,將 sudog
結構體放入發送隊列:
channel 阻塞發送示例圖
非阻塞發送
channel
非阻塞發送時,分爲兩種情況:
-
緩衝區未滿,直接將數據存入緩衝區
-
緩衝區已滿,將
sudog
結構體放入發送隊列
channel 非阻塞發送情況一示例圖
channel 非阻塞發送情況二示例圖
小結
channel 發送數據的條件分支:
-
如果
channel == nil
, 非阻塞模式直接返回,阻塞模式,休眠當前goroutine
-
如果
channel
爲非阻塞模式並且channel
未關閉,同時緩衝區已滿,直接返回 -
如果
channel
已經關閉,發生panic
-
如果
channel
接收隊列不爲空, 出隊第一個元素作爲接收方goroutine
,將數據發送給接收方goroutine
後,直接返回 -
如果
channel
緩衝區未滿,將數據存入緩衝區,直接返回 -
如果以上條件都不滿足,就獲取一個新的
sudog
結構體並放入channel
的發送隊列,同時掛起當前發送數據的goroutine
, 進入休眠 (等待接收方接收數據)
接收數據
編譯器會將應用層代碼中的 <- ch
語句轉換爲 chanrecv1
函數調用。
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
編譯器會將應用層代碼中的 x, ok <- ch
語句轉換爲 chanrecv2
函數調用。
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// 編譯器將
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// 轉換爲
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
chanrecv1
和 chanrecv2
以及 selectnbrecv
函數內部調用的都是 chanrecv
函數。
chanrecv
函數用於在 channel
上接收數據並將接收到的數據寫入參數 ep
(ep 可以設置爲 nil, 這種情況下接收到的數據將會被忽略),並有兩個返回值:
-
selected
用於在select{}
語句中表示是否會選中該分支 -
received
表示是否接收到了數據
根據參數的不同返回不同的值:
-
如果
block == false
並且沒有數據可用,返回 false, false -
如果
channel
已經關閉,返回數據的零值和 false -
如果上述兩種條件都不滿足(說明有數據可用並且 channel 未關閉),將數據賦值給參數 *ep 然後返回 true, true
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
// 非阻塞的情況下,直接返回
// 非阻塞出現在 select{} + default 場景
return
}
// 在 nil channel 上進行接收操作,會永久阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable") // 疑問:這行代碼能執行到嗎?
}
// 非阻塞模式並且接收數據操作會阻塞
// empty 函數返回 true 的情況:
// 1. 無緩衝 channel 並且沒有發送方正在阻塞
// 2. 有緩衝 channel 並且緩衝區沒有數據
if !block && empty(c) {
// 接下來再判斷 channel 是否已經關閉
if atomic.Load(&c.closed) == 0 {
// 如果是未關閉的 channel, 非阻塞且沒有可接收數據的情況下,直接返回
// 因爲 channel 關閉後就無法再打開
// 所以只要 channel 未關閉,上述方法都是原子操作 (看到的結果都是一樣的)
return
}
// 執行到這裏,說明 channel 已經關閉
// channel 關閉後就無法再打開
// 重新檢查 channel 是否存在等待接收的數據
if empty(c) {
// 沒有任何等待接收的數據
if ep != nil {
typedmemclr(c.elemtype, ep) // 清理 ep 指針中的數據
}
return true, false
}
}
...
// 加鎖 (注意後續代碼中不同條件下的解鎖處理細節)
lock(&c.lock)
if c.closed != 0 { // channel 已經關閉
if c.qcount == 0 { // 緩衝區也沒有數據了
unlock(&c.lock) // 解鎖
if ep != nil {
typedmemclr(c.elemtype, ep) // 清理 ep 指針中的數據
}
return true, false
}
} else {
// 先檢測發送的隊列是否不爲空
// 不爲空說明有阻塞在等待發送的 goroutine
if sg := c.sendq.dequeue(); sg != nil {
// 出隊發送隊列第一個 goroutine
// 如果緩衝區還有剩餘的可用空間,直接從發送 goroutine 接收數據
// 否則,從接收隊列頭部的 goroutine 開始接收數據,並將數據添加到發送隊列尾部的 goroutine
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 如果 channel 緩衝區還有數據
if c.qcount > 0 {
// 獲取 channel 接收地址
qp := chanbuf(c, c.recvx)
if ep != nil {
// 直接拷貝數據到接收地址
typedmemmove(c.elemtype, ep, qp)
}
// 清除緩衝區數據
typedmemclr(c.elemtype, qp)
// 接收索引 + 1
c.recvx++
// 環形隊列,當索引等於隊列長度時,索引重置爲 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 緩衝區元素數量 - 1
c.qcount--
// 解鎖
unlock(&c.lock)
return true, true
}
// 如果是非阻塞並且無數據可接收
// 直接返回
if !block {
unlock(&c.lock)
return false, false
}
// --------------------------
// 接下來的流程針對的是阻塞的情況
// --------------------------
// 獲取當前發送數據的 goroutine
// 然後綁定到一個 sudog 結構體 (包裝爲運行時表示)
gp := getg()
// 獲取 sudog 結構體
// 並且設置相關數據 (包括當前的 channel,是否是 select 等)
mysg := acquireSudog()
mysg.g = gp
mysg.isSelect = false
mysg.c = c
// 將 sudog 放入接收隊列
c.recvq.enqueue(mysg)
// 掛起當前 goroutine, 進入休眠 (等待發送方發送數據)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 取消 sudog 和 channel 綁定關係
mysg.c = nil
// 釋放 sudog
releaseSudog(mysg)
return true, success
}
recv 函數
recv
函數用於處理 channel
的數據接收操作。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 無緩衝 channel
if c.dataqsiz == 0 {
if ep != nil {
// 直接從發送方拷貝數據
recvDirect(c.elemtype, sg, ep)
}
} else {
// 獲取緩衝區首元素
qp := chanbuf(c, c.recvx)
if ep != nil {
// 從緩衝區拷貝數據到接收方
typedmemmove(c.elemtype, ep, qp)
}
// 從發送方拷貝數據到緩衝區
typedmemmove(c.elemtype, qp, sg.elem)
// 接收索引 + 1
c.recvx++
// 環形隊列,當索引等於隊列長度時,索引重置爲 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 除了更新接收索引外,還要更新發送索引 (賦值爲更新後的接收索引值)
// 這樣下次寫入發送數據時,才能保證寫入位置正確
c.sendx = c.recvx
}
...
// 調用 goready 函數將接收方 goroutine 喚醒並標記爲可運行狀態
// 並把其放入接收方所在處理器 P 的 runnext 字段等待執行
goready(gp, skip+1)
}
recvDirect 函數
recvDirect
函數和 sendDirect
函數作用一致,這裏不再贅述。
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
...
}
阻塞接收
channel 阻塞接收示例圖
非阻塞接收
channel
非阻塞接收時,分爲兩種情況:
-
緩衝區不爲空,直接從緩衝區讀取數據
-
緩衝區爲空,將
sudog
結構體放入接收隊列
channel 非阻塞發送情況一示例圖
channel 非阻塞發送情況二示例圖
小結
channel 接收數據的條件分支:
-
如果
channel == nil
, 非阻塞模式直接返回,阻塞模式,休眠當前goroutine
-
如果
channel
已經關閉或者緩衝區沒有等待接收的數據,直接返回 -
如果
channel
發送隊列不爲空, 出隊第一個元素作爲發送方goroutine
,將數據發送給接收方goroutine
後,直接返回 -
如果
channel
緩衝區有數據,直接從緩衝區讀取數據 -
如果以上條件都不滿足,就獲取一個新的
sudog
結構體並放入channel
的接收隊列,同時掛起當前發送數據的goroutine
, 進入休眠 (等待發送方發送數據)
關閉 channel
編譯器會將應用層代碼中的 clsoe(channel name)
語句轉換爲 closechan
函數調用。
func closechan(c *hchan) {
// 關閉一個 nil channel, 拋出 panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖,這個鎖的粒度比較大
// 會持續到釋放完所有的 sudog 才解鎖
lock(&c.lock)
// 關閉一個已經關閉的 channel, 拋出 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
// 設置 channel 狀態爲已關閉
c.closed = 1
// goroutine 列表
// 用於存放發送+接收隊列中的所有 goroutine
var glist gList
// 將接收隊列中所有 goroutine 加入 gList 列表
for {
sg := c.recvq.dequeue()
// 出隊的 sudog 爲 nil
// 說明接收隊列爲空,直接跳出循環
if sg == nil {
break
}
// 將 sg 對應的 goroutine 添加到 glist 列表
glist.push(gp)
}
// 將發送隊列中所有 goroutine 加入 gList 列表
// 當然,因爲 channel 已經關閉,所以這些 goroutine 被喚醒後發生數據時會直接 panic
for {
sg := c.sendq.dequeue()
// 出隊的 sudog 爲 nil
// 說明發送隊列爲空,直接跳出循環
if sg == nil {
break
}
// 將 sg 對應的 goroutine 添加到 glist 列表
glist.push(gp)
}
// 解鎖
unlock(&c.lock)
// 將出隊的所有 goroutine 設置爲可運行狀態
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
示意圖
這裏需要注意的是: gList
是一個棧數據結構 (後進先出),所以調用 glist.pop
方法時,首先出隊的是發送隊列的最後一個 goroutine
, 最後出隊的是接收隊列的第一個 goroutine
(圖中兩條青色的線條),雖然順序相反,但是數據的發送 / 接收不會受到影響。
channel 關閉
輔助函數
empty
empty
函數檢測從 channel
讀取數據是否會阻塞 (也就是檢測 channel
緩衝區是否爲空),主要分爲兩種情況:
-
如果
channel
沒有緩衝區,查看是否存在發送數據goroutine
-
如果
channel
有緩衝區,檢查元素數量是否等於 0
func empty(c *hchan) bool {
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
chanbuf
chanbuf
函數用於獲取緩衝區下一個地址 (緩衝區槽位),chanbuf(c, i)
表示指向緩衝區中第 i 個槽位的指針。
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
full
full
函數檢測 channel
緩衝區是否已滿,主要分爲兩種情況:
-
如果
channel
沒有緩衝區,查看是否存在接收者 -
如果
channel
有緩衝區, 比較元素數量和緩衝區長度是否一致
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil
}
return c.qcount == c.dataqsiz
}
enqueue
enqueue
方法用於將 goroutine
放入 channel
的發送 / 接收隊列 (入隊操作),內部實現就是鏈表操作。
func (q *waitq) enqueue(sgp *sudog) {
...
}
dequeue
dequeue
方法用於出隊 channel
的發送 / 接收隊列的一個元素 (出隊操作),內部實現就是鏈表操作。
func (q *waitq) dequeue() *sudog {
...
}
小結
本文着重介紹了 channel
的運行時數據結構和常見的三個操作 (發送數據、接收數據、關閉 channel) 對應的底層算法實現,標準庫中 channel
文件源代碼有將近 900 行, 但是核心在於 hchan
結構體以及圍繞該結構體實現的各個函數方法,重點是 hchan
結構體中的 環形隊列
、發送/接收索引
, 發送/接收鏈表
字段, 理解了這 3 個字段對應的數據結構和算法,channel
的設計與實現也就完全理解了。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/QHE6Dvf36frADRrTz18W9g