使用反射操作 channel

今年教師節極客時間送給講師 4999 SVIP 卡,一直沒顧過來用,上週激活後在極客時間的衆多精品課和專欄中徜徉,收穫頗豐。尤其是在拜讀鳥窩老師的《Go 併發編程實戰課》[1] 後,get 到一個以前從未用過的 “技能點”:使用 reflect 操作 channel,這裏整理一下,把它分享給大家。

1. channel 常規語法的 “限制”

Go 語言實現了基於 CSP(Communicating Sequential Processes)理論的併發方案。方案包含兩個重要元素,一個是 Goroutine,它是 Go 應用併發設計的基本構建與執行單元;另一個就是 channel,它在併發模型中扮演着重要的角色。channel 既可以用來實現 Goroutine 間的通信,還可以實現 Goroutine 間的同步。

我們先來簡要回顧一下有關 channel 的常規語法。

我們可以通過 make(chan T, n) 創建元素類型爲 T、容量爲 n 的 channel 類型實例,比如:

ch1 := make(chan int)    // 創建一個無緩衝的channel實例ch1
ch2 := make(chan int, 5)  // 創建一個帶緩衝的channel實例ch2

Go 提供了 “<-” 操作符用於對 channel 類型變量進行發送與接收操作,下面是一些對上述 channel ch1 和 ch2 進行收發操作的代碼示例:

ch1 <- 13    // 將整型字面值13發送到無緩衝channel類型變量ch1中
n := <- ch1  // 從無緩衝channel類型變量ch1中接收一個整型值存儲到整型變量n中
ch2 <- 17    // 將整型字面值17發送到帶緩衝channel類型變量ch2中
m := <- ch2  // 從帶緩衝channel類型變量ch2中接收一個整型值存儲到整型變量m中

Go 不僅提供了單獨操作 channel 的語法,還提供了可以同時對多個 channel 進行操作的 select-case 語法,比如下面代碼:

select {
case x := <-ch1:     // 從channel ch1接收數據
  ... ...

case y, ok := <-ch2: // 從channel ch2接收數據,並根據ok值判斷ch2是否已經關閉
  ... ...

case ch3 <- z:       // 將z值發送到channel ch3中:
  ... ...

default:             // 當上面case中的channel通信均無法實施時,執行該默認分支
}

我們看到:select 語法中的 case 數量必須是固定的,我們只能把事先要交給 select“監聽” 的 channel 準備好,在 select 語句中平鋪開纔可以。這就是 select 語句常規語法的限制,即 select 語法不支持動態的 case 集合。如果我們要監聽的 channel 個數是不確定的,且在運行時會動態變化,那麼 select 語法將無法滿足我們的要求。

那怎麼突破這一限制呢?鳥窩老師告訴我們用 reflect 包 [2]。

2. reflect.Select 和 reflect.SelectCase

很多朋友可能和我一樣,因爲沒有使用過 reflect 包操作 channel,就會以爲 reflect 操作 channel 的能力是 Go 新版本才提供的,但實則不然。reflect 包中用於操作 channel 的函數 Select 以及其切片參數的元素類型 SelectCase 早在 Go 1.1 版本就加入到 Go 語言中了,有下圖爲證:

那麼如何使用這一 “古老” 的機制呢?我們一起來看一些例子。

首先我們來看第一種情況,也是最好理解的一種情況,即從一個動態的 channel 集合進行 receive operations 的 select,下面是示例代碼:

// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv/main.go
package main

import (
 "fmt"
 "math/rand"
 "reflect"
 "sync"
 "time"
)

func main() {
 var wg sync.WaitGroup
 wg.Add(2)
 var rchs []chan int
 for i := 0; i < 10; i++ {
  rchs = append(rchs, make(chan int))
 }

 // 創建SelectCase
 var cases = createRecvCases(rchs)

 // 消費者goroutine
 go func() {
  defer wg.Done()
  for {
   chosen, recv, ok := reflect.Select(cases)
   if ok {
    fmt.Printf("recv from channel [%d], val=%v\n", chosen, recv)
    continue
   }
   // one of the channels is closed, exit the goroutine
   fmt.Printf("channel [%d] closed, select goroutine exit\n", chosen)
   return
  }
 }()

 // 生產者goroutine
 go func() {
  defer wg.Done()
  var n int
  s := rand.NewSource(time.Now().Unix())
  r := rand.New(s)
  for i := 0; i < 10; i++ {
   n = r.Intn(10)
   rchs[n] <- n
  }
  close(rchs[n])
 }()

 wg.Wait()
}

