學習 channel 設計:從入門到放棄

前言

哈嘍,大家好,我是asong。終於迴歸了,停更了兩週了,這兩週一直在搞留言號的事,經過漫長的等待,終於搞定了。兄弟們,以後就可以在留言區盡情開噴了,只要你敢噴,我就敢精選🐶。(因爲發生了賬號遷移,需點擊右上角重新添加星標,優質文章第一時間獲取!)

今天給大家帶來的是Go語言中的channelGo語言從出世以來就以高併發著稱,得益於其Goroutine的設計,Goroutine也就是一個可執行的輕量級協程,有了Goroutine我們可以輕鬆的運行協程,但這並不能滿足我們的需求,我們往往還希望多個線程 / 協程是能夠通信的,Go語言爲了支持多個Goroutine通信,設計了channel,本文我們就一起從GO1.15的源碼出發,看看channel到底是如何設計的。

好啦,開往幼兒園的列車就要開了,朋友們繫好安全帶,我要開車啦🐶

什麼是 channel

通過開頭的介紹我們可以知道channel是用於goroutine的數據通信,在Go中通過goroutine+channel的方式,可以簡單、高效地解決併發問題。我們先來看一下簡單的示例:

func GoroutineOne(ch chan <-string)  {
 fmt.Println("GoroutineOne running")
 ch <- "asong真帥"
 fmt.Println("GoroutineOne end of the run")
}

func GoroutineTwo(ch <- chan string)  {
 fmt.Println("GoroutineTwo running")
 fmt.Printf("女朋友說:%s\n",<-ch)
 fmt.Println("GoroutineTwo end of the run")
}


func main()  {
 ch := make(chan string)
 go GoroutineOne(ch)
 go GoroutineTwo(ch)
 time.Sleep(3 * time.Second)
}
// 運行結果
// GoroutineOne running
// GoroutineTwo running
// 女朋友說:asong真帥
// GoroutineTwo end of the run
// GoroutineOne end of the run

這裏我們運行了兩個Goroutine,在GoroutineOne中我們向channel中寫入數據,在GoroutineTwo中我們監聽channel,直到讀取到 "asong 真帥"。我們可以畫一個簡單的圖來表明一下這個順序:

上面的例子是對無緩衝channel的一個簡單應用,其實channel的使用語法還是挺多的,下面且聽我慢慢道來,畢竟是從入門到放棄嘛,那就先從入門開始。

入門channel

channel類型

channel有三種類型的定義,分別是:chanchan <-<- chan,可選的<-代表channel的方向,如果我們沒有指定方向,那麼channel就是雙向的,既可以接收數據,也可以發送數據。

chan T // 接收和發送類型爲T的數據
chan<- T // 只可以用來發送 T 類型的數據
<- chan T // 只可以用來接收 T 類型的數據

創建channel

我們可以使用make初始化channel,可以創建兩種兩種類型的channel:無緩衝的channel和有緩衝的channel

示例:

ch_no_buffer := make(chan int)
ch_no_buffer := make(chan int, 0)
ch_buffer := make(chan int, 100)

沒有設置容量或者容量設置爲0,則說明channel沒有緩存,此時只有發送方和接收方都準備好後他們纔可以進行通訊,否則就是一直阻塞。如果容量設置大於0,那就是一個帶緩衝的channel,發送方只有buffer滿了之後纔會阻塞,接收方只有緩存空了纔會阻塞。

注意:未初始化(爲 nil)的channel是不可以通信的

func main()  {
 var ch chan string
 ch <- "asong真帥"
 fmt.Println(<- ch)
}
// 運行報錯
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:

channel入隊

channel的入隊定義如下:

"channel" <- "要入隊的值(可以是表達式)"

在無緩衝的channel中,只有在出隊方準備好後,channel纔會入隊,否則一直阻塞着,所以說無緩衝channel是同步的。

在有緩衝的channel中,緩存未滿時,就會執行入隊操作。

nilchannel中入隊會一直阻塞,導致死鎖。

channel單個出隊

channel的單個出隊定義如下:

<- "channel"

