Golang 使用 Worker Pool 模式釋放併發性能

我們都知道 Go 具有卓越的併發特性,Worker Pool pattern(工作池模式)是一種併發設計模式,它用於管理一組工作線程以執行任務。本文用一個例子深入講解如何使用 Worker Pool 模式提升程序的併發性能。

拋出問題

快速計算一個數字數組中每個數字的平方,並返回結果數組(該數組只包含從 1 到 150 的整數)。

Pattern 1 : N routines for N numbers:

我們將根據輸入大小生成 goroutines,這意味着對於每個數字,我們將創建一個 goroutine 並將計算結果發送到輸出通道。爲了存儲平方數,我們需要創建一個結構體。

type Data struct {
   number int
   square float64
}

現在,讓我們來看一下主要邏輯。

首先,我們將創建一個 outputCh 通道來接收來自 goroutines 的計算結果。我們將使用 WaitGroup 來阻塞主函數,直到我們收到所有的計算結果。正如代碼中所示,每個數字生成一個 goroutine,並將平方數發送到 outputCh。

func ProcessData(numbers []int) []*Data {
  outputs := make([]*Data, 0)
  outputCh := make(chan *Data)
  
  var wg sync.WaitGroup
  for _, v := range numbers {
    wg.Add(1)

    go func(v int) {
      defer wg.Done()
      outputCh <- &Data{
        number: v,
        square: float64(v) * float64(v),
      }
    }(v)
  }
  
  //close the channel after finishing all go routines
  go func() {
    wg.Wait()
    close(outputCh)
  }()
  
  // receive output
  for d := range outputCh {
    outputs = append(outputs, d)
  }
  
  return outputs
}

測試下上面代碼的 BenchMark:

❯ go test -benchmem -run=^$ -bench ^BenchmarkProcessData$ go-pattern -benchtime=100000x
goos: darwin
goarch: arm64
pkg: go-pattern
BenchmarkProcessData-12   100000  101342 ns/op  14298 B/op  462 allocs/op
PASS
ok   go-pattern 10.449s

函數 ProcessData 在約 101,342 納秒 / 操作的時間內完成了 100,000 次操作,使用了約 14,298 字節 / 操作的內存和約 462 次操作的分配。測試耗時 10.449 秒。

現在讓我們使用 Worker Pool 模式進行優化。

Pattern 2: Worker Pool

在之前的示例中,我們生成了 N 個 goroutines,現在我們將限制爲 5 個 goroutine,並通過一個 workerCh 將數字提供給這些 goroutine。

func publishData(numbers []int) <-chan int {
  workerCh := make(chan int)
  go func() {
    defer close(workerCh)
    for _, v := range numbers {
        workerCh <- v
    }
  }()
  return workerCh
}

注意必須在發送所有數字後關閉 workerCh ,因爲我們需要通知工作程序輸入數據已經完成。

func ProcessData(numbers []int) []*Data {
  output := make([]*Data, 0)
  outputCh := make(chan *Data)
  workerCount := 5

  var wg sync.WaitGroup
  
  workerCh := publishData(numbers)
  
  for i := 0; i < workerCount; i++ {
    wg.Add(1)
    
    go func() {
      defer wg.Done()
      
      for v := range workerCh {
        outputCh <- &Data{
          number: v,
          square: float64(v) * float64(v),
        }
      }
    }()
  }
  
  //close the channel after finishing all go routines
  go func() {
    wg.Wait()
    close(outputCh)
  }()
  
  // receive output
  for d := range outputCh {
    output = append(output, d)
  }
  
  return output
}

所有 5 個 goroutines 都從 workerCh 接收輸入,計算完成後將結果發送到 outputCh。

來看下 worker pool 模式的 benchmark:

❯ go test -benchmem -run=^$ -bench ^BenchmarkProcessData$ go-pattern -benchtime=100000x
goos: darwin
goarch: arm64
pkg: go-pattern
BenchmarkProcessData-12  100000   65799 ns/op   7317 B/op    169 allocs/op
PASS
ok   go-pattern 6.857s

該測試在約 65,799 納秒 / 操作的時間內執行了 100,000 次操作。它平均使用了約 7,317 字節 / 操作的內存,並且每次操作分配了約 169 次。該測試成功通過,大約耗時 6.857 秒。

結論

從 benchmark 的結果來看,通過 Worker pool 限制 goroutine 數量所需的時間更短。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hCj4Pf6I-YtVK9cc2Q1pgQ