Go: orDone 的兩種實現

文章目錄

orDone 是一種併發控制模式,旨在多任務場景下實現,有一個任務成功返回即立即結束等待。

今天我們來看下兩種不同的實現方式:

方式一 遞歸

利用二分法遞歸, 將所有待監聽信號的chanselect起來,

當有第一個chan返回時,close orDone 來通知讀取方已有第一個任務返回

代碼如下比較直觀:

// 傳入多個併發chan,返回是否結束的 orDone chan
func Or(channels ...<-chan interface{}) <-chan interface{} {
 // 只有零個或者1個chan
 switch len(channels) {
 case 0:
        // 返回nil, 讓讀取阻塞等待
  return nil
 case 1:
  return channels[0]
 }

 orDone := make(chan interface{})
 go func() {
        // 返回時利用close做結束信號的廣播
  defer close(orDone)

        // 利用select監聽第一個chan的返回
  switch len(channels) {
  case 2: // 直接select
   select {
   case <-channels[0]:
   case <-channels[1]:
   }
  default: // 二分法遞歸處理
   m := len(channels) / 2
   select {
   case <-Or(channels[:m]...):
   case <-Or(channels[m:]...):
   }
  }
 }()

 return orDone
}

方式二 利用反射

這裏要用到reflect.SelectCase, 他可以描述一種selectcase, 來指明其接受的是chan的讀取或發送

type SelectCase struct {
 Dir  SelectDir // direction of case
 Chan Value     // channel to use (for send or receive)
 Send Value     // value to send (for send)
}

有了這個,就可以之間遍歷,不用遞歸來實現有限的select case構造

最後用reflect.Select(cases)監聽信號就可以了,代碼如下:

func OrInReflect(channels ...<-chan interface{}) <-chan interface{} {
 // 只有0個或者1個
 switch len(channels) {
 case 0:
  return nil
 case 1:
  return channels[0]
 }

 orDone := make(chan interface{})
 go func() {
  defer close(orDone)
  // 利用反射構建SelectCase,這裏是讀取
  var cases []reflect.SelectCase
  for _, c := range channels {
   cases = append(cases, reflect.SelectCase{
    Dir:  reflect.SelectRecv,
    Chan: reflect.ValueOf(c),
   })
  }

  // 隨機選擇一個可用的case
  reflect.Select(cases)
 }()

 return orDone
}

性能差異

這兩種都可以支持大量chan的信號監聽,那性能差異大麼

雖說遞歸開銷肯定不小,反射也不一定效率高,拿個壓測來試試吧

先構造一下大量併發chan

func repeat(
 done <-chan interface{},
    // 外部傳入done控制是否結束
 values ...interface{},
) <-chan interface{} {
 valueStream := make(chan interface{})
 go func() {
        // 返回時釋放
  defer close(valueStream)
  for {
   for _, v := range values {
    select {
    case <-done:
     return
    case valueStream <- v:
    }
   }
  }
 }()
 return valueStream
}

然後壓測

func BenchmarkOr(b *testing.B) {
 done := make(chan interface{})
 defer close(done)
 num := 100
 streams := make([]<-chan interface{}, num)
 for i := range streams {
  streams[i] = repeat(done[]int{1, 2, 3})
 }
 b.ResetTimer()
 for i := 0; i < b.N; i++ {
  <-Or(streams...)
 }
}

func BenchmarkOrInReflect(b *testing.B) {
 // 代碼類似
}

跑了下結果如下:

goos: darwin
goarch: amd64
pkg: github.com/NewbMiao/Dig101-Go/concurrency/channel/schedule/orDone
BenchmarkOr-12                 31815      38136 ns/op     9551 B/op       99 allocs/op
BenchmarkOrInReflect-12        55797      21755 ns/op    25232 B/op      112 allocs/op

可以看出,大量併發chan場景下, 反射使用內存更多些,但速度更快。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/aWKQP-4RjCYCDJMPlkSCGg