無論是有無緩衝的channel在接收不到數據時都會阻塞,直到有數據可以接收。

nilchannel中接收數據會一直阻塞。

channel的出隊還有一種非阻塞寫法,定義如下:

val, ok := <-ch

這麼寫可以判斷當前channel是否關閉,如果這個channel被關閉了,ok會被設置爲falseval就是零值。

channel循環出隊

我們可以使用for-range循環處理channel

func main()  {
 ch := make(chan int,10)
 go func() {
  for i:=0;i<10;i++{
   ch <- i
  }
  close(ch)
 }()
 for val := range ch{
  fmt.Println(val)
 }
 fmt.Println("over")
}

range ch會一直迭代到channel被關閉。在使用有緩衝channel時,配合for-range是一個不錯的選擇。

配合select使用

Go語言中的select能夠讓Goroutine同時等待多個channel讀或者寫,在channel狀態未改變之前,select會一直阻塞當前線程或Goroutine。先看一個例子:

func fibonacci(ch chan int, done chan struct{}) {
 x, y := 0, 1
 for {
  select {
  case ch <- x:
   x, y = y, x+y
  case <-done:
   fmt.Println("over")
   return
  }
 }
}
func main() {
 ch := make(chan int)
 done := make(chan struct{})
 go func() {
  for i := 0; i < 10; i++ {
   fmt.Println(<-ch)
  }
  done <- struct{}{}
 }()
 fibonacci(ch, done)
}

selectswitch具有相似的控制結構,與switch不同的是,select中的case中的表達式必須是channel的收發操作,當select中的兩個case同時被觸發時,會隨機執行其中的一個。爲什麼是隨機執行的呢?隨機的引入就是爲了避免飢餓問題的發生,如果我們每次都是按照順序依次執行的,若兩個case一直都是滿足條件的,那麼後面的case永遠都不會執行。

上面例子中的select用法是阻塞式的收發操作,直到有一個channel發生狀態改變。我們也可以在select中使用default語句,那麼select語句在執行時會遇到這兩種情況:

注意:nil channel上的操作會一直被阻塞,如果沒有default case, 只有nil channelselect會一直被阻塞。

關閉channel

內建的close方法可以用來關閉channel。如果channel已經關閉,不可以繼續發送數據了,否則會發生panic,但是從這個關閉的channel中不但可以讀取出已發送的數據,還可以不斷的讀取零值。

func main()  {
 ch := make(chan int, 10)
 ch <- 10
 ch <- 20
 close(ch)
 fmt.Println(<-ch) //1
 fmt.Println(<-ch) //2
 fmt.Println(<-ch) //0
 fmt.Println(<-ch) //0
}

channel基本設計思想

channel設計的基本思想是:不要通過共享內存來通信,而是通過通信來實現共享內存(Do not communicate by sharing memory; instead, share memory by communicating)

這個思想大家是否理解呢?我在這裏分享一下我的理解 (查找資料 + 個人理解),有什麼不對的,留言區指正或開噴!

什麼是使用共享內存來通信?其實就是多個線程 / 協程使用同一塊內存,通過加鎖的方式來宣佈使用某塊內存,通過解鎖來宣佈不再使用某塊內存。

什麼是通過通信來實現共享內存?其實就是把一份內存的開銷變成兩份內存開銷而已,再說的通俗一點就是,我們使用發送消息的方式來同步信息。

爲什麼鼓勵使用通過通信來實現共享內存?使用發送消息來同步信息相比於直接使用共享內存和互斥鎖是一種更高級的抽象,使用更高級的抽象能夠爲我們在程序設計上提供更好的封裝,讓程序的邏輯更加清晰;其次,消息發送在解耦方面與共享內存相比也有一定優勢,我們可以將線程的職責分成生產者和消費者,並通過消息傳遞的方式將它們解耦,不需要再依賴共享內存。

對於這個理解更深的文章,建議讀一下這篇文章:爲什麼使用通信來共享內存

channel在設計上本質就是一個有鎖的環形隊列,包括髮送方隊列、接收方隊列、互斥鎖等結構,下面我就一起從源碼出發,剖析這個有鎖的環形隊列是怎麼設計的!