func createRecvCases(rchs []chan int) []reflect.SelectCase {
 var cases []reflect.SelectCase

 // 創建recv case
 for _, ch := range rchs {
  cases = append(cases, reflect.SelectCase{
   Dir:  reflect.SelectRecv,
   Chan: reflect.ValueOf(ch),
  })
 }
 return cases
}

在這個例子中,我們通過 createRecvCases 這個函數創建一個元素類型爲 reflect.SelectCase 的切片,之後使用 reflect.Select 可以監聽這個切片集合,就像常規 select 語法那樣,從有數據的 recv Channel 集合中隨機選出一個返回。

reflect.SelectCase 有三個字段:

// $GOROOT/src/reflect/value.go
type SelectCase struct {
    Dir  SelectDir // direction of case
    Chan Value     // channel to use (for send or receive)
    Send Value     // value to send (for send)
}

其中 Dir 字段的值是一個 “枚舉”,枚舉值如下:

// $GOROOT/src/reflect/value.go
const (
    _             SelectDir = iota
    SelectSend              // case Chan <- Send
    SelectRecv              // case <-Chan:
    SelectDefault           // default
)

從常量名我們也可以看出,Dir 用於標識 case 的類型,SelectRecv 表示這是一個從 channel 做 receive 操作的 case,SelectSend 表示這是一個向 channel 做 send 操作的 case;SelectDefault 則表示這是一個 default case。

構建好 SelectCase 的切片後,我們就可以將其傳給 reflect.Select 了。Select 函數的語義與 select 關鍵字語義是一致的,它會監聽傳入的所有 SelectCase,以上面示例爲例,如果所有 channel 都沒有數據,那麼 reflect.Select 會阻塞,直到某個 channel 有數據或關閉。

Select 函數有三個返回值:

// $GOROOT/src/reflect/value.go
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

對於上面示例而言,如果監聽的某個 case 有數據了,那麼 Select 的返回值 chosen 中存儲了該 channel 在 cases 切片中的下標,recv 中存儲了從 channel 收到的值,recvOK 等價於 comma, ok 模式的 ok,當正常接收到由 send channel 操作發送的值時,recvOK 爲 true,如果 channel 被 close 了,recvOK 爲 false。

上面的示例啓動了兩個 goroutine,一個 goroutine 充當消費者,由 reflect.Select 監聽一組 channel,當某個 channel 關閉時,該 goroutine 退出;另外一個 goroutine 則是隨機的向這些 channel 中發送數據,發送 10 次後,關閉其中某個 channel 通知消費者退出。

我們運行一下該示例程序,得到如下結果:

$go run main.go 
recv from channel [1]val=1
recv from channel [4]val=4
recv from channel [5]val=5
recv from channel [8]val=8
recv from channel [1]val=1
recv from channel [1]val=1
recv from channel [8]val=8
recv from channel [3]val=3
recv from channel [5]val=5
recv from channel [9]val=9
channel [9] closed, select goroutine exit

我們日常編碼時經常會在 select 語句中加上 default 分支,以防止 select 完全阻塞,下面我們就來改造一下示例,讓其增加對 default 分支的支持:

// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-recv-with-default/main.go

package main

import (
 "fmt"
 "math/rand"
 "reflect"
 "sync"
 "time"
)

