使用 context、WaitGroup 優雅處理 goroutine

via:
https://justbartek.ca/p/golang-context-wg-go-routines/
作者:Bartek

你好,我是 Seekload。

今天給大家分享一篇 如何使用 context、waitGroup 實現程序快速且優雅退出 的文章!

原文如下:

最近,我正在編寫一個 “滴答器” 的應用程序,每次 “滴答” 時可能會產生數千的 goroutine。我想確保當應用終止時,即使有一些特定的 goroutine 處理比較緩慢,它也能快速而優雅地退出。

剛開始的時候,圍繞如何輸出日誌,我使用 sync.WaitGroup 實現流程控制,但我很快意識到如果我創建了很多 goroutine,即使其中很小一部分沒有立即返回,我的程序會在終止時 hang 住。這讓我重新考慮 context.WithCancel,並理解該如何重新調整我的程序,使其能快速且優雅地退出!

我們可以通過構建示例程序一步步來驗證下,最初的示例程序並不會使用前面提到的技術點。

package main

import (
 "fmt"
 "log"
 "math/rand"
 "os"
 "os/signal"
 "syscall"
 "time"
)

func doSomething(ch chan int) {
 fmt.Printf("Received job %d\n", <-ch)
}

func init() {
 rand.Seed(time.Now().Unix())
}

func main() {
 var (
  closing   = make(chan struct{})
  ticker    = time.NewTicker(1 * time.Second)
  logger    = log.New(os.Stderr, "", log.LstdFlags)
  batchSize = 6
  jobs      = make(chan int, batchSize)
 )

 go func() {
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
  <-signals
  close(closing)
 }()
loop:
 for {
  select {
  case <-closing:
   break loop
  case <-ticker.C:
   for n := 0; n < batchSize; n++ {
    jobs <- n
    go doSomething(jobs)
   }
   logger.Printf("Completed doing %d things.", batchSize)
  }
 }
}

執行程序,我們會發現 Received job ... 和 Completed doing ... 會交替輸出,輸出可能類似下面這樣:

Received job 0
Received job 1
Received job 2
2021/02/08 21:30:59 Completed doing 6 things.
Received job 3
Received job 4
Received job 5
2021/02/08 21:31:00 Completed doing 6 things.

多次打印的結果並不一致!這是合理的,我們都知道 goroutines 並不會阻塞,所以除非我們對它做些什麼,否則協程裏的代碼會立即執行。

我們添加 WaitGroup 來完善下流程,先在 var 代碼塊中定義變量:

var (
    ..
    wg sync.WaitGroup
)

調整下 loop 循環:

for n := 0; n < batchSize; n++ {
    wg.Add(1)
    jobs <- n
    go doSomething(&wg, jobs)
}
wg.Wait()
logger.Printf("Completed doing %d things.", batchSize)

最後,修改協程函數:

func doSomething(wg *sync.WaitGroup, ch chan int) {
    defer wg.Done()
    fmt.Printf("Received job %d\n", <-ch)
}

WaitGroups 會等待一組 goroutines 執行完成,仔細閱讀代碼我們發現:

  1. 每次循環時 WaitGroup 的計數器會加 1,加 1 原因是因爲在 goroutine 裏每次調用 wg.Done() 計數器會減一,這樣 goroutine 執行完成返回之後計數器能維持平衡;

  2. 在調用 logger 之前,我們添加了 wg.Wait(),這樣當程序執行到這裏的時候便會阻塞直到 WaitGroups 的計數器減爲 0。當所有 goroutines 調用 wg.Done() 之後,計數器便會恢復成 0。

很簡單,是不是?我們再次執行程序,可以看到結果比之前的更一致:

2021/02/08 21:46:47 Completed doing 6 things.
Received job 0
Received job 1
Received job 2
Received job 4
Received job 5
Received job 3
2021/02/08 21:46:48 Completed doing 6 things.
Received job 0
Received job 2
Received job 3
Received job 4
Received job 5
Received job 1

順便說一句,與預期的一樣,jobs 並不會按順序執行,因爲我們並沒有採取任何措施來確保這一點。

在我們繼續之前,按照目前的狀態執行程序並嘗試使用 Control+D 來終止程序,程序退出不會出現任何問題。

爲了證明程序需要進一步完善,讓我們添加一些代碼模擬真實業務場景。我們新建一個函數,函數里面調用外部 API 並等待請求響應。請求過程中,我們將會調用 context.WithCancel 取消請求。

首先,創建一個未使用 context 的函數。下面的代碼更復雜,有必要的話請看註釋:

func doAPICall(wg *sync.WaitGroup) error {
 defer wg.Done()

 req, err := http.NewRequest("GET""https://httpstat.us/200", nil)
 if err != nil {
  return err
 }

 // The httpstat.us API accepts a sleep parameter which sleeps the request for the
 // passed time in ms
 q := req.URL.Query()
 sleepMin := 1000
 sleepMax := 4000
 q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin))
 req.URL.RawQuery = q.Encode()

 // Make the request to the API in an anonymous function, using a channel to
 // communicate the results
 c := make(chan error, 1)
 go func() {
  // For the purposes of this example, we're not doing anything with the response.
  _, err := http.DefaultClient.Do(req)
  c <- err
 }()

 // Block until the channel is populated
 return <-c
}