源碼剖析

數據結構

src/runtime/chan.go中我們可以看到hchan的結構如下:

type hchan struct {
 qcount   uint           // total data in the queue
 dataqsiz uint           // size of the circular queue
 buf      unsafe.Pointer // points to an array of dataqsiz elements
 elemsize uint16
 closed   uint32
 elemtype *_type // element type
 sendx    uint   // send index
 recvx    uint   // receive index
 recvq    waitq  // list of recv waiters
 sendq    waitq  // list of send waiters
 lock mutex
}

我們來解釋一下hchan中每個字段都是什麼意思:

這個結構結合上面那個圖理解就更清晰了:

對於上面的描述,我們可以畫出來這樣的一個理解圖:

channel的創建

前面介紹channel入門的時候我們就說到了,我們使用make進行創建,make在經過編譯器編譯後對應的runtime.makechanruntime.makechan64。爲什麼會有這個區別呢?先看一下代碼:

// go 1.15.7
func makechan64(t *chantype, size int64) *hchan {
 if int64(int(size)) != size {
  panic(plainError("makechan: size out of range"))
 }

 return makechan(t, int(size))
}

runtime.makechan64本質也是調用的makechan方法,只不過多了一個數值溢出的校驗。runtime.makechan64是用於處理緩衝區大於 2 的 32 方,所以這兩個方法會根據傳入的參數類型和緩衝區大小進行選擇。大多數情況都是使用makechan。我們只需要分析makechan函數就可以了。

func makechan(t *chantype, size int) *hchan {
 elem := t.elem
 // 對發送元素進行限制 1<<16 = 65536
 if elem.size >= 1<<16 {
  throw("makechan: invalid channel element type")
 }
  // 檢查是否對齊
 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  throw("makechan: bad alignment")
 }
  // 判斷是否會發生內存溢出
 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
 if overflow || mem > maxAlloc-hchanSize || size < 0 {
  panic(plainError("makechan: size out of range"))
 }
  // 構造hchan對象
 var c *hchan
 switch {
  // 說明是無緩衝的channel
 case mem == 0:
  // Queue or element size is zero.
  c = (*hchan)(mallocgc(hchanSize, nil, true))
  // Race detector uses this location for synchronization.
  c.buf = c.raceaddr()
  // 元素類型不包含指針,只進行一次內存分配
 // 如果hchan結構體中不含指針,gc就不會掃描chan中的元素,所以我們只需要分配
  // "hchan 結構體大小 + 元素大小*個數" 的內存
 case elem.ptrdata == 0:
  // Allocate hchan and buf in one call.
  c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
  c.buf = add(unsafe.Pointer(c), hchanSize)
  // 元素包含指針,進行兩次內存分配操作
 default:
  c = new(hchan)
  c.buf = mallocgc(mem, elem, true)
 }
 // 初始化hchan中的對象
 c.elemsize = uint16(elem.size)
 c.elemtype = elem
 c.dataqsiz = uint(size)
 lockInit(&c.lock, lockRankHchan)

 if debugChan {
  print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
 }
 return c
}

註釋我都添加上了,應該很容易懂吧,這裏在特殊說一下分配內存這塊的內容,其實歸一下類,就只有兩塊:

因爲都是調用mallocgc方法進行內存分配,所以channel都是在堆上創建的,會進行垃圾回收,不關閉close方法也是沒有問題的(但是想寫出漂亮的代碼就不建議你這麼做了)。

channel入隊

channel發送數據部分的代碼經過編譯器編譯後對應的是runtime.chansend1,其調用的也是runtime.chansend方法:

func chansend1(c *hchan, elem unsafe.Pointer) {
 chansend(c, elem, true, getcallerpc())
}

我們主要分析一下chansend方法,代碼有點長,我們分幾個步驟來看這段代碼:

