學習 channel 設計:從入門到放棄
前言
哈嘍,大家好,我是
asong
。終於迴歸了,停更了兩週了,這兩週一直在搞留言號的事,經過漫長的等待,終於搞定了。兄弟們,以後就可以在留言區盡情開噴了,只要你敢噴,我就敢精選🐶。(因爲發生了賬號遷移,需點擊右上角重新添加星標,優質文章第一時間獲取!)今天給大家帶來的是
Go
語言中的channel
。Go
語言從出世以來就以高併發著稱,得益於其Goroutine
的設計,Goroutine
也就是一個可執行的輕量級協程,有了Goroutine
我們可以輕鬆的運行協程,但這並不能滿足我們的需求,我們往往還希望多個線程 / 協程是能夠通信的,Go
語言爲了支持多個Goroutine
通信,設計了channel
,本文我們就一起從GO1.15
的源碼出發,看看channel
到底是如何設計的。好啦,開往幼兒園的列車就要開了,朋友們繫好安全帶,我要開車啦🐶
什麼是 channel
通過開頭的介紹我們可以知道channel
是用於goroutine
的數據通信,在Go
中通過goroutine+channel
的方式,可以簡單、高效地解決併發問題。我們先來看一下簡單的示例:
func GoroutineOne(ch chan <-string) {
fmt.Println("GoroutineOne running")
ch <- "asong真帥"
fmt.Println("GoroutineOne end of the run")
}
func GoroutineTwo(ch <- chan string) {
fmt.Println("GoroutineTwo running")
fmt.Printf("女朋友說:%s\n",<-ch)
fmt.Println("GoroutineTwo end of the run")
}
func main() {
ch := make(chan string)
go GoroutineOne(ch)
go GoroutineTwo(ch)
time.Sleep(3 * time.Second)
}
// 運行結果
// GoroutineOne running
// GoroutineTwo running
// 女朋友說:asong真帥
// GoroutineTwo end of the run
// GoroutineOne end of the run
這裏我們運行了兩個Goroutine
,在GoroutineOne
中我們向channel
中寫入數據,在GoroutineTwo
中我們監聽channel
,直到讀取到 "asong 真帥"。我們可以畫一個簡單的圖來表明一下這個順序:
上面的例子是對無緩衝channel
的一個簡單應用,其實channel
的使用語法還是挺多的,下面且聽我慢慢道來,畢竟是從入門到放棄嘛,那就先從入門開始。
入門channel
channel
類型
channel
有三種類型的定義,分別是:chan
、chan <-
、<- chan
,可選的<-
代表channel
的方向,如果我們沒有指定方向,那麼channel
就是雙向的,既可以接收數據,也可以發送數據。
chan T // 接收和發送類型爲T的數據
chan<- T // 只可以用來發送 T 類型的數據
<- chan T // 只可以用來接收 T 類型的數據
創建channel
我們可以使用make
初始化channel
,可以創建兩種兩種類型的channel
:無緩衝的channel
和有緩衝的channel
。
示例:
ch_no_buffer := make(chan int)
ch_no_buffer := make(chan int, 0)
ch_buffer := make(chan int, 100)
沒有設置容量或者容量設置爲0
,則說明channel
沒有緩存,此時只有發送方和接收方都準備好後他們纔可以進行通訊,否則就是一直阻塞。如果容量設置大於0
,那就是一個帶緩衝的channel
,發送方只有buffer
滿了之後纔會阻塞,接收方只有緩存空了纔會阻塞。
注意:未初始化(爲 nil)的channel
是不可以通信的
func main() {
var ch chan string
ch <- "asong真帥"
fmt.Println(<- ch)
}
// 運行報錯
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:
channel
入隊
channel
的入隊定義如下:
"channel" <- "要入隊的值(可以是表達式)"
在無緩衝的channel
中,只有在出隊方準備好後,channel
纔會入隊,否則一直阻塞着,所以說無緩衝channel
是同步的。
在有緩衝的channel
中,緩存未滿時,就會執行入隊操作。
向nil
的channel
中入隊會一直阻塞,導致死鎖。
channel
單個出隊
channel
的單個出隊定義如下:
<- "channel"
無論是有無緩衝的channel
在接收不到數據時都會阻塞,直到有數據可以接收。
從nil
的channel
中接收數據會一直阻塞。
channel
的出隊還有一種非阻塞寫法,定義如下:
val, ok := <-ch
這麼寫可以判斷當前channel
是否關閉,如果這個channel
被關閉了,ok
會被設置爲false
,val
就是零值。
channel
循環出隊
我們可以使用for-range
循環處理channel
。
func main() {
ch := make(chan int,10)
go func() {
for i:=0;i<10;i++{
ch <- i
}
close(ch)
}()
for val := range ch{
fmt.Println(val)
}
fmt.Println("over")
}
range ch
會一直迭代到channel
被關閉。在使用有緩衝channel
時,配合for-range
是一個不錯的選擇。
配合select
使用
Go
語言中的select
能夠讓Goroutine
同時等待多個channel
讀或者寫,在channel
狀態未改變之前,select
會一直阻塞當前線程或Goroutine
。先看一個例子:
func fibonacci(ch chan int, done chan struct{}) {
x, y := 0, 1
for {
select {
case ch <- x:
x, y = y, x+y
case <-done:
fmt.Println("over")
return
}
}
}
func main() {
ch := make(chan int)
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
done <- struct{}{}
}()
fibonacci(ch, done)
}
select
與switch
具有相似的控制結構,與switch
不同的是,select
中的case
中的表達式必須是channel
的收發操作,當select
中的兩個case
同時被觸發時,會隨機執行其中的一個。爲什麼是隨機執行的呢?隨機的引入就是爲了避免飢餓問題的發生,如果我們每次都是按照順序依次執行的,若兩個case
一直都是滿足條件的,那麼後面的case
永遠都不會執行。
上面例子中的select
用法是阻塞式的收發操作,直到有一個channel
發生狀態改變。我們也可以在select
中使用default
語句,那麼select
語句在執行時會遇到這兩種情況:
-
當存在可以收發的
Channel
時,直接處理該Channel
對應的case
; -
當不存在可以收發的
Channel
時,執行default
中的語句;
注意:nil channel
上的操作會一直被阻塞,如果沒有default case
, 只有nil channel
的select
會一直被阻塞。
關閉channel
內建的close
方法可以用來關閉channel
。如果channel
已經關閉,不可以繼續發送數據了,否則會發生panic
,但是從這個關閉的channel
中不但可以讀取出已發送的數據,還可以不斷的讀取零值。
func main() {
ch := make(chan int, 10)
ch <- 10
ch <- 20
close(ch)
fmt.Println(<-ch) //1
fmt.Println(<-ch) //2
fmt.Println(<-ch) //0
fmt.Println(<-ch) //0
}
channel
基本設計思想
channel
設計的基本思想是:不要通過共享內存來通信,而是通過通信來實現共享內存(Do not communicate by sharing memory; instead, share memory by communicating)。
這個思想大家是否理解呢?我在這裏分享一下我的理解 (查找資料 + 個人理解),有什麼不對的,留言區指正或開噴!
什麼是使用共享內存來通信?其實就是多個線程 / 協程使用同一塊內存,通過加鎖的方式來宣佈使用某塊內存,通過解鎖來宣佈不再使用某塊內存。
什麼是通過通信來實現共享內存?其實就是把一份內存的開銷變成兩份內存開銷而已,再說的通俗一點就是,我們使用發送消息的方式來同步信息。
爲什麼鼓勵使用通過通信來實現共享內存?使用發送消息來同步信息相比於直接使用共享內存和互斥鎖是一種更高級的抽象,使用更高級的抽象能夠爲我們在程序設計上提供更好的封裝,讓程序的邏輯更加清晰;其次,消息發送在解耦方面與共享內存相比也有一定優勢,我們可以將線程的職責分成生產者和消費者,並通過消息傳遞的方式將它們解耦,不需要再依賴共享內存。
對於這個理解更深的文章,建議讀一下這篇文章:爲什麼使用通信來共享內存
channel
在設計上本質就是一個有鎖的環形隊列,包括髮送方隊列、接收方隊列、互斥鎖等結構,下面我就一起從源碼出發,剖析這個有鎖的環形隊列是怎麼設計的!
源碼剖析
數據結構
在src/runtime/chan.go
中我們可以看到hchan
的結構如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
我們來解釋一下hchan
中每個字段都是什麼意思:
-
qcount
:循環數組中的元素數量 -
dataqsiz
:循環數組的長度 -
buf
:只針對有緩衝的channel
,指向底層循環數組的指針 -
elemsize
:能夠接收和發送的元素大小 -
closed
:channel
是否關閉標誌 -
elemtype
:記錄channel
中元素的類型 -
sendx
:已發送元素在循環數組中的索引 -
recvx
:已接收元素在循環數組中的索引 -
recvq
:等待接收的goroutine
隊列 -
senq
:等待發送的goroutine
隊列 -
lock
:互斥鎖,保護hchan
中的字段,保證讀寫channel
的操作都是原子的。
這個結構結合上面那個圖理解就更清晰了:
-
buf
是指向底層的循環數組,dataqsiz
就是這個循環數組的長度,qcount
就是當前循環數組中的元素數量,緩衝的channel
纔有效。 -
elemsize
和elemtype
就是我們創建channel
時設置的容量大小和元素類型。 -
sendq
、recvq
是一個雙向鏈表結構,分別表示被阻塞的goroutine
鏈表,這些 goroutine 由於嘗試讀取channel
或向channel
發送數據而被阻塞。
對於上面的描述,我們可以畫出來這樣的一個理解圖:
channel
的創建
前面介紹channel
入門的時候我們就說到了,我們使用make
進行創建,make
在經過編譯器編譯後對應的runtime.makechan
或runtime.makechan64
。爲什麼會有這個區別呢?先看一下代碼:
// go 1.15.7
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
runtime.makechan64
本質也是調用的makechan
方法,只不過多了一個數值溢出的校驗。runtime.makechan64
是用於處理緩衝區大於 2 的 32 方,所以這兩個方法會根據傳入的參數類型和緩衝區大小進行選擇。大多數情況都是使用makechan
。我們只需要分析makechan
函數就可以了。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 對發送元素進行限制 1<<16 = 65536
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 檢查是否對齊
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 判斷是否會發生內存溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 構造hchan對象
var c *hchan
switch {
// 說明是無緩衝的channel
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
// 元素類型不包含指針,只進行一次內存分配
// 如果hchan結構體中不含指針,gc就不會掃描chan中的元素,所以我們只需要分配
// "hchan 結構體大小 + 元素大小*個數" 的內存
case elem.ptrdata == 0:
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 元素包含指針,進行兩次內存分配操作
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化hchan中的對象
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
註釋我都添加上了,應該很容易懂吧,這裏在特殊說一下分配內存這塊的內容,其實歸一下類,就只有兩塊:
-
分配一次內存:若創建的
channel
是無緩衝的,或者創建的有緩衝的channel
中存儲的類型不存在指針引用,就會調用一次mallocgc
分配一段連續的內存空間。 -
分配兩次內存:若創建的有緩衝
channel
存儲的類型存在指針引用,就會連同hchan
和底層數組同時分配一段連續的內存空間。
因爲都是調用mallocgc
方法進行內存分配,所以channel
都是在堆上創建的,會進行垃圾回收,不關閉close
方法也是沒有問題的(但是想寫出漂亮的代碼就不建議你這麼做了)。
channel
入隊
channel
發送數據部分的代碼經過編譯器編譯後對應的是runtime.chansend1
,其調用的也是runtime.chansend
方法:
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
我們主要分析一下chansend
方法,代碼有點長,我們分幾個步驟來看這段代碼:
-
前置檢查
-
加鎖 / 異常檢查
-
channel
直接發送數據 -
channel
發送數據緩衝區有可用空間 -
channel
發送數據緩衝區無可用空間
前置檢查
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
這裏最主要的檢查就是判斷當前channel
是否爲nil
,往一個nil
的channel
中發送數據時,會調用gopark
函數將當前執行的goroutine
從running
狀態轉入waiting
狀態,這讓就會導致進程出現死鎖,表象出panic
事件。
緊接着會對非阻塞的channel
進行一個上限判斷,看看是否快速失敗,這裏相對於之前的版本做了改進,使用full
方法來對hchan
結構進行校驗。
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil
}
return c.qcount == c.dataqsiz
}
這裏快速失敗校驗邏輯如下:
-
若是
qcount
與dataqsiz
大小相同(緩衝區已滿)時,則會返回失敗。 -
非阻塞且未關閉,同時底層數據
dataqsiz
大小爲0
(無緩衝channel
),如果接收方沒準備好則直接返回失敗。
加鎖 / 異常檢查
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
前置校驗通過後,在發送數據的邏輯執行之前會先爲當前的channel
加鎖,防止多個協程併發修改數據。如果Channel
已經關閉,那麼向該 Channel
發送數據時會報“send on closed channel”
錯誤並中止程序。
channel
直接發送數據
直接發送數據是指 如果已經有阻塞的接收goroutines
(即recvq
中指向非空),那麼數據將被直接發送給接收goroutine
。
if sg := c.recvq.dequeue(); sg != nil {
//找到一個等待的接收器。我們將想要發送的值直接傳遞給接收者,繞過通道緩衝區(如果有的話)。
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
這裏主要是調用Send
方法,我們來看一下這個函數:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 靜態競爭省略掉
// elem是指接收到的值存放的位置
if sg.elem != nil {
// 調用sendDirect方法直接進行內存拷貝
// 從發送者拷貝到接收者
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 綁定goroutine
gp := sg.g
// 解鎖
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒接收的 goroutine
goready(gp, skip+1)
}
我們再來看一下SendDirect
方法:
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
這裏調用了memove
方法進行內存拷貝,這裏是從一個 goroutine
直接寫另一個 goroutine
棧的操作,這樣做的好處是減少了一次內存 copy
:不用先拷貝到 channel
的buf
,直接由發送者到接收者,沒有中間商賺差價,效率得以提高,完美。
channel
發送數據緩衝區有可用空間
接着往下看代碼,判斷channel
緩衝區是否還有可用空間:
// 判斷通道緩衝區是否還有可用空間
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
// 指向下一個待發送元素在循環數組中的位置
c.sendx++
// 因爲存儲數據元素的結構是循環隊列,所以噹噹前索引號已經到隊末時,將索引號調整到隊頭
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 當前循環隊列中存儲元素數+1
c.qcount++
// 釋放鎖,發送數據完畢
unlock(&c.lock)
return true
}
這裏的幾個步驟還是挺好理解的,註釋已經添加到代碼中了,我們再來詳細解析一下:
-
如果當前緩衝區還有可用空間,則調用
chanbuf
方法獲取底層緩衝數組中sendx
索引的元素指針值 -
調用
typedmemmove
方法將發送的值拷貝到緩衝區中 -
數據拷貝成功,
sendx
進行 + 1 操作,指向下一個待發送元素在循環數組中的位置。如果下一個索引位置正好是循環隊列的長度,那麼就需要把所謂位置歸 0,因爲這是一個循環環形隊列。 -
發送數據成功後,隊列元素長度自增,至此發送數據完畢,釋放鎖,返回結果即可。
channel
發送數據緩衝區無可用空間
緩衝區空間也會有滿了的時候,這是有兩種方式可以選擇,一種是直接返回,另外一種是阻塞等待。
直接返回的代碼就很簡單了,做一個簡單的是否阻塞判斷,不阻塞的話,直接釋放鎖,返回即可。
if !block {
unlock(&c.lock)
return false
}
阻塞的話代碼稍微長一點,我們來分析一下:
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
首先通過調用gettg
獲取當前執行的goroutine
,然後調用acquireSudog
方法構造sudog
結構體,然後設置待發送信息和goroutine
等信息(sudog
通過 g
字段綁定 goroutine
,而goroutine
通過waiting
綁定 sudog
,sudog
還通過 elem
字段綁定待發送元素的地址);構造完畢後調用c.sendq.enqueue
將其放入待發送的等待隊列,最後調用gopark
方法掛起當前的goroutine
進入wait
狀態。
這裏在最後調用了KeepAlive
方法,很多人對這個比較懵逼,我來解釋一下。這個方法就是爲了保證待發送的數據處於活躍狀態,也就是分配在堆上避免被 GC。這裏我在畫一個圖解釋一下上面的綁定過程,更加深理解。
現在goroutine
處於wait
狀態了,等待被喚醒,喚醒代碼如下:
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 喚醒後channel被關閉了,直接panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉mysg上綁定的channel
mysg.c = nil
// 釋放sudog
releaseSudog(mysg)
return true
喚醒的邏輯比較簡單,首先判斷goroutine
是否還存在,不存在則拋出異常。喚醒後還有一個檢查是判斷當前channel
是否被關閉了,關閉了則觸發panic
。最後我們開始取消mysg
上的channel
綁定和sudog
的釋放。
這裏大家肯定好奇,怎麼沒有看到喚醒後執行發送數據動作?之所以有這個想法,就是我們理解錯了。在上面我們已經使goroutine
進入了wait
狀態,那麼調度器在停止g
時會記錄運行線程和方法內執行的位置,也就是這個ch <- "asong"
位置,喚醒後會在這個位置開始執行,代碼又開始重新執行了,但是我們之前進入wait
狀態的綁定是要解綁與釋放的,否則下次進來就會出現問題嘍。
接收數據
之前我們介紹過channel
接收數據有兩種方式,如下:
val := <- ch
val, ok := <- ch
它們在經過編譯器編譯後分別對應的是runtime.chanrecv1
和 runtime.chanrecv2
:
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
其實都是調用chanrecv
方法,所以我們只需要解析這個方法就可以了。接收部分的代碼和接收部分的代碼是相對應的,所以我們也可以分幾個步驟來看這部分代碼:
-
前置檢查
-
加鎖和提前返回
-
channel
直接接收數據 -
channel
緩衝區有數據 -
channel
緩衝區無數據
前置檢查
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
首先也會判斷當前channel
是否爲nil channel
,如果是nil channel
且爲非阻塞接收,則直接返回即可。如果是nil channel
且爲阻塞接收,則直接調用gopark
方法掛起當前goroutine
。
然後也會進行快速失敗檢查,這裏只會對非阻塞接收的channel
進行快速失敗檢查,檢查規則如下:
func empty(c *hchan) bool {
// c.dataqsiz is immutable.
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
當循環隊列爲 0
且等待隊列 sendq
內沒有 goroutine
正在等待或者緩衝區數組爲空時,如果channel
還未關閉,這說明沒有要接收的數據,直接返回即可。如果channel
已經關閉了且緩存區沒有數據了,則會清理ep
指針中的數據並返回。這裏爲什麼清理ep
指針呢?ep
指針是什麼?這個ep
就是我們要接收的值存放的地址(val := <-ch val
就是ep
),即使channel
關閉了,我們也可以接收零值。
加鎖和提前返回
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
前置校驗通過後,在執行接收數據的邏輯之前會先爲當前的channel
加鎖,防止多個協程併發接收數據。同樣也會判斷當前channel
是否被關閉,如果channel
被關閉了,並且緩存區沒有數據了,則直接釋放鎖和清理ep
中的指針數據,不需要再走接下來的流程。
channel
直接接收數據
這一步與channel
直接發送數據是對應的,當發現channel
上有正在阻塞等待的發送方時,則直接進行接收。
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
等待發送隊列裏有goroutine
存在,有兩種可能:
-
非緩衝的
channel
-
緩衝的
channel
,但是緩衝區滿了
針對這兩種情況,在recv
方法中的執行邏輯是不同的:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 非緩衝channel
if c.dataqsiz == 0 {
// 未忽略接收值
if ep != nil {
// 直接從發送方拷貝數據到接收方
recvDirect(c.elemtype, sg, ep)
}
} else { // 有緩衝channel,但是緩衝區滿了
// 緩衝區滿時,接收方和發送方遊標重合了
// 因爲是循環隊列,都是遊標0的位置
// 獲取當前接收方遊標位置下的值
qp := chanbuf(c, c.recvx)
// 未忽略值的情況下直接從發送方拷貝數據到接收方
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 將發送者數據拷貝到緩衝區中
typedmemmove(c.elemtype, qp, sg.elem)
// 自增到下一個待接收位置
c.recvx++
// 如果下一個待接收位置等於隊列長度了,則下一個待接收位置爲隊頭,因爲是循環隊列
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 上面已經將發送者數據拷貝到緩衝區中了,所以緩衝區還是滿的,所以發送方位置仍然等於接收方位置。
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
// 綁定發送方goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒發送方的goroutine
goready(gp, skip+1)
}
代碼中的註釋已經很清楚了,但還是想在解釋一遍,這裏主要就是分爲兩種情況:
-
非緩衝區
channel
:未忽略接收值時直接調用recvDirect
方法直接從發送方的goroutine
調用棧中將數據拷貝到接收方的goroutine
。 -
帶緩衝區的
channel
:首先調用chanbuf
方法根據recv
索引的位置讀取緩衝區元素,並將其拷貝到接收方的內存地址;拷貝完畢後調整sendx
和recvx
索引位置。
最後別忘了還有一個操作就是調用goready
方法喚醒發送方的goroutine
可以繼續發送數據了。
channel
緩衝區有數據
我們接着往下看代碼,若當前channel
的緩衝區有數據時,代碼邏輯如下:
// 緩衝channel,buf裏有可用元素,發送方也可以正常發送
if c.qcount > 0 {
// 直接從循環隊列中找到要接收的元素
qp := chanbuf(c, c.recvx)
// 未忽略接收值,直接把緩衝區的值拷貝到接收方中
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循環數組裏相應位置的值
typedmemclr(c.elemtype, qp)
// 接收遊標向前移動
c.recvx++
// 超過循環隊列的長度時,接收遊標歸0(循環隊列)
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 循環隊列中的數據數量減1
c.qcount--
// 接收數據完畢,釋放鎖
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
這段代碼沒什麼難度,就不再解釋一遍了。
channel
緩衝區無數據
經過上面的步驟,現在可以確定目前這個channel
既沒有待發送的goroutine
,並且緩衝區也沒有數據。接下來就看我們是否阻塞等待接收數據了,也就有了如下判斷:
if !block {
unlock(&c.lock)
return false, false
}
非阻塞接收數據的話,直接返回即可;否則則進入阻塞接收模式:
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
這一部分的邏輯基本與發送阻塞部分一模一樣,大概邏輯就是獲取當前的goroutine
,然後構建sudog
結構保存待接收數據的地址信息和goroutine
信息,並將sudog
加入等待接收隊列,最後掛起當前goroutine
,等待喚醒。
接下來的環境邏輯也沒有特別要說的,與發送方喚醒部分一模一樣,不懂的可以看前面。喚醒後的主要工作就是恢復現場,釋放綁定信息。
關閉channel
使用close
可以關閉channel
,其經過編譯器編譯後對應的是runtime.closechan
方法,詳細邏輯我們通過註釋到代碼中:
func closechan(c *hchan) {
// 對一個nil的channel進行關閉會引發panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖
lock(&c.lock)
// 關閉一個已經關閉的channel也會引發channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// 關閉channnel標誌
c.closed = 1
// Goroutine集合
var glist gList
// 接受者的 sudog 等待隊列(recvq)加入到待清除隊列 glist 中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 發送方的sudog也加入到到待清除隊列 glist 中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 要關閉的goroutine,發送的值設爲nil
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放了發送方和接收方後,釋放鎖就可以了。
unlock(&c.lock)
// 將所有 glist 中的 goroutine 狀態從 _Gwaiting 設置爲 _Grunnable 狀態,等待調度器的調度。
// 我們既然是從sendq和recvq中獲取的goroutine,狀態都是掛起狀態,所以需要喚醒他們,走後面的流程。
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
這裏邏輯還是比較簡單,歸納總結一下:
-
一個爲
nil
的channel
不允許進行關閉 -
不可以重複關閉
channel
-
獲取當前正在阻塞的發送或者接收的
goroutine
,他們都處於掛起狀態,然後進行喚醒。這是發送方不允許在向channel
發送數據了,但是不影響接收方繼續接收元素,如果沒有元素,獲取到的元素是零值。使用val,ok := <-ch
可以判斷當前channel
是否被關閉。
總結
哇塞,開往幼兒園的車終於停了,小松子嘮嘮叨叨一路了,你們學會了嗎?
我們從入門開始到最後的源碼剖析,其實channel
的設計一點也不復雜,源碼也是很容易看懂的,本質就是維護了一個循環隊列嘛,發送數據遵循 FIFO(First In First Out)原語,數據傳遞依賴於內存拷貝。不懂的可以再看一遍,很容易理解的哦~。
最後我想說的是:channel
內部也是使用互斥鎖,那麼channel
和互斥鎖誰更輕量呢?(評論區我們一起探討一下)。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/U0dvX3NYrd_ka7w5Pe8oQQ