Go: 說說 fanIn 和 fanOut
文章目錄
-
fanIn
-
協程版
-
遞歸版
-
反射版
-
fanOut
-
同步版
-
協程異步版
-
反射版
今天回顧下常用的兩種channel
應用模式: fanIn
和fanOut
,
分別對應了,對一組相同類型chan
的合併和廣播。
fanIn
將全部輸入chan
都聚合到一個out chan
中,在全部聚合完成後,關閉out chan
.
協程版
func fanIn(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
var wg sync.WaitGroup
wg.Add(len(chans))
for _, ch := range chans {
go func(ch <-chan interface{}) {
for v := range ch {
out <- v
}
wg.Done()
}(ch)
}
// 等待協程全部結束
wg.Wait()
close(out)
}()
return out
}
這裏用waitGroup
是防止關閉out
時還有寫入(out <- v
),避免panic
遞歸版
2 分遞歸併合併。
其中合併mergeTwo
主要用了nil chan
對讀寫均阻塞。
當chan
關閉時,設置爲nil
,阻塞讀取。
func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
// 無可聚合chan,返回一個已關閉chan,可讀不可寫
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default:
// 一分爲二,遞歸
m := len(chans) / 2
return mergeTwo(
fanInRecur(chans[:m]...),
fanInRecur(chans[m:]...))
}
}
func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil { // 只要還有可讀的chan
select {
case v, ok := <-a:
if !ok { // a 已關閉,設置爲nil
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok { // b 已關閉,設置爲nil
b = nil
continue
}
c <- v
}
}
}()
return c
}
反射版
利用reflect.SelectCase
構造批量可Select
的發送chan
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
out := make(chan interface{})
go func() {
defer close(out)
// 構造SelectCase slice
var cases []reflect.SelectCase
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
// 循環,從cases中選擇一個可用的
for len(cases) > 0 {
i, v, ok := reflect.Select(cases)
if !ok {
// 此channel已經close, 從切片移除
cases = append(cases[:i], cases[i+1:]...)
continue
}
out <- v.Interface()
}
}()
return out
}
附上壓測數據
性能對比
fanOut
同步版
最直觀的方式,直接向每一個chan
都同步發送一遍 返回前關閉這組chan
, 即不再寫入
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出時關閉所有的輸出chan
for i := range out {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數據
v := v
for i := range out {
i := i
out[i] <- v // 放入到輸出chan中,同步方式
}
}
}()
}
協程異步版
發送這裏用起協程的方式,實現異步,發送操作耗時情況下無需阻塞等待
可是有個問題,不知道你看出來沒。
func fanOut(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出時關閉所有的輸出chan
for i := range out {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數據
v := v
for i := range out {
i := i
// 協程異步
go func(){}
out[i] <- v
}()
}
}
}()
}
乍一看好像沒什麼問題, 但退出時關閉時,很可能發送的協程寫入還沒完成,
畢竟這裏out
之前寫入的要有人讀才能繼續寫。
這裏加waitGroup
可以等待全部發送完畢在關閉
func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
go func() {
var wg sync.WaitGroup
defer func() { // 退出時關閉所有的輸出chan
wg.Wait()
for i := range out {
close(out[i])
}
}()
for v := range ch { // 從輸入chan中讀取數據
v := v
for i := range out {
i := i
wg.Add(1)
go func() { // 異步,避免一個out阻塞的時候影響其他out
out[i] <- v
wg.Done()
}()
}
}
}()
}
反射版
構造一票chan send case
, 遍歷select
,發送完成的將其置爲nil
阻塞,避免再次發送
不得不說,nil chan
出鏡率很高啊
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
go func() {
defer func() { // 退出時關閉所有的輸出chan
for i := range out {
close(out[i])
}
}()
cases := make([]reflect.SelectCase, len(out))
// 構造SelectCase slice
for i := range cases {
cases[i].Dir = reflect.SelectSend
}
for v := range ch {
v := v
// 先完成send case構造
for i := range cases {
cases[i].Chan = reflect.ValueOf(out[i])
cases[i].Send = reflect.ValueOf(v)
}
// 遍歷select
for range cases {
chosen, _, _ := reflect.Select(cases)
// 已發送過,用nil阻塞,避免再次發送
cases[chosen].Chan = reflect.ValueOf(nil)
}
}
}()
}
附上壓測數據
性能對比
具體測試代碼詳見:concurrency[1]
參考資料
[1]
concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/p5_Z9yJipKNVZk89fGOcVw