前置檢查

 if c == nil {
  if !block {
   return false
  }
  gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }

 if debugChan {
  print("chansend: chan=", c, "\n")
 }

 if raceenabled {
  racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
 }
 if !block && c.closed == 0 && full(c) {
  return false
 }

 var t0 int64
 if blockprofilerate > 0 {
  t0 = cputicks()
 }

這裏最主要的檢查就是判斷當前channel是否爲nil,往一個nilchannel中發送數據時,會調用gopark函數將當前執行的goroutinerunning狀態轉入waiting狀態,這讓就會導致進程出現死鎖,表象出panic事件。

緊接着會對非阻塞的channel進行一個上限判斷,看看是否快速失敗,這裏相對於之前的版本做了改進,使用full方法來對hchan結構進行校驗。

func full(c *hchan) bool {
 if c.dataqsiz == 0 {
  return c.recvq.first == nil
 }
 return c.qcount == c.dataqsiz
}

這裏快速失敗校驗邏輯如下:

加鎖 / 異常檢查

lock(&c.lock)

if c.closed != 0 {
  unlock(&c.lock)
  panic(plainError("send on closed channel"))
}

前置校驗通過後,在發送數據的邏輯執行之前會先爲當前的channel加鎖,防止多個協程併發修改數據。如果Channel 已經關閉,那麼向該 Channel發送數據時會報“send on closed channel”錯誤並中止程序。

channel直接發送數據

直接發送數據是指 如果已經有阻塞的接收goroutines(即recvq中指向非空),那麼數據將被直接發送給接收goroutine

if sg := c.recvq.dequeue(); sg != nil {
  //找到一個等待的接收器。我們將想要發送的值直接傳遞給接收者,繞過通道緩衝區(如果有的話)。
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
}

這裏主要是調用Send方法,我們來看一下這個函數:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 // 靜態競爭省略掉
  // elem是指接收到的值存放的位置
 if sg.elem != nil {
    // 調用sendDirect方法直接進行內存拷貝
    // 從發送者拷貝到接收者
  sendDirect(c.elemtype, sg, ep)
  sg.elem = nil
 }
  // 綁定goroutine
 gp := sg.g
  // 解鎖
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
  // 喚醒接收的 goroutine
 goready(gp, skip+1)
}

我們再來看一下SendDirect方法:

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
 dst := sg.elem
 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
 memmove(dst, src, t.size)
}

這裏調用了memove方法進行內存拷貝,這裏是從一個 goroutine 直接寫另一個 goroutine 棧的操作,這樣做的好處是減少了一次內存 copy:不用先拷貝到 channelbuf,直接由發送者到接收者,沒有中間商賺差價,效率得以提高,完美。

channel發送數據緩衝區有可用空間

接着往下看代碼,判斷channel緩衝區是否還有可用空間:

// 判斷通道緩衝區是否還有可用空間
if c.qcount < c.dataqsiz {
  qp := chanbuf(c, c.sendx)
  if raceenabled {
   raceacquire(qp)
   racerelease(qp)
  }
  typedmemmove(c.elemtype, qp, ep)
   // 指向下一個待發送元素在循環數組中的位置
  c.sendx++
   // 因爲存儲數據元素的結構是循環隊列,所以噹噹前索引號已經到隊末時,將索引號調整到隊頭
  if c.sendx == c.dataqsiz {
   c.sendx = 0
  }
   // 當前循環隊列中存儲元素數+1
  c.qcount++
   // 釋放鎖,發送數據完畢
  unlock(&c.lock)
  return true
}

這裏的幾個步驟還是挺好理解的,註釋已經添加到代碼中了,我們再來詳細解析一下:

channel發送數據緩衝區無可用空間

緩衝區空間也會有滿了的時候,這是有兩種方式可以選擇,一種是直接返回,另外一種是阻塞等待。

直接返回的代碼就很簡單了,做一個簡單的是否阻塞判斷,不阻塞的話,直接釋放鎖,返回即可。

if !block {
  unlock(&c.lock)
  return false
}

