Go 每日一庫之 ants

簡介

處理大量併發是 Go 語言的一大優勢。語言內置了方便的併發語法,可以非常方便的創建很多個輕量級的 goroutine 併發處理任務。相比於創建多個線程,goroutine 更輕量、資源佔用更少、切換速度更快、無線程上下文切換開銷更少。但是受限於資源總量,系統中能夠創建的 goroutine 數量也是受限的。默認每個 goroutine 佔用 8KB 內存,一臺 8GB 內存的機器滿打滿算也只能創建 8GB/8KB = 1000000 個 goroutine,更何況系統還需要保留一部分內存運行日常管理任務,go 運行時需要內存運行 gc、處理 goroutine 切換等。使用的內存超過機器內存容量,系統會使用交換區(swap),導致性能急速下降。我們可以簡單驗證一下創建過多 goroutine 會發生什麼:

func main() {
  var wg sync.WaitGroup
  wg.Add(10000000)
  for i := 0; i < 10000000; i++ {
    go func() {
      time.Sleep(1 * time.Minute)
    }()
  }
  wg.Wait()
}

在我的機器上(8G 內存)運行上面的程序會報errno 1455,即Out of Memory錯誤,這很好理解。謹慎運行

另一方面,goroutine 的管理也是一個問題。goroutine 只能自己運行結束,外部沒有任何手段可以強制 j 結束一個 goroutine。如果一個 goroutine 因爲某種原因沒有自行結束,就會出現 goroutine 泄露。此外,頻繁創建 goroutine 也是一個開銷。

鑑於上述原因,自然出現了與線程池一樣的需求,即 goroutine 池。一般的 goroutine 池自動管理 goroutine 的生命週期,可以按需創建,動態縮容。向 goroutine 池提交一個任務,goroutine 池會自動安排某個 goroutine 來處理。

ants就是其中一個實現 goroutine 池的庫。

快速使用

本文代碼使用 Go Modules。

創建目錄並初始化:

$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants

安裝ants庫,使用v2版本:

$ go get -u github.com/panjf2000/ants/v2

我們接下來要實現一個計算大量整數和的程序。首先創建基礎的任務結構,並實現其執行任務方法:

type Task struct {
  index int
  nums  []int
  sum   int
  wg    *sync.WaitGroup
}

func (t *Task) Do() {
  for _, num := range t.nums {
    t.sum += num
  }

  t.wg.Done()
}

很簡單,就是將一個切片中的所有整數相加。

然後我們創建 goroutine 池,注意池使用完後需要手動關閉,這裏使用defer關閉:

p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()

func taskFunc(data interface{}) {
  task := data.(*Task)
  task.Do()
  fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}

上面調用了ants.NewPoolWithFunc()創建了一個 goroutine 池。第一個參數是池容量,即池中最多有 10 個 goroutine。第二個參數爲每次執行任務的函數。當我們調用p.Invoke(data)的時候,ants池會在其管理的 goroutine 中找出一個空閒的,讓它執行函數taskFunc,並將data作爲參數。

接着,我們模擬數據,做數據切分,生成任務,交給 ants 處理:

const (
  DataSize    = 10000
  DataPerTask = 100
)

nums := make([]int, DataSize, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  task := &Task{
    index: i + 1,
    nums:  nums[i*DataPerTask : (i+1)*DataPerTask],
    wg:    &wg,
  }

  tasks = append(tasks, task)
  p.Invoke(task)
}

wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())

隨機生成 10000 個整數,將這些整數分爲 100 份,每份 100 個,生成Task結構,調用p.Invoke(task)處理。wg.Wait()等待處理完成,然後輸出ants正在運行的 goroutine 數量,這時應該是 0。

最後我們將結果彙總,並驗證一下結果,與直接相加得到的結果做一個比較:

var sum int
for _, task := range tasks {
  sum += task.sum
}

var expect int
for _, num := range nums {
  expect += num
}

fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

運行:

$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172

確實,任務完成之後,正在運行的 goroutine 數量變爲 0。而且我們驗證了,結果沒有偏差。另外需要注意,goroutine 池中任務的執行順序是隨機的,與提交任務的先後沒有關係。由上面運行打印的任務標識我們也能發現這一點。

函數作爲任務

ants支持將一個不接受任何參數的函數作爲任務提交給 goroutine 運行。由於不接受參數,我們提交的函數要麼不需要外部數據,只需要處理自身邏輯,否則就必須用某種方式將需要的數據傳遞進去,例如閉包。

