Go: 說說 fanIn 和 fanOut

文章目錄

今天回顧下常用的兩種channel應用模式: fanInfanOut,

分別對應了,對一組相同類型chan的合併和廣播。

fanIn

將全部輸入chan都聚合到一個out chan中,在全部聚合完成後,關閉out chan.

協程版

func fanIn(chans ...<-chan interface{}) <-chan interface{} {
 out := make(chan interface{})

 go func() {
  var wg sync.WaitGroup
  wg.Add(len(chans))
  for _, ch := range chans {
   go func(ch <-chan interface{}) {
    for v := range ch {
     out <- v
    }
    wg.Done()
   }(ch)
  }
  // 等待協程全部結束
  wg.Wait()
  close(out)
 }()
 return out
}

這裏用waitGroup是防止關閉out時還有寫入(out <- v),避免panic

遞歸版

2 分遞歸併合併。

其中合併mergeTwo主要用了nil chan對讀寫均阻塞。

chan關閉時,設置爲nil,阻塞讀取。

func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
 switch len(chans) {
 case 0:
  c := make(chan interface{})
  close(c)
  // 無可聚合chan,返回一個已關閉chan,可讀不可寫
  return c
 case 1:
  return chans[0]
 case 2:
  return mergeTwo(chans[0], chans[1])
 default:
  // 一分爲二,遞歸
  m := len(chans) / 2
  return mergeTwo(
   fanInRecur(chans[:m]...),
   fanInRecur(chans[m:]...))
 }
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
 c := make(chan interface{})
 go func() {
  defer close(c)
  for a != nil || b != nil { // 只要還有可讀的chan
   select {
   case v, ok := <-a:
    if !ok { // a 已關閉,設置爲nil
     a = nil
     continue
    }
    c <- v
   case v, ok := <-b:
    if !ok { // b 已關閉,設置爲nil
     b = nil
     continue
    }
    c <- v
   }
  }
 }()
 return c
}

反射版

利用reflect.SelectCase構造批量可Select的發送chan

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
 out := make(chan interface{})
 go func() {
  defer close(out)
  // 構造SelectCase slice
  var cases []reflect.SelectCase
  for _, c := range chans {
   cases = append(cases, reflect.SelectCase{
    Dir:  reflect.SelectRecv,
    Chan: reflect.ValueOf(c),
   })
  }

  // 循環,從cases中選擇一個可用的
  for len(cases) > 0 {
   i, v, ok := reflect.Select(cases)
   if !ok {
    // 此channel已經close, 從切片移除
    cases = append(cases[:i], cases[i+1:]...)
    continue
   }
   out <- v.Interface()
  }
 }()
 return out
}

附上壓測數據

性能對比

fanOut

同步版

最直觀的方式,直接向每一個chan都同步發送一遍 返回前關閉這組chan, 即不再寫入

func fanOut(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  defer func() { // 退出時關閉所有的輸出chan
   for i := range out {
    close(out[i])
   }
  }()

  for v := range ch { // 從輸入chan中讀取數據
   v := v
   for i := range out {
    i := i
    out[i] <- v // 放入到輸出chan中,同步方式
   }
  }
 }()
}

協程異步版

發送這裏用起協程的方式,實現異步,發送操作耗時情況下無需阻塞等待

可是有個問題,不知道你看出來沒。

func fanOut(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  defer func() { // 退出時關閉所有的輸出chan
   for i := range out {
    close(out[i])
   }
  }()

  for v := range ch { // 從輸入chan中讀取數據
   v := v
   for i := range out {
    i := i
    // 協程異步
    go func(){}
      out[i] <- v
    }()
   }
  }
 }()
}

乍一看好像沒什麼問題, 但退出時關閉時,很可能發送的協程寫入還沒完成,

畢竟這裏out之前寫入的要有人讀才能繼續寫。

這裏加waitGroup可以等待全部發送完畢在關閉

func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  var wg sync.WaitGroup
  defer func() { // 退出時關閉所有的輸出chan
   wg.Wait()
   for i := range out {
    close(out[i])
   }
  }()

  for v := range ch { // 從輸入chan中讀取數據
   v := v
   for i := range out {
    i := i
    wg.Add(1)
    go func() { // 異步,避免一個out阻塞的時候影響其他out
     out[i] <- v
     wg.Done()
    }()
   }
  }
 }()
}

反射版

構造一票chan send case, 遍歷select,發送完成的將其置爲nil阻塞,避免再次發送

不得不說,nil chan出鏡率很高啊

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  defer func() { // 退出時關閉所有的輸出chan
   for i := range out {
    close(out[i])
   }
  }()
  cases := make([]reflect.SelectCase, len(out))
  // 構造SelectCase slice
  for i := range cases {
   cases[i].Dir = reflect.SelectSend
  }
  for v := range ch {
   v := v
   // 先完成send case構造
   for i := range cases {
    cases[i].Chan = reflect.ValueOf(out[i])
    cases[i].Send = reflect.ValueOf(v)
   }
   // 遍歷select
   for range cases {
    chosen, _, _ := reflect.Select(cases)
    // 已發送過,用nil阻塞,避免再次發送
    cases[chosen].Chan = reflect.ValueOf(nil)
   }
  }
 }()
}

附上壓測數據

性能對比

具體測試代碼詳見:concurrency[1]

參考資料

[1]

concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/p5_Z9yJipKNVZk89fGOcVw