golang channel 使用總結
不同於傳統的多線程併發模型使用共享內存來實現線程間通信的方式,golang 的哲學是通過 channel 進行協程 (goroutine) 之間的通信來實現數據共享:
Do not communicate by sharing memory; instead, share memory by communicating.
這種方式的優點是通過提供原子的通信原語,避免了競態情形 (race condition) 下複雜的鎖機制。channel 可以看成一個 FIFO 隊列,對 FIFO 隊列的讀寫都是原子的操作,不需要加鎖。對 channel 的操作行爲結果總結如下:
讀取一個已關閉的 channel 時,總是能讀取到對應類型的零值,爲了和讀取非空未關閉 channel 的行爲區別,可以使用兩個接收值:
// ok is false when ch is closed
v, ok := <-ch
golang 中大部分類型都是值類型(只有 slice / channel / map 是引用類型),讀 / 寫類型是值類型的 channel 時,如果元素 size 比較大時,應該使用指針代替,避免頻繁的內存拷貝開銷。
內部實現
如圖所示,在 channel 的內部實現中(具體定義在 $GOROOT/src/runtime/chan.go
裏),維護了 3 個隊列:
-
讀等待協程隊列 recvq,維護了阻塞在讀此 channel 的協程列表
-
寫等待協程隊列 sendq,維護了阻塞在寫此 channel 的協程列表
-
緩衝數據隊列 buf,用環形隊列實現,不帶緩衝的 channel 此隊列 size 則爲 0
img
當協程嘗試從未關閉的 channel 中讀取數據時,內部的操作如下:
-
當 buf 非空時,此時 recvq 必爲空,buf 彈出一個元素給讀協程,讀協程獲得數據後繼續執行,此時若 sendq 非空,則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫數據入隊列 buf ,此時讀取操作
<- ch
未阻塞; -
當 buf 爲空但 sendq 非空時(不帶緩衝的 channel),則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫數據直接傳遞給讀協程,讀協程繼續執行,此時讀取操作
<- ch
未阻塞; -
當 buf 爲空並且 sendq 也爲空時,讀協程入隊列 recvq 並轉入 blocking 狀態,當後續有其他協程往 channel 寫數據時,讀協程纔會重新轉入 running 狀態,此時讀取操作
<- ch
阻塞。
類似的,當協程嘗試往未關閉的 channel 中寫入數據時,內部的操作如下:
-
當隊列 recvq 非空時,此時隊列 buf 必爲空,從 recvq 彈出一個讀協程接收待寫數據,此讀協程此時結束阻塞並轉入 running 狀態,寫協程繼續執行,此時寫入操作
ch <-
未阻塞; -
當隊列 recvq 爲空但 buf 未滿時,此時 sendq 必爲空,寫協程的待寫數據入 buf 然後繼續執行,此時寫入操作
ch <-
未阻塞; -
當隊列 recvq 爲空並且 buf 爲滿時,此時寫協程入隊列 sendq 並轉入 blokcing 狀態,當後續有其他協程從 channel 中讀數據時,寫協程纔會重新轉入 running 狀態,此時寫入操作
ch <-
阻塞。
當關閉 non-nil channel 時,內部的操作如下:
-
當隊列 recvq 非空時,此時 buf 必爲空,recvq 中的所有協程都將收到對應類型的零值然後結束阻塞狀態;
-
當隊列 sendq 非空時,此時 buf 必爲滿,sendq 中的所有協程都會產生 panic ,在 buf 中數據仍然會保留直到被其他協程讀取。
使用場景
除了常規的用來在協程之間傳遞數據外,本節列出了一些特殊的使用 channel 的場景。
futures / promises
golang 雖然沒有直接提供 futrue / promise 模型的操作原語,但通過 goroutine 和 channel 可以實現類似的功能:
package main
import (
"io/ioutil"
"log"
"net/http"
)
// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
c := make(chan []byte, 1)
go func() {
var body []byte
defer func() {
c <- body
}()
res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()
body, _ = ioutil.ReadAll(res.Body)
}()
return c
}
func main() {
future := RequestFuture("https://api.github.com/users/octocat/orgs")
body := <-future
log.Printf("reponse length: %d", len(body))
}
條件變量 (condition variable)
類型於 POSIX 接口中線程通知其他線程某個事件發生的條件變量,channel 的特性也可以用來當成協程之間同步的條件變量。因爲 channel 只是用來通知,所以 channel 中具體的數據類型和值並不重要,這種場景一般用 strct {}
作爲 channel 的類型。
一對一通知
類似 pthread_cond_signal()
的功能,用來在一個協程中通知另個某一個協程事件發生:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{})
nums := make([]int, 100)
go func() {
time.Sleep(time.Second)
for i := 0; i < len(nums); i++ {
nums[i] = i
}
// send a finish signal
ch <- struct{}{}
}()
// wait for finish signal
<-ch
fmt.Println(nums)
}
廣播通知
類似 pthread_cond_broadcast()
的功能。利用從已關閉的 channel 讀取數據時總是非阻塞的特性,可以實現在一個協程中向其他多個協程廣播某個事件發生的通知:
package main
import (
"fmt"
"time"
)
func main() {
N := 10
exit := make(chan struct{})
done := make(chan struct{}, N)
// start N worker goroutines
for i := 0; i < N; i++ {
go func(n int) {
for {
select {
// wait for exit signal
case <-exit:
fmt.Printf("worker goroutine #%d exit\n", n)
done <- struct{}{}
return
case <-time.After(time.Second):
fmt.Printf("worker goroutine #%d is working...\n", n)
}
}
}(i)
}
time.Sleep(3 * time.Second)
// broadcast exit signal
close(exit)
// wait for all worker goroutines exit
for i := 0; i < N; i++ {
<-done
}
fmt.Println("main goroutine exit")
}
信號量
channel 的讀 / 寫相當於信號量的 P / V 操作,下面的示例程序中 channel 相當於信號量:
package main
import (
"log"
"math/rand"
"time"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeConsumer(customerId int) {
log.Print("-> consumer#", customerId, " enters the bar")
seat := <-bar // need a seat to drink
log.Print("consumer#", customerId, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
log.Print("<- consumer#", customerId, " frees seat#", seat)
bar <- seat // free the seat and leave the bar
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10) // the bar has 10 seats
// Place seats in an bar.
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // none of the sends will block
}
// a new consumer try to enter the bar for each second
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeConsumer(customerId)
}
}
互斥量
互斥量相當於二元信號裏,所以 cap 爲 1 的 channel 可以當成互斥量使用:
package main
import "fmt"
func main() {
mutex := make(chan struct{}, 1) // the capacity must be one
counter := 0
increase := func() {
mutex <- struct{}{} // lock
counter++
<-mutex // unlock
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
關閉 channel
關閉不再需要使用的 channel 並不是必須的。跟其他資源比如打開的文件、socket 連接不一樣,這類資源使用完後不關閉後會造成句柄泄露,channel 使用完後不關閉也沒有關係,channel 沒有被任何協程用到後最終會被 GC 回收。關閉 channel 一般是用來通知其他協程某個任務已經完成了。golang 也沒有直接提供判斷 channel 是否已經關閉的接口,雖然可以用其他不太優雅的方式自己實現一個:
func isClosed(ch chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}
不過實現一個這樣的接口也沒什麼必要。因爲就算通過 isClosed()
得到當前 channel 當前還未關閉,如果試圖往 channel 裏寫數據,仍然可能會發生 panic ,因爲在調用 isClosed()
後,其他協程可能已經把 channel 關閉了。關閉 channel 時應該注意以下準則:
-
不要在讀取端關閉 channel ,因爲寫入端無法知道 channel 是否已經關閉,往已關閉的 channel 寫數據會 panic ;
-
有多個寫入端時,不要再寫入端關閉 channle ,因爲其他寫入端無法知道 channel 是否已經關閉,關閉已經關閉的 channel 會發生 panic ;
-
如果只有一個寫入端,可以在這個寫入端放心關閉 channel 。
關閉 channel 粗暴一點的做法是隨意關閉,如果產生了 panic 就用 recover 避免進程掛掉。稍好一點的方案是使用標準庫的 sync
包來做關閉 channel 時的協程同步,不過使用起來也稍微複雜些。下面介紹一種優雅些的做法。
一寫多讀
這種場景下這個唯一的寫入端可以關閉 channel 用來通知讀取端所有數據都已經寫入完成了。讀取端只需要用 for range
把 channel 中數據遍歷完就可以了,當 channel 關閉時,for range
仍然會將 channel 緩衝中的數據全部遍歷完然後再退出循環:
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
send := func() {
for i := 0; i < 100; i++ {
ch <- i
}
// signal sending finish
close(ch)
}
recv := func(id int) {
defer wg.Done()
for i := range ch {
fmt.Printf("receiver #%d get %d\n", id, i)
}
fmt.Printf("receiver #%d exit\n", id)
}
wg.Add(3)
go recv(0)
go recv(1)
go recv(2)
send()
wg.Wait()
}
多寫一讀
這種場景下雖然可以用 sync.Once
來解決多個寫入端重複關閉 channel 的問題,但更優雅的辦法設置一個額外的 channel ,由讀取端通過關閉來通知寫入端任務完成不要再繼續再寫入數據了:
package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func() {
count := 0
for i := range ch {
fmt.Printf("receiver get %d\n", i)
count++
if count >= 1000 {
// signal recving finish
close(done)
return
}
}
}
wg.Add(3)
go send(0)
go send(1)
go send(2)
recv()
wg.Wait()
}
多寫多讀
這種場景稍微複雜,和上面的例子一樣,也需要設置一個額外 channel 用來通知多個寫入端和讀取端。另外需要起一個額外的協程來通過關閉這個 channel 來廣播通知:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("receiver #%d exit\n", id)
return
case i := <-ch:
fmt.Printf("receiver #%d get %d\n", id, i)
time.Sleep(time.Millisecond)
}
}
}
wg.Add(6)
go send(0)
go send(1)
go send(2)
go recv(0)
go recv(1)
go recv(2)
time.Sleep(time.Second)
// signal finish
close(done)
// wait all sender and receiver exit
wg.Wait()
}
總結
channle 作爲 golang 最重要的特性,用起來還是比較爽的。傳統的 C 裏要實現類型的功能的話,一般需要用到 socket 或者 FIFO 來實現,另外還要考慮數據包的完整性與併發衝突的問題,channel 則屏蔽了這些底層細節,使用者只需要考慮讀寫就可以了。channel 是引用類型,瞭解一下 channel 底層的機制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當可能會造成死鎖或者無限制的協程創建最終導致進程掛掉。
channel 除在可以用來在協程之間通信外,其阻塞和喚醒協程的特性也可以用作協程之間的同步機制,文中也用示例簡單介紹了這種場景下的用法。
關閉 channel 並不是必須的,只要沒有協程沒用引用 channel ,最終會被 GC 清理。所以使用的時候要特別注意,不要讓協程阻塞在 channel 上,這種情況很難檢測到,而且會造成 channel 和阻塞在 channel 的協程佔有的資源無法被 GC 清理最終導致內存泄露。
channle 方便 golang 程序使用 CSP 的編程範形,但是 golang 是一種多範形的編程語言,golang 也支持傳統的通過共享內存來通信的編程方式。終極的原則是根據場景選擇合適的編程範型,不要因爲 channel 好用而濫用 CSP 。
轉自:ExplorerMan
cnblogs.com/ExMan/p/11710017.html
Go 開發大全
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/cga4frDCPx6fOjJJx9DlAg