阻塞的話代碼稍微長一點,我們來分析一下:

  gp := getg()
 mysg := acquireSudog()
 mysg.releasetime = 0
 if t0 != 0 {
  mysg.releasetime = -1
 }
 mysg.elem = ep
 mysg.waitlink = nil
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c
 gp.waiting = mysg
 gp.param = nil
 c.sendq.enqueue(mysg)
 atomic.Store8(&gp.parkingOnChan, 1)
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  KeepAlive(ep)

首先通過調用gettg獲取當前執行的goroutine,然後調用acquireSudog方法構造sudog結構體,然後設置待發送信息和goroutine等信息(sudog 通過 g 字段綁定 goroutine,而goroutine 通過waiting綁定 sudogsudog 還通過 elem 字段綁定待發送元素的地址);構造完畢後調用c.sendq.enqueue將其放入待發送的等待隊列,最後調用gopark方法掛起當前的goroutine進入wait狀態。

這裏在最後調用了KeepAlive方法,很多人對這個比較懵逼,我來解釋一下。這個方法就是爲了保證待發送的數據處於活躍狀態,也就是分配在堆上避免被 GC。這裏我在畫一個圖解釋一下上面的綁定過程,更加深理解。

現在goroutine處於wait狀態了,等待被喚醒,喚醒代碼如下:

 if mysg != gp.waiting {
  throw("G waiting list is corrupted")
 }
 gp.waiting = nil
 gp.activeStackChans = false
 if gp.param == nil {
  if c.closed == 0 {
   throw("chansend: spurious wakeup")
  }
    // 喚醒後channel被關閉了,直接panic
  panic(plainError("send on closed channel"))
 }
 gp.param = nil
 if mysg.releasetime > 0 {
  blockevent(mysg.releasetime-t0, 2)
 }
 // 去掉mysg上綁定的channel
 mysg.c = nil
  // 釋放sudog
 releaseSudog(mysg)
 return true

喚醒的邏輯比較簡單,首先判斷goroutine是否還存在,不存在則拋出異常。喚醒後還有一個檢查是判斷當前channel是否被關閉了,關閉了則觸發panic。最後我們開始取消mysg上的channel綁定和sudog的釋放。

這裏大家肯定好奇,怎麼沒有看到喚醒後執行發送數據動作?之所以有這個想法,就是我們理解錯了。在上面我們已經使goroutine進入了wait狀態,那麼調度器在停止g 時會記錄運行線程和方法內執行的位置,也就是這個ch <- "asong"位置,喚醒後會在這個位置開始執行,代碼又開始重新執行了,但是我們之前進入wait狀態的綁定是要解綁與釋放的,否則下次進來就會出現問題嘍。

接收數據

之前我們介紹過channel接收數據有兩種方式,如下:

val := <- ch
val, ok := <- ch

它們在經過編譯器編譯後分別對應的是runtime.chanrecv1runtime.chanrecv2

//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
 chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
 _, received = chanrecv(c, elem, true)
 return
}

其實都是調用chanrecv方法,所以我們只需要解析這個方法就可以了。接收部分的代碼和接收部分的代碼是相對應的,所以我們也可以分幾個步驟來看這部分代碼:

前置檢查

if c == nil {
  if !block {
   return
}
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
}
if atomic.Load(&c.closed) == 0 {
   return
}
if empty(c) {
  if raceenabled {
    raceacquire(c.raceaddr())
  }
  if ep != nil {
    typedmemclr(c.elemtype, ep)
  }
   return true, false
  }
}

var t0 int64
if blockprofilerate > 0 {
 t0 = cputicks()
}

首先也會判斷當前channel是否爲nil channel,如果是nil channel且爲非阻塞接收,則直接返回即可。如果是nil channel且爲阻塞接收,則直接調用gopark方法掛起當前goroutine

然後也會進行快速失敗檢查,這裏只會對非阻塞接收的channel進行快速失敗檢查,檢查規則如下:

func empty(c *hchan) bool {
 // c.dataqsiz is immutable.
 if c.dataqsiz == 0 {
  return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
 }
 return atomic.Loaduint(&c.qcount) == 0
}