修改定時器 “滴答”,刪除調用 doSomething() 的代碼、刪除 jobs channel(不會再使用到它)並且調用 doAPICall()。

for n := 0; n < batchSize; n++ {
    wg.Add(1)
    go doAPICall(&wg)
}

執行程序並再次嘗試退出程序:

現在來演示 context.WithCancel 如何進一步控制程序取消。當 context.WithCancel 初始化之後,會返回一個 context 和取消函數 CancelFunc()。這個取消函數會取消 context,第一次聽到這個會困惑。閱讀 Go 官方博客的文章 Go Concurrency Patterns: Context[1] 對於進一步理解 context.WithCancel 會有所幫助,推薦閱讀完本篇文章之後再看!

ok,我們回到正文。爲了實現取消流程控制,需要修改下代碼。首先,使用 context 創建一個取消函數:

var (
    ctx, cancel = context.WithCancel(context.Background())
    ...
)

接着,在匿名函數里監聽程序終止的信號,signals 被通知之後調用 CancelFunc,這意味着上下文將被視爲已取消:

go func() {
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
    <-signals
    logger.Println("Initiating shutdown of producer.")
    cancel()
    close(closing)
}()

接着,調整 doAPICall() 函數,多接收一個 context 參數;使用 select-case 修改函數返回,等待 ctx.Done 或等待請求響應。爲了簡介,只展示了函數部分代碼:

func doAPICall(ctx context.Context, ....) {
    // Cancel the request if ctx.Done is closed or await the response
    select {
    case <-ctx.Done():
           return ctx.Err()
    case err := <-c:
        return err
    }
}

最後,確保調用 doAPICall() 函數時傳遞了 context 參數。現在,運行程序並多次在不同的時間點終止程序。

現在會發生什麼?程序會立即退出。select-case 代碼會監聽 ctx.Done 是否關閉或者接口請求是否響應,哪個 case 的 channel 信號先到就先執行誰。當應用程序終止時,ctx.Done() 優先執行並且函數提前返回,不再關心請求是否響應。WaitGroup 的作用沒變 - 等待一組 goroutines 完成。現在,程序的終止流程得到很大改善。

Go 的基本哲學之一就是:

Don't communicate by sharing memory; share memory by communicating.

這裏,我們使用 channel 在 goroutines 之間傳遞引用,這使得我們能夠改進應用程序的流程。

有很多種辦法可以用來改善流程,例如,我們不跨 goroutine 接收 API 的響應或者錯誤。值得慶幸的是,Go 很容易就可以實現這點,因此可以將它視爲一個起點,如果你還想完善,可以嘗試下這些想法。

下面是完整的示例,僅供參考:

package main

import (
 "context"
 "fmt"
 "log"
 "math/rand"
 "net/http"
 "os"
 "os/signal"
 "sync"
 "syscall"
 "time"
)

func doAPICall(ctx context.Context, wg *sync.WaitGroup) error {
 defer wg.Done()

 req, err := http.NewRequest("GET""https://httpstat.us/200", nil)
 if err != nil {
  return err
 }

 // The httpstat.us API accepts a sleep parameter which sleeps the request for the
 // passed time in ms
 q := req.URL.Query()
 sleepMin := 1000
 sleepMax := 4000
 q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin))
 req.URL.RawQuery = q.Encode()

 c := make(chan error, 1)
 go func() {
  // For the purposes of this example, we're not doing anything with the response.
  _, err := http.DefaultClient.Do(req)
  c <- err
 }()

 // Block until either channel is populated or closed
 select {
 case <-ctx.Done():
  return ctx.Err()
 case err := <-c:
  return err
 }
}

func init() {
 rand.Seed(time.Now().Unix())
}

func main() {
 var (
  closing     = make(chan struct{})
  ticker      = time.NewTicker(1 * time.Second)
  logger      = log.New(os.Stderr, "", log.LstdFlags)
  batchSize   = 6
  wg          sync.WaitGroup
  ctx, cancel = context.WithCancel(context.Background())
 )

 go func() {
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
  <-signals
  cancel()
  close(closing)
 }()
loop:
 for {
  select {
  case <-closing:
   break loop
  case <-ticker.C:
   for n := 0; n < batchSize; n++ {
    wg.Add(1)
    go doAPICall(ctx, &wg)
   }
   wg.Wait()
   logger.Printf("Completed doing %d things.", batchSize)
  }
 }
}

最後一點,本文部分代碼受到博文 Go Concurrency Patterns: Context[2] 的啓發,再次推薦這篇文章。這篇文章還介紹了其他控制函數,比如:context.WithTimeout 等。Go 官方博客是每個人都應該閱讀的寶庫!

參考資料

[1]

Go Concurrency Patterns: Context: https://blog.golang.org/context

[2]

Go Concurrency Patterns: Context: https://blog.golang.org/context

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/SUVHEei2vS6mrWsVJb4mbA