Go 流水線編程模式

流水線工作模型在工業領域內十分常見,它將工作流程分爲多個環節,每個環節根據工作強度安排合適的人員數量。良好的流水線設計儘量讓各環節的流通率平衡,最大化提高產能效率。

Go 是一門實用性語言,流水線工作模型與 Go 融合地非常融洽,只不過我們一般使用另一個名詞來表示流水線:pipeline。

pipeline

pipeline 由多個環節組成,具體在 Go 中,環節之間通過 channel 通信,同一個環節任務可以由多個 goroutine 來同時處理。

pipeline

pipeline 的核心是數據,通過 channel 來保證數據流動,每個環節的數據處理由 goroutine 完成。

除了開始環節和結束環節,每個環節都有任意數量的輸入 channel 和輸出 channel。開始環節被稱爲發送者或生產者,結束環節被稱爲接收者或消費者。

下面我們來看一個簡單的 pipeline 例子,分爲三個環節。

第一個環節,generate 函數:它充當生產者角色,將數據寫入 channel,並把該 channel 返回。當所有數據寫入完畢,關閉 channel。

func generate(nums ...int) <-chan int {
 out := make(chan int)
 go func() {
  for _, n := range nums {
   out <- n
  }
  close(out)
 }()
 return out
}

第二個環節,square 函數:它是數據處理的角色,從開始環節中的 channel  取出數據,計算平方,將結果寫入新的 channel ,並把該新的 channel 返回。當所有數據計算完畢,關閉該新 channel。

func square(in <-chan int) <-chan int {
 out := make(chan int)
 go func() {
  for n := range in {
   out <- n * n
  }
  close(out)
 }()
 return out
}

main 函數負責編排整個 pipeline ,並充當消費者角色:讀取第二個環節的 channel 數據,打印出來。

func main() {
 // Set up the pipeline.
 c := generate(2, 3)
 out := square(c)

 // Consume the output.
 for n := range out {
  fmt.Println(n)
 }
}

Fan-out,fan-in

在上述例子中,環節之間通過非緩衝的 channel 傳遞數據,節點中的數據都是單個 goroutine 處理與消費。

這種工作模式並不高效,會讓整個流水線的效率取決於最慢的環節。因爲每個環節中的任務量是不同的,這意味着我們需要的機器資源是存在差異的。任務量小的環節,儘量佔有少量的機器資源,任務量重的環節,需要更多線程並行處理。

以汽車組裝爲例,我們可以將組裝輪胎的工作分發給 4 個人一起幹,當輪胎組裝完畢之後,再交由剩下的環節。

多個 goroutine 可以從同一個 channel 讀取數據,直到該通道關閉,這稱爲 fan-out(扇出)。

這個稱呼比較形象,它將數據進行分散,所以被稱爲扇出。扇出是一種分發任務的模式。

fan-out

單個 goroutine 可以從多個輸入 channel 中讀取數據,直到所有輸入都關閉。具體做法是將輸入 channel 多路複用到同一個 channel 上,當所有輸入 channel 都關閉時,該 channel 也關閉,這稱爲 fan-in(扇入)。

它將數據進行聚合,所以被稱爲扇入。扇入是一種整合任務結果的模式。

fan-in

在汽車組裝的例子中,分發輪胎任務給每個人是 Fan-out,合併輪胎組裝結果就是 Fan-in。

channel 的多路複用

扇出的編碼模型比較簡單,本文不多研究,我們提供一個扇入編程示例。

創建一個生成器函數 generate,通過 interval 參數控制消息生成頻率。生成器返回消息 channel mc與停止 channel sc,停止 channel 用於停止生成器任務。

func generate(message string, interval time.Duration) (chan string, chan struct{}) {
 mc := make(chan string)
 sc := make(chan struct{})

 go func() {
  defer func() {
   close(sc)
  }()

  for {
   select {
   case <-sc:
    return
   default:
    time.Sleep(interval)

    mc <- message
   }
  }
 }()

 return mc, sc
}

stopGenerating 函數通過通過向 sc 中傳入空結構體,通知 generate退出,調用 close(mc) 關閉消息 channel

func stopGenerating(mc chan string, sc chan struct{}) {
 sc <- struct{}{}

 close(mc)
}

多路複用函數 multiplex 創建並返回整合消息 channel 和控制併發的 wg

func multiplex(mcs ...chan string) (chan string, *sync.WaitGroup) {
 mmc := make(chan string)
 wg := &sync.WaitGroup{}

 for _, mc := range mcs {
  wg.Add(1)

  go func(mc chan string, wg *sync.WaitGroup) {
   defer wg.Done()

   for m := range mc {
    mmc <- m
   }
  }(mc, wg)
 }

 return mmc, wg
}

main 函數中,創建兩個消息 channel 並複用它們生成 mmc ,打印來自 mmc 的每條消息。另外,我們還實現了接收系統斷信號(終端上執行 CTRL+C 即可發送中斷信號)的優雅的關閉機制。

func main() {
 // create two sample message and stop channels
 mc1, sc1 := generate("message from generator 1", 200*time.Millisecond)
 mc2, sc2 := generate("message from generator 2", 300*time.Millisecond)

 // multiplex message channels
 mmc, wg1 := multiplex(mc1, mc2)

 // create errs channel for graceful shutdown
 errs := make(chan error)

 // wait for interrupt or terminate signal
 go func() {
  sc := make(chan os.Signal, 1)
  signal.Notify(sc, syscall.SIGINT, syscall.SIGTERM)
  errs <- fmt.Errorf("%s signal received", <-sc)
 }()

 // wait for multiplexed messages
 wg2 := &sync.WaitGroup{}
 wg2.Add(1)
 go func() {
  defer wg2.Done()

  for m := range mmc {
   fmt.Println(m)
  }
 }()

 // wait for errors
 if err := <-errs; err != nil {
  fmt.Println(err.Error())
 }

 // stop generators
 stopGenerating(mc1, sc1)
 stopGenerating(mc2, sc2)
 wg1.Wait()

 // close multiplexed messages channel
 close(mmc)
 wg2.Wait()
}

總結

本文簡單介紹了流水線編程模式,它和我們熟悉的生產者 - 消費者模式非常相似。

具體到 Go 編程實踐中,pipeline 將數據流分爲多個環節,channel 用於數據流動,goroutine 用於處理數據。fan-out 用於分發任務,fan-in 用於數據整合,通過 FAN 模式可以讓流水線更好地併發。

當然,還有些細節需要注意,例如停止通知機制,可參照本文 channel 的多路複用章節示例中的 stopGenerating 函數;如何通過 sync.WaitGroup 做好併發控制,這些都是需要讀者在實際編碼中去體會掌握的。

參考

Go Concurrency Patterns: Pipelines and cancellation:https://go.dev/blog/pipelines

Multiplexing Channels In Go:https://medium.com/@ermanimer/multiplexing-channels-in-go-a7dccdcc4134

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/IuGaKbQvW7z1KsoMhB8mgg