提交函數作爲任務的 goroutine 池使用ants.NewPool()創建,它只接受一個參數表示池子的容量。調用池子對象的Submit()方法來提交任務,將一個不接受任何參數的函數傳入。

最開始的例子可以改寫一下。增加一個任務包裝函數,將任務需要的參數作爲包裝函數的參數。包裝函數返回實際的任務函數,該任務函數就可以通過閉包訪問它需要的數據了:

type taskFunc func()

func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {
  return func() {
    for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {
      *sum += num
    }

    fmt.Printf("task:%d sum:%d\n", i+1, *sum)
    wg.Done()
  }
}

調用ants.NewPool(10)創建 goroutine 池,同樣池子用完需要釋放,這裏使用defer

p, _ := ants.NewPool(10)
defer p.Release()

生成模擬數據,切分任務。提交任務給ants池執行,這裏使用taskFuncWrapper()包裝函數生成具體的任務,然後調用p.Submit()提交:

nums := make([]int, DataSize, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  p.Submit(taskFuncWrapper(nums, i, &partSums[i]&wg))
}
wg.Wait()

彙總結果,驗證:

var sum int
for _, partSum := range partSums {
  sum += partSum
}

var expect int
for _, num := range nums {
  expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

這個程序的功能與最開始的完全相同。

執行流程

GitHub 倉庫中有個執行流程圖,我重新繪製了一下:

執行流程如下:

選項

ants提供了一些選項可以定製 goroutine 池的行爲。選項使用Options結構定義:

// src/github.com/panjf2000/ants/options.go
type Options struct {
  ExpiryDuration time.Duration
  PreAlloc bool
  MaxBlockingTasks int
  Nonblocking bool
  PanicHandler func(interface{})
  Logger Logger
}

各個選項含義如下:

NewPool()部分源碼:

if p.options.PreAlloc {
  if size == -1 {
    return nil, ErrInvalidPreAllocSize
  }
  p.workers = newWorkerArray(loopQueueType, size)
} else {
  p.workers = newWorkerArray(stackType, 0)
}

使用預分配時,創建loopQueueType類型的結構,反之創建stackType類型。這是ants定義的兩種管理worker的數據結構。

ants定義了一些With*函數來設置這些選項:

func WithOptions(options Options) Option {
  return func(opts *Options) {
    *opts = options
  }
}

func WithExpiryDuration(expiryDuration time.Duration) Option {
  return func(opts *Options) {
    opts.ExpiryDuration = expiryDuration
  }
}

func WithPreAlloc(preAlloc bool) Option {
  return func(opts *Options) {
    opts.PreAlloc = preAlloc
  }
}

func WithMaxBlockingTasks(maxBlockingTasks int) Option {
  return func(opts *Options) {
    opts.MaxBlockingTasks = maxBlockingTasks
  }
}

func WithNonblocking(nonblocking bool) Option {
  return func(opts *Options) {
    opts.Nonblocking = nonblocking
  }
}

func WithPanicHandler(panicHandler func(interface{})) Option {
  return func(opts *Options) {
    opts.PanicHandler = panicHandler
  }
}

func WithLogger(logger Logger) Option {
  return func(opts *Options) {
    opts.Logger = logger
  }
}

這裏使用了 Go 語言中非常常見的一種模式,我稱之爲選項模式,非常方便地構造有大量參數,且大部分有默認值或一般不需要顯式設置的對象。

我們來驗證幾個選項。

最大等待隊列長度

ants池設置容量之後,如果所有的 goroutine 都在處理任務。這時提交的任務默認會進入等待隊列,WithMaxBlockingTasks(maxBlockingTasks int)可以設置等待隊列的最大長度。超過這個長度,提交任務直接返回錯誤:

func wrapper(i int, wg *sync.WaitGroup) func() {
  return func() {
    fmt.Printf("hello from task:%d\n", i)
    time.Sleep(1 * time.Second)
    wg.Done()
  }
}

func main() {
  p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(8)
  for i := 1; i <= 8; i++ {
    go func(i int) {
      err := p.Submit(wrapper(i, &wg))
      if err != nil {
        fmt.Printf("task:%d err:%v\n", i, err)
        wg.Done()
      }
    }(i)
  }

  wg.Wait()
}

上面代碼中,我們設置 goroutine 池的容量爲 4,最大阻塞隊列長度爲 2。然後一個 for 提交 8 個任務,期望結果是:4 個任務在執行,2 個任務在等待,2 個任務提交失敗。運行結果:

hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2

我們看到提交任務失敗,打印too many goroutines blocked ...

代碼中有 4 點需要注意:

由於簡單起見,前面的例子中Submit()方法的返回值都被我們忽略了。實際開發中一定不要忽略。

非阻塞

ants池默認是阻塞的,我們可以使用WithNonblocking(nonblocking bool)設置其爲非阻塞。非阻塞的ants池中,在所有 goroutine 都在處理任務時,提交新任務會直接返回錯誤:

func main() {
  p, _ := ants.NewPool(2, ants.WithNonblocking(true))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 3; i++ {
    err := p.Submit(wrapper(i, &wg))
    if err != nil {
      fmt.Printf("task:%d err:%v\n", i, err)
      wg.Done()
    }
  }

  wg.Wait()
}

使用上個例子中的wrapper()函數,ants池容量設置爲 2。連續提交 3 個任務,期望結果前兩個任務正常執行,第 3 個任務提交時返回錯誤:

hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1

panic 處理器

一個魯棒性強的庫一定不會忽視錯誤的處理,特別是宕機相關的錯誤。在 Go 語言中就是 panic,也被稱爲運行時恐慌,在程序運行的過程中產生的嚴重性錯誤,例如索引越界,空指針解引用等,都會觸發 panic。如果不處理 panic,程序會直接意外退出,可能造成數據丟失的嚴重後果。

ants中如果 goroutine 在執行任務時發生panic,會終止當前任務的執行,將發生錯誤的堆棧輸出到os.Stderr注意,該 goroutine 還是會被放回池中,下次可以取出執行新的任務

func wrapper(i int, wg *sync.WaitGroup) func() {
  return func() {
    fmt.Printf("hello from task:%d\n", i)
    if i%2 == 0 {
      panic(fmt.Sprintf("panic from task:%d", i))
    }
    wg.Done()
  }
}

func main() {
  p, _ := ants.NewPool(2)
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 2; i++ {
    p.Submit(wrapper(i, &wg))
  }

  time.Sleep(1 * time.Second)
  p.Submit(wrapper(3, &wg))
  p.Submit(wrapper(5, &wg))
  wg.Wait()
}

我們讓偶數個任務觸發panic。提交兩個任務,第二個任務一定會觸發panic。觸發panic之後,我們還可以繼續提交任務 3、5。注意這裏沒有 4,提交任務 4 還是會觸發panic

上面的程序需要注意 2 點:

除了ants提供的默認 panic 處理器,我們還可以使用WithPanicHandler(paincHandler func(interface{}))指定我們自己編寫的 panic 處理器。處理器的參數就是傳給panic的值:

func panicHandler(err interface{}) {
  fmt.Fprintln(os.Stderr, err)
}

p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()

其餘代碼與上面的完全相同,指定了panicHandler後觸發panic就會執行它。運行:

hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3

看到輸出了傳給panic函數的字符串(第二行輸出)。

默認池

爲了方便使用,很多 Go 庫都喜歡提供其核心功能類型的一個默認實現。可以直接通過庫提供的接口調用。例如net/http,例如antsants庫中定義了一個默認的池,默認容量爲MaxInt32。goroutine 池的各個方法都可以直接通過ants包直接訪問:

// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)

func Submit(task func()) error {
  return defaultAntsPool.Submit(task)
}

func Running() int {
  return defaultAntsPool.Running()
}

func Cap() int {
  return defaultAntsPool.Cap()
}

func Free() int {
  return defaultAntsPool.Free()
}

func Release() {
  defaultAntsPool.Release()
}

func Reboot() {
  defaultAntsPool.Reboot()
}

直接使用:

func main() {
  defer ants.Release()

  var wg sync.WaitGroup
  wg.Add(2)
  for i := 1; i <= 2; i++ {
    ants.Submit(wrapper(i, &wg))
  }
  wg.Wait()
}

默認池也需要Release()

總結

本文介紹了 goroutine 池的由來,並藉由ants庫介紹了基本的使用方法,和一些細節。ants源碼不多,去掉測試的核心代碼只有 1k 行左右,建議有時間、感興趣的童鞋深入閱讀。

大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄

參考

  1. ants GitHub:github.com/valyala/ants

  2. Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ysG0q9LIYgWHIoY_LK-W9A