func main() {
 var wg sync.WaitGroup
 wg.Add(2)
 var rchs []chan int
 for i := 0; i < 10; i++ {
  rchs = append(rchs, make(chan int))
 }

 // 創建SelectCase
 var cases = createRecvCases(rchs, true)

 // 消費者goroutine
 go func() {
  defer wg.Done()
  for {
   chosen, recv, ok := reflect.Select(cases)
   if cases[chosen].Dir == reflect.SelectDefault {
    fmt.Println("choose the default")
    continue
   }
   if ok {
    fmt.Printf("recv from channel [%d], val=%v\n", chosen, recv)
    continue
   }
   // one of the channels is closed, exit the goroutine
   fmt.Printf("channel [%d] closed, select goroutine exit\n", chosen)
   return
  }
 }()

 // 生產者goroutine
 go func() {
  defer wg.Done()
  var n int
  s := rand.NewSource(time.Now().Unix())
  r := rand.New(s)
  for i := 0; i < 10; i++ {
   n = r.Intn(10)
   rchs[n] <- n
  }
  close(rchs[n])
 }()

 wg.Wait()
}

func createRecvCases(rchs []chan int, withDefault bool) []reflect.SelectCase {
 var cases []reflect.SelectCase

 // 創建recv case
 for _, ch := range rchs {
  cases = append(cases, reflect.SelectCase{
   Dir:  reflect.SelectRecv,
   Chan: reflect.ValueOf(ch),
  })
 }

 if withDefault {
  cases = append(cases, reflect.SelectCase{
   Dir:  reflect.SelectDefault,
   Chan: reflect.Value{},
   Send: reflect.Value{},
  })
 }

 return cases
}

在這個示例中,我們的 createRecvCases 函數增加了一個 withDefault 布爾型參數,當 withDefault 爲 true 時,返回的 cases 切片中將包含一個 default case。我們看到,創建 defaultCase 時,Chan 和 Send 兩個字段需要傳入空的 reflect.Value。

在消費者 goroutine 中,我們通過選出的 case 的 Dir 字段是否爲 reflect.SelectDefault 來判定是否 default case 被選出,其餘的處理邏輯不變,我們運行一下這個示例:

$go run main.go
recv from channel [8]val=8
recv from channel [8]val=8
choose the default
choose the default
choose the default
choose the default
choose the default
recv from channel [1]val=1
choose the default
choose the default
choose the default
recv from channel [3]val=3
recv from channel [6]val=6
choose the default
choose the default
recv from channel [0]val=0
choose the default
choose the default
choose the default
recv from channel [5]val=5
recv from channel [2]val=2
choose the default
choose the default
choose the default
recv from channel [2]val=2
choose the default
choose the default
recv from channel [2]val=2
choose the default
choose the default
channel [2] closed, select goroutine exit

我們看到,default case 被選擇的幾率還是蠻大的。

最後,我們再來看看如何使用 reflect 包向 channel 中發送數據,看下面示例代碼:

// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-send/main.go

package main

import (
 "fmt"
 "reflect"
 "sync"
)

func main() {
 var wg sync.WaitGroup
 wg.Add(2)
 ch0, ch1, ch2 := make(chan int), make(chan int), make(chan int)
 var schs = []chan int{ch0, ch1, ch2}

 // 創建SelectCase
 var cases = createCases(schs)

 // 生產者goroutine
 go func() {
  defer wg.Done()
  for range cases {
   chosen, _, _ := reflect.Select(cases)
   fmt.Printf("send to channel [%d], val=%v\n", chosen, cases[chosen].Send)
   cases[chosen].Chan = reflect.Value{}
  }
  fmt.Println("select goroutine exit")
  return
 }()

 // 消費者goroutine
 go func() {
  defer wg.Done()
  for range schs {
   var v int
   select {
   case v = <-ch0:
    fmt.Printf("recv %d from ch0\n", v)
   case v = <-ch1:
    fmt.Printf("recv %d from ch1\n", v)
   case v = <-ch2:
    fmt.Printf("recv %d from ch2\n", v)
   }
  }
 }()

 wg.Wait()
}

func createCases(schs []chan int) []reflect.SelectCase {
 var cases []reflect.SelectCase

 // 創建send case
 for i, ch := range schs {
  n := i + 100
  cases = append(cases, reflect.SelectCase{
   Dir:  reflect.SelectSend,
   Chan: reflect.ValueOf(ch),
   Send: reflect.ValueOf(n),
  })
 }

 return cases
}

在這個示例中,我們針對三個 channel:ch0,ch1 和 ch2 創建了寫操作的 SelectCase,每個 SelectCase 的 Send 字段都被賦予了要發送給該 channel 的值,這裏使用了 “100 + 下標號”。