當循環隊列爲 0且等待隊列 sendq內沒有 goroutine 正在等待或者緩衝區數組爲空時,如果channel還未關閉,這說明沒有要接收的數據,直接返回即可。如果channel已經關閉了且緩存區沒有數據了,則會清理ep指針中的數據並返回。這裏爲什麼清理ep指針呢?ep指針是什麼?這個ep就是我們要接收的值存放的地址(val := <-ch val就是ep  ),即使channel關閉了,我們也可以接收零值。

加鎖和提前返回

 lock(&c.lock)

 if c.closed != 0 && c.qcount == 0 {
  if raceenabled {
   raceacquire(c.raceaddr())
  }
  unlock(&c.lock)
  if ep != nil {
   typedmemclr(c.elemtype, ep)
  }
  return true, false
 }

前置校驗通過後,在執行接收數據的邏輯之前會先爲當前的channel加鎖,防止多個協程併發接收數據。同樣也會判斷當前channel是否被關閉,如果channel被關閉了,並且緩存區沒有數據了,則直接釋放鎖和清理ep中的指針數據,不需要再走接下來的流程。

channel直接接收數據

這一步與channel直接發送數據是對應的,當發現channel上有正在阻塞等待的發送方時,則直接進行接收。

if sg := c.sendq.dequeue(); sg != nil {
  recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true, true
 }

等待發送隊列裏有goroutine存在,有兩種可能:

針對這兩種情況,在recv方法中的執行邏輯是不同的:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  // 非緩衝channel
 if c.dataqsiz == 0 {
    // 未忽略接收值
  if ep != nil {
   // 直接從發送方拷貝數據到接收方
   recvDirect(c.elemtype, sg, ep)
  }
 } else { // 有緩衝channel,但是緩衝區滿了
    // 緩衝區滿時,接收方和發送方遊標重合了
    // 因爲是循環隊列,都是遊標0的位置
    // 獲取當前接收方遊標位置下的值
  qp := chanbuf(c, c.recvx)
  // 未忽略值的情況下直接從發送方拷貝數據到接收方
  if ep != nil {
   typedmemmove(c.elemtype, ep, qp)
  }
  // 將發送者數據拷貝到緩衝區中
  typedmemmove(c.elemtype, qp, sg.elem)
    // 自增到下一個待接收位置
  c.recvx++
    // 如果下一個待接收位置等於隊列長度了,則下一個待接收位置爲隊頭,因爲是循環隊列
  if c.recvx == c.dataqsiz {
   c.recvx = 0
  }
    // 上面已經將發送者數據拷貝到緩衝區中了,所以緩衝區還是滿的,所以發送方位置仍然等於接收方位置。
  c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
 }
 sg.elem = nil
  // 綁定發送方goroutine
 gp := sg.g
 unlockf()
 gp.param = unsafe.Pointer(sg)
 if sg.releasetime != 0 {
  sg.releasetime = cputicks()
 }
  // 喚醒發送方的goroutine
 goready(gp, skip+1)
}

代碼中的註釋已經很清楚了,但還是想在解釋一遍,這裏主要就是分爲兩種情況:

最後別忘了還有一個操作就是調用goready方法喚醒發送方的goroutine可以繼續發送數據了。

channel緩衝區有數據

我們接着往下看代碼,若當前channel的緩衝區有數據時,代碼邏輯如下:

  // 緩衝channel,buf裏有可用元素,發送方也可以正常發送
   if c.qcount > 0 {
     // 直接從循環隊列中找到要接收的元素
  qp := chanbuf(c, c.recvx)
    // 未忽略接收值,直接把緩衝區的值拷貝到接收方中
  if ep != nil {
   typedmemmove(c.elemtype, ep, qp)
  }
     // 清理掉循環數組裏相應位置的值
  typedmemclr(c.elemtype, qp)
     // 接收遊標向前移動
  c.recvx++
     // 超過循環隊列的長度時,接收遊標歸0(循環隊列)
  if c.recvx == c.dataqsiz {
   c.recvx = 0
  }
     // 循環隊列中的數據數量減1
  c.qcount--
    // 接收數據完畢,釋放鎖
  unlock(&c.lock)
  return true, true
 }

 if !block {
  unlock(&c.lock)
  return false, false
 }

