使用反射操作 channel
今年教師節極客時間送給講師 4999 SVIP 卡,一直沒顧過來用,上週激活後在極客時間的衆多精品課和專欄中徜徉,收穫頗豐。尤其是在拜讀鳥窩老師的《Go 併發編程實戰課》[1] 後,get 到一個以前從未用過的 “技能點”:使用 reflect 操作 channel,這裏整理一下,把它分享給大家。
1. channel 常規語法的 “限制”
Go 語言實現了基於 CSP(Communicating Sequential Processes)理論的併發方案。方案包含兩個重要元素,一個是 Goroutine,它是 Go 應用併發設計的基本構建與執行單元;另一個就是 channel,它在併發模型中扮演着重要的角色。channel 既可以用來實現 Goroutine 間的通信,還可以實現 Goroutine 間的同步。
我們先來簡要回顧一下有關 channel 的常規語法。
我們可以通過 make(chan T, n) 創建元素類型爲 T、容量爲 n 的 channel 類型實例,比如:
ch1 := make(chan int) // 創建一個無緩衝的channel實例ch1
ch2 := make(chan int, 5) // 創建一個帶緩衝的channel實例ch2
Go 提供了 “<-” 操作符用於對 channel 類型變量進行發送與接收操作,下面是一些對上述 channel ch1 和 ch2 進行收發操作的代碼示例:
ch1 <- 13 // 將整型字面值13發送到無緩衝channel類型變量ch1中
n := <- ch1 // 從無緩衝channel類型變量ch1中接收一個整型值存儲到整型變量n中
ch2 <- 17 // 將整型字面值17發送到帶緩衝channel類型變量ch2中
m := <- ch2 // 從帶緩衝channel類型變量ch2中接收一個整型值存儲到整型變量m中
Go 不僅提供了單獨操作 channel 的語法,還提供了可以同時對多個 channel 進行操作的 select-case 語法,比如下面代碼:
select {
case x := <-ch1: // 從channel ch1接收數據
... ...
case y, ok := <-ch2: // 從channel ch2接收數據,並根據ok值判斷ch2是否已經關閉
... ...
case ch3 <- z: // 將z值發送到channel ch3中:
... ...
default: // 當上面case中的channel通信均無法實施時,執行該默認分支
}
我們看到:select 語法中的 case 數量必須是固定的,我們只能把事先要交給 select“監聽” 的 channel 準備好,在 select 語句中平鋪開纔可以。這就是 select 語句常規語法的限制,即 select 語法不支持動態的 case 集合。如果我們要監聽的 channel 個數是不確定的,且在運行時會動態變化,那麼 select 語法將無法滿足我們的要求。
那怎麼突破這一限制呢?鳥窩老師告訴我們用 reflect 包 [2]。
2. reflect.Select 和 reflect.SelectCase
很多朋友可能和我一樣,因爲沒有使用過 reflect 包操作 channel,就會以爲 reflect 操作 channel 的能力是 Go 新版本才提供的,但實則不然。reflect 包中用於操作 channel 的函數 Select 以及其切片參數的元素類型 SelectCase 早在 Go 1.1 版本就加入到 Go 語言中了,有下圖爲證:
那麼如何使用這一 “古老” 的機制呢?我們一起來看一些例子。
首先我們來看第一種情況,也是最好理解的一種情況,即從一個動態的 channel 集合進行 receive operations 的 select,下面是示例代碼:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv/main.go
package main
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var rchs []chan int
for i := 0; i < 10; i++ {
rchs = append(rchs, make(chan int))
}
// 創建SelectCase
var cases = createRecvCases(rchs)
// 消費者goroutine
go func() {
defer wg.Done()
for {
chosen, recv, ok := reflect.Select(cases)
if ok {
fmt.Printf("recv from channel [%d], val=%v\n", chosen, recv)
continue
}
// one of the channels is closed, exit the goroutine
fmt.Printf("channel [%d] closed, select goroutine exit\n", chosen)
return
}
}()
// 生產者goroutine
go func() {
defer wg.Done()
var n int
s := rand.NewSource(time.Now().Unix())
r := rand.New(s)
for i := 0; i < 10; i++ {
n = r.Intn(10)
rchs[n] <- n
}
close(rchs[n])
}()
wg.Wait()
}
func createRecvCases(rchs []chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創建recv case
for _, ch := range rchs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
return cases
}
在這個例子中,我們通過 createRecvCases 這個函數創建一個元素類型爲 reflect.SelectCase 的切片,之後使用 reflect.Select 可以監聽這個切片集合,就像常規 select 語法那樣,從有數據的 recv Channel 集合中隨機選出一個返回。
reflect.SelectCase 有三個字段:
// $GOROOT/src/reflect/value.go
type SelectCase struct {
Dir SelectDir // direction of case
Chan Value // channel to use (for send or receive)
Send Value // value to send (for send)
}
其中 Dir 字段的值是一個 “枚舉”,枚舉值如下:
// $GOROOT/src/reflect/value.go
const (
_ SelectDir = iota
SelectSend // case Chan <- Send
SelectRecv // case <-Chan:
SelectDefault // default
)
從常量名我們也可以看出,Dir 用於標識 case 的類型,SelectRecv 表示這是一個從 channel 做 receive 操作的 case,SelectSend 表示這是一個向 channel 做 send 操作的 case;SelectDefault 則表示這是一個 default case。
構建好 SelectCase 的切片後,我們就可以將其傳給 reflect.Select 了。Select 函數的語義與 select 關鍵字語義是一致的,它會監聽傳入的所有 SelectCase,以上面示例爲例,如果所有 channel 都沒有數據,那麼 reflect.Select 會阻塞,直到某個 channel 有數據或關閉。
Select 函數有三個返回值:
// $GOROOT/src/reflect/value.go
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
對於上面示例而言,如果監聽的某個 case 有數據了,那麼 Select 的返回值 chosen 中存儲了該 channel 在 cases 切片中的下標,recv 中存儲了從 channel 收到的值,recvOK 等價於 comma, ok 模式的 ok,當正常接收到由 send channel 操作發送的值時,recvOK 爲 true,如果 channel 被 close 了,recvOK 爲 false。
上面的示例啓動了兩個 goroutine,一個 goroutine 充當消費者,由 reflect.Select 監聽一組 channel,當某個 channel 關閉時,該 goroutine 退出;另外一個 goroutine 則是隨機的向這些 channel 中發送數據,發送 10 次後,關閉其中某個 channel 通知消費者退出。
我們運行一下該示例程序,得到如下結果:
$go run main.go
recv from channel [1], val=1
recv from channel [4], val=4
recv from channel [5], val=5
recv from channel [8], val=8
recv from channel [1], val=1
recv from channel [1], val=1
recv from channel [8], val=8
recv from channel [3], val=3
recv from channel [5], val=5
recv from channel [9], val=9
channel [9] closed, select goroutine exit
我們日常編碼時經常會在 select 語句中加上 default 分支,以防止 select 完全阻塞,下面我們就來改造一下示例,讓其增加對 default 分支的支持:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv-with-default/main.go
package main
import (
"fmt"
"math/rand"
"reflect"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
var rchs []chan int
for i := 0; i < 10; i++ {
rchs = append(rchs, make(chan int))
}
// 創建SelectCase
var cases = createRecvCases(rchs, true)
// 消費者goroutine
go func() {
defer wg.Done()
for {
chosen, recv, ok := reflect.Select(cases)
if cases[chosen].Dir == reflect.SelectDefault {
fmt.Println("choose the default")
continue
}
if ok {
fmt.Printf("recv from channel [%d], val=%v\n", chosen, recv)
continue
}
// one of the channels is closed, exit the goroutine
fmt.Printf("channel [%d] closed, select goroutine exit\n", chosen)
return
}
}()
// 生產者goroutine
go func() {
defer wg.Done()
var n int
s := rand.NewSource(time.Now().Unix())
r := rand.New(s)
for i := 0; i < 10; i++ {
n = r.Intn(10)
rchs[n] <- n
}
close(rchs[n])
}()
wg.Wait()
}
func createRecvCases(rchs []chan int, withDefault bool) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創建recv case
for _, ch := range rchs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
if withDefault {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectDefault,
Chan: reflect.Value{},
Send: reflect.Value{},
})
}
return cases
}
在這個示例中,我們的 createRecvCases 函數增加了一個 withDefault 布爾型參數,當 withDefault 爲 true 時,返回的 cases 切片中將包含一個 default case。我們看到,創建 defaultCase 時,Chan 和 Send 兩個字段需要傳入空的 reflect.Value。
在消費者 goroutine 中,我們通過選出的 case 的 Dir 字段是否爲 reflect.SelectDefault 來判定是否 default case 被選出,其餘的處理邏輯不變,我們運行一下這個示例:
$go run main.go
recv from channel [8], val=8
recv from channel [8], val=8
choose the default
choose the default
choose the default
choose the default
choose the default
recv from channel [1], val=1
choose the default
choose the default
choose the default
recv from channel [3], val=3
recv from channel [6], val=6
choose the default
choose the default
recv from channel [0], val=0
choose the default
choose the default
choose the default
recv from channel [5], val=5
recv from channel [2], val=2
choose the default
choose the default
choose the default
recv from channel [2], val=2
choose the default
choose the default
recv from channel [2], val=2
choose the default
choose the default
channel [2] closed, select goroutine exit
我們看到,default case 被選擇的幾率還是蠻大的。
最後,我們再來看看如何使用 reflect 包向 channel 中發送數據,看下面示例代碼:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-send/main.go
package main
import (
"fmt"
"reflect"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
ch0, ch1, ch2 := make(chan int), make(chan int), make(chan int)
var schs = []chan int{ch0, ch1, ch2}
// 創建SelectCase
var cases = createCases(schs)
// 生產者goroutine
go func() {
defer wg.Done()
for range cases {
chosen, _, _ := reflect.Select(cases)
fmt.Printf("send to channel [%d], val=%v\n", chosen, cases[chosen].Send)
cases[chosen].Chan = reflect.Value{}
}
fmt.Println("select goroutine exit")
return
}()
// 消費者goroutine
go func() {
defer wg.Done()
for range schs {
var v int
select {
case v = <-ch0:
fmt.Printf("recv %d from ch0\n", v)
case v = <-ch1:
fmt.Printf("recv %d from ch1\n", v)
case v = <-ch2:
fmt.Printf("recv %d from ch2\n", v)
}
}
}()
wg.Wait()
}
func createCases(schs []chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創建send case
for i, ch := range schs {
n := i + 100
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectSend,
Chan: reflect.ValueOf(ch),
Send: reflect.ValueOf(n),
})
}
return cases
}
在這個示例中,我們針對三個 channel:ch0,ch1 和 ch2 創建了寫操作的 SelectCase,每個 SelectCase 的 Send 字段都被賦予了要發送給該 channel 的值,這裏使用了 “100 + 下標號”。
生產者 goroutine 中有一個 “與衆不同” 的地方,那就是每次某個寫操作觸發後,我都將該 SelectCase 中的 Chan 重置爲一個空 Value,以防止下次該 channel 被重新選出:
cases[chosen].Chan = reflect.Value{}
運行一下該示例,我們得到:
$go run main.go
recv 101 from ch1
send to channel [1], val=101
send to channel [0], val=100
recv 100 from ch0
recv 102 from ch2
send to channel [2], val=102
select goroutine exit
通過上面的幾個例子我們看到,reflect.Select 有着與 select 等價的語義,且還支持動態增刪和修改 case,功能不可爲不強大,現在還剩一點要 care,那就是它的執行性能如何呢?我們接着往下看。
3. reflect.Select 的性能
我們用 benchmark test 來對比一下常規 select 與 reflect.Select 在執行性能上的差別,下面是 benchmark 代碼:
// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-benchmark/benchmark_test.go
package main
import (
"reflect"
"testing"
)
func createCases(rchs []chan int) []reflect.SelectCase {
var cases []reflect.SelectCase
// 創建recv case
for _, ch := range rchs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
return cases
}
func BenchmarkSelect(b *testing.B) {
var c1 = make(chan int)
var c2 = make(chan int)
var c3 = make(chan int)
go func() {
for {
c1 <- 1
}
}()
go func() {
for {
c2 <- 2
}
}()
go func() {
for {
c3 <- 3
}
}()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
select {
case <-c1:
case <-c2:
case <-c3:
}
}
}
func BenchmarkReflectSelect(b *testing.B) {
var c1 = make(chan int)
var c2 = make(chan int)
var c3 = make(chan int)
go func() {
for {
c1 <- 1
}
}()
go func() {
for {
c2 <- 2
}
}()
go func() {
for {
c3 <- 3
}
}()
chs := createCases([]chan int{c1, c2, c3})
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _, _ = reflect.Select(chs)
}
}
運行一下該 benchmark:
$go test -bench .
goos: darwin
goarch: amd64
pkg: github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark
... ...
BenchmarkSelect-8 2765396 427.8 ns/op 0 B/op 0 allocs/op
BenchmarkReflectSelect-8 1839706 806.0 ns/op 112 B/op 6 allocs/op
PASS
ok github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark 3.779s
我們看到:reflect.Select 的執行效率相對於 select 還是要差的,並且在其執行過程中還要做額外的內存分配。
4. 小結
本文介紹了 reflect.Select 與 SelectCase 的結構以及如何使用它們在不同場景下操作 channel。但大多數情況下,我們是不需要使用 reflect.Select,常規 select 語法足以滿足我們的要求。並且 reflect.Select 有對 cases 數量的約束,最大支持 65536 個 cases,雖然這個約束對於大多數場合而言足夠用了。
本文涉及的示例源碼可以在這裏 [3] 下載。
Gopher Daily(Gopher 每日新聞) 歸檔倉庫 - https://github.com/bigwhite/gopherdaily
我的聯繫方式:
-
微博 (暫不可用):https://weibo.com/bigwhite20xx
-
微博 2:https://weibo.com/u/6484441286
-
博客:tonybai.com
-
github: https://github.com/bigwhite
參考資料
[1]
《Go 併發編程實戰課》: http://gk.link/a/11OCq
[2]
reflect 包: https://tonybai.com/2021/04/19/variable-operation-using-reflection-in-go
[3]
這裏: https://github.com/bigwhite/experiments/tree/master/reflect-operate-channel
[4]
“Gopher 部落” 知識星球: https://wx.zsxq.com/dweb2/index/group/51284458844544
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/E4lT4SuWKIlCZd60i7vigQ