生產者 goroutine 中有一個 “與衆不同” 的地方,那就是每次某個寫操作觸發後,我都將該 SelectCase 中的 Chan 重置爲一個空 Value,以防止下次該 channel 被重新選出:

    cases[chosen].Chan = reflect.Value{}

運行一下該示例,我們得到:

$go run main.go
recv 101 from ch1
send to channel [1]val=101
send to channel [0]val=100
recv 100 from ch0
recv 102 from ch2
send to channel [2]val=102
select goroutine exit

通過上面的幾個例子我們看到,reflect.Select 有着與 select 等價的語義,且還支持動態增刪和修改 case,功能不可爲不強大,現在還剩一點要 care,那就是它的執行性能如何呢?我們接着往下看。

3. reflect.Select 的性能

我們用 benchmark test 來對比一下常規 select 與 reflect.Select 在執行性能上的差別,下面是 benchmark 代碼:

// github.com/bigwhite/experiments/tree/master/reflect-operate-channel/select-benchmark/benchmark_test.go
package main

import (
 "reflect"
 "testing"
)

func createCases(rchs []chan int) []reflect.SelectCase {
 var cases []reflect.SelectCase

 // 創建recv case
 for _, ch := range rchs {
  cases = append(cases, reflect.SelectCase{
   Dir:  reflect.SelectRecv,
   Chan: reflect.ValueOf(ch),
  })
 }
 return cases
}

func BenchmarkSelect(b *testing.B) {
 var c1 = make(chan int)
 var c2 = make(chan int)
 var c3 = make(chan int)

 go func() {
  for {
   c1 <- 1
  }
 }()
 go func() {
  for {
   c2 <- 2
  }
 }()
 go func() {
  for {
   c3 <- 3
  }
 }()

 b.ReportAllocs()
 b.ResetTimer()
 for i := 0; i < b.N; i++ {
  select {
  case <-c1:
  case <-c2:
  case <-c3:
  }
 }
}

func BenchmarkReflectSelect(b *testing.B) {
 var c1 = make(chan int)
 var c2 = make(chan int)
 var c3 = make(chan int)

 go func() {
  for {
   c1 <- 1
  }
 }()
 go func() {
  for {
   c2 <- 2
  }
 }()
 go func() {
  for {
   c3 <- 3
  }
 }()

 chs := createCases([]chan int{c1, c2, c3})

 b.ReportAllocs()
 b.ResetTimer()

 for i := 0; i < b.N; i++ {
  _, _, _ = reflect.Select(chs)
 }
}

運行一下該 benchmark:

$go test -bench .
goos: darwin
goarch: amd64
pkg: github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark
... ...
BenchmarkSelect-8            2765396        427.8 ns/op        0 B/op        0 allocs/op
BenchmarkReflectSelect-8     1839706        806.0 ns/op      112 B/op        6 allocs/op
PASS
ok   github.com/bigwhite/experiments/reflect-operate-channel/select-benchmark 3.779s

我們看到:reflect.Select 的執行效率相對於 select 還是要差的,並且在其執行過程中還要做額外的內存分配。

4. 小結

本文介紹了 reflect.Select 與 SelectCase 的結構以及如何使用它們在不同場景下操作 channel。但大多數情況下,我們是不需要使用 reflect.Select,常規 select 語法足以滿足我們的要求。並且 reflect.Select 有對 cases 數量的約束,最大支持 65536 個 cases,雖然這個約束對於大多數場合而言足夠用了。

本文涉及的示例源碼可以在這裏 [3] 下載。


Gopher Daily(Gopher 每日新聞) 歸檔倉庫 - https://github.com/bigwhite/gopherdaily

我的聯繫方式:

參考資料

[1] 

《Go 併發編程實戰課》: http://gk.link/a/11OCq

[2] 

reflect 包: https://tonybai.com/2021/04/19/variable-operation-using-reflection-in-go

[3] 

這裏: https://github.com/bigwhite/experiments/tree/master/reflect-operate-channel

[4] 

“Gopher 部落” 知識星球: https://wx.zsxq.com/dweb2/index/group/51284458844544

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/E4lT4SuWKIlCZd60i7vigQ