這段代碼沒什麼難度,就不再解釋一遍了。

channel緩衝區無數據

經過上面的步驟,現在可以確定目前這個channel既沒有待發送的goroutine,並且緩衝區也沒有數據。接下來就看我們是否阻塞等待接收數據了,也就有了如下判斷:

 if !block {
  unlock(&c.lock)
  return false, false
 }

非阻塞接收數據的話,直接返回即可;否則則進入阻塞接收模式:

  gp := getg()
 mysg := acquireSudog()
 mysg.releasetime = 0
 if t0 != 0 {
  mysg.releasetime = -1
 }
 mysg.elem = ep
 mysg.waitlink = nil
 gp.waiting = mysg
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c
 gp.param = nil
 c.recvq.enqueue(mysg)
 atomic.Store8(&gp.parkingOnChan, 1)
 gopark(chanparkcommit,  unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

這一部分的邏輯基本與發送阻塞部分一模一樣,大概邏輯就是獲取當前的goroutine,然後構建sudog結構保存待接收數據的地址信息和goroutine信息,並將sudog加入等待接收隊列,最後掛起當前goroutine,等待喚醒。

接下來的環境邏輯也沒有特別要說的,與發送方喚醒部分一模一樣,不懂的可以看前面。喚醒後的主要工作就是恢復現場,釋放綁定信息。

關閉channel

使用close可以關閉channel,其經過編譯器編譯後對應的是runtime.closechan方法,詳細邏輯我們通過註釋到代碼中:

func closechan(c *hchan) {
  // 對一個nil的channel進行關閉會引發panic
 if c == nil {
  panic(plainError("close of nil channel"))
 }
  // 加鎖
 lock(&c.lock)
  // 關閉一個已經關閉的channel也會引發channel
 if c.closed != 0 {
  unlock(&c.lock)
  panic(plainError("close of closed channel"))
 }
 // 關閉channnel標誌
 c.closed = 1
 // Goroutine集合
 var glist gList

 // 接受者的 sudog 等待隊列(recvq)加入到待清除隊列 glist 中
 for {
  sg := c.recvq.dequeue()
  if sg == nil {
   break
  }
  if sg.elem != nil {
   typedmemclr(c.elemtype, sg.elem)
   sg.elem = nil
  }
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }

 // 發送方的sudog也加入到到待清除隊列 glist 中
 for {
  sg := c.sendq.dequeue()
  if sg == nil {
   break
  }
    // 要關閉的goroutine,發送的值設爲nil
  sg.elem = nil
  if sg.releasetime != 0 {
   sg.releasetime = cputicks()
  }
  gp := sg.g
  gp.param = nil
  if raceenabled {
   raceacquireg(gp, c.raceaddr())
  }
  glist.push(gp)
 }
  // 釋放了發送方和接收方後,釋放鎖就可以了。
 unlock(&c.lock)

 // 將所有 glist 中的 goroutine 狀態從 _Gwaiting 設置爲 _Grunnable 狀態,等待調度器的調度。
  // 我們既然是從sendq和recvq中獲取的goroutine,狀態都是掛起狀態,所以需要喚醒他們,走後面的流程。
 for !glist.empty() {
  gp := glist.pop()
  gp.schedlink = 0
  goready(gp, 3)
 }
}

這裏邏輯還是比較簡單,歸納總結一下:

總結

哇塞,開往幼兒園的車終於停了,小松子嘮嘮叨叨一路了,你們學會了嗎?

我們從入門開始到最後的源碼剖析,其實channel的設計一點也不復雜,源碼也是很容易看懂的,本質就是維護了一個循環隊列嘛,發送數據遵循 FIFO(First In First Out)原語,數據傳遞依賴於內存拷貝。不懂的可以再看一遍,很容易理解的哦~。

最後我想說的是:channel內部也是使用互斥鎖,那麼channel和互斥鎖誰更輕量呢?(評論區我們一起探討一下)。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/U0dvX3NYrd_ka7w5Pe8oQQ