Go 每日一庫之 ants
簡介
處理大量併發是 Go 語言的一大優勢。語言內置了方便的併發語法,可以非常方便的創建很多個輕量級的 goroutine 併發處理任務。相比於創建多個線程,goroutine 更輕量、資源佔用更少、切換速度更快、無線程上下文切換開銷更少。但是受限於資源總量,系統中能夠創建的 goroutine 數量也是受限的。默認每個 goroutine 佔用 8KB 內存,一臺 8GB 內存的機器滿打滿算也只能創建 8GB/8KB = 1000000 個 goroutine,更何況系統還需要保留一部分內存運行日常管理任務,go 運行時需要內存運行 gc、處理 goroutine 切換等。使用的內存超過機器內存容量,系統會使用交換區(swap),導致性能急速下降。我們可以簡單驗證一下創建過多 goroutine 會發生什麼:
func main() {
var wg sync.WaitGroup
wg.Add(10000000)
for i := 0; i < 10000000; i++ {
go func() {
time.Sleep(1 * time.Minute)
}()
}
wg.Wait()
}
在我的機器上(8G 內存)運行上面的程序會報errno 1455
,即Out of Memory
錯誤,這很好理解。謹慎運行。
另一方面,goroutine 的管理也是一個問題。goroutine 只能自己運行結束,外部沒有任何手段可以強制 j 結束一個 goroutine。如果一個 goroutine 因爲某種原因沒有自行結束,就會出現 goroutine 泄露。此外,頻繁創建 goroutine 也是一個開銷。
鑑於上述原因,自然出現了與線程池一樣的需求,即 goroutine 池。一般的 goroutine 池自動管理 goroutine 的生命週期,可以按需創建,動態縮容。向 goroutine 池提交一個任務,goroutine 池會自動安排某個 goroutine 來處理。
ants
就是其中一個實現 goroutine 池的庫。
快速使用
本文代碼使用 Go Modules。
創建目錄並初始化:
$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants
安裝ants
庫,使用v2
版本:
$ go get -u github.com/panjf2000/ants/v2
我們接下來要實現一個計算大量整數和的程序。首先創建基礎的任務結構,並實現其執行任務方法:
type Task struct {
index int
nums []int
sum int
wg *sync.WaitGroup
}
func (t *Task) Do() {
for _, num := range t.nums {
t.sum += num
}
t.wg.Done()
}
很簡單,就是將一個切片中的所有整數相加。
然後我們創建 goroutine 池,注意池使用完後需要手動關閉,這裏使用defer
關閉:
p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()
func taskFunc(data interface{}) {
task := data.(*Task)
task.Do()
fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}
上面調用了ants.NewPoolWithFunc()
創建了一個 goroutine 池。第一個參數是池容量,即池中最多有 10 個 goroutine。第二個參數爲每次執行任務的函數。當我們調用p.Invoke(data)
的時候,ants
池會在其管理的 goroutine 中找出一個空閒的,讓它執行函數taskFunc
,並將data
作爲參數。
接着,我們模擬數據,做數據切分,生成任務,交給 ants
處理:
const (
DataSize = 10000
DataPerTask = 100
)
nums := make([]int, DataSize, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
task := &Task{
index: i + 1,
nums: nums[i*DataPerTask : (i+1)*DataPerTask],
wg: &wg,
}
tasks = append(tasks, task)
p.Invoke(task)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
隨機生成 10000 個整數,將這些整數分爲 100 份,每份 100 個,生成Task
結構,調用p.Invoke(task)
處理。wg.Wait()
等待處理完成,然後輸出ants
正在運行的 goroutine 數量,這時應該是 0。
最後我們將結果彙總,並驗證一下結果,與直接相加得到的結果做一個比較:
var sum int
for _, task := range tasks {
sum += task.sum
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
運行:
$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172
確實,任務完成之後,正在運行的 goroutine 數量變爲 0。而且我們驗證了,結果沒有偏差。另外需要注意,goroutine 池中任務的執行順序是隨機的,與提交任務的先後沒有關係。由上面運行打印的任務標識我們也能發現這一點。
函數作爲任務
ants
支持將一個不接受任何參數的函數作爲任務提交給 goroutine 運行。由於不接受參數,我們提交的函數要麼不需要外部數據,只需要處理自身邏輯,否則就必須用某種方式將需要的數據傳遞進去,例如閉包。
提交函數作爲任務的 goroutine 池使用ants.NewPool()
創建,它只接受一個參數表示池子的容量。調用池子對象的Submit()
方法來提交任務,將一個不接受任何參數的函數傳入。
最開始的例子可以改寫一下。增加一個任務包裝函數,將任務需要的參數作爲包裝函數的參數。包裝函數返回實際的任務函數,該任務函數就可以通過閉包訪問它需要的數據了:
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {
return func() {
for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {
*sum += num
}
fmt.Printf("task:%d sum:%d\n", i+1, *sum)
wg.Done()
}
}
調用ants.NewPool(10)
創建 goroutine 池,同樣池子用完需要釋放,這裏使用defer
:
p, _ := ants.NewPool(10)
defer p.Release()
生成模擬數據,切分任務。提交任務給ants
池執行,這裏使用taskFuncWrapper()
包裝函數生成具體的任務,然後調用p.Submit()
提交:
nums := make([]int, DataSize, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()
彙總結果,驗證:
var sum int
for _, partSum := range partSums {
sum += partSum
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
這個程序的功能與最開始的完全相同。
執行流程
GitHub 倉庫中有個執行流程圖,我重新繪製了一下:
執行流程如下:
-
初始化 goroutine 池;
-
提交任務給 goroutine 池,檢查是否有空閒的 goroutine:
-
已到上限,檢查 goroutine 池是否是非阻塞的:
-
未到上限,創建一個新的 goroutine 處理任務
-
非阻塞,直接返回
nil
表示執行失敗 -
阻塞,等待 goroutine 空閒
-
有,獲取空閒 goroutine
-
無,檢查池中的 goroutine 數量是否已到池容量上限:
-
任務處理完成,將 goroutine 交還給池,以待處理下一個任務
選項
ants
提供了一些選項可以定製 goroutine 池的行爲。選項使用Options
結構定義:
// src/github.com/panjf2000/ants/options.go
type Options struct {
ExpiryDuration time.Duration
PreAlloc bool
MaxBlockingTasks int
Nonblocking bool
PanicHandler func(interface{})
Logger Logger
}
各個選項含義如下:
-
ExpiryDuration
:過期時間。表示 goroutine 空閒多長時間之後會被ants
池回收 -
PreAlloc
:預分配。調用NewPool()/NewPoolWithFunc()
之後預分配worker
(管理一個工作 goroutine 的結構體)切片。而且使用預分配與否會直接影響池中管理worker
的結構。見下面源碼 -
MaxBlockingTasks
:最大阻塞任務數量。即池中 goroutine 數量已到池容量,且所有 goroutine 都處理繁忙狀態,這時到來的任務會在阻塞列表等待。這個選項設置的是列表的最大長度。阻塞的任務數量達到這個值後,後續任務提交直接返回失敗 -
Nonblocking
:池是否阻塞,默認阻塞。提交任務時,如果ants
池中 goroutine 已到上限且全部繁忙,阻塞的池會將任務添加的阻塞列表等待(當然受限於阻塞列表長度,見上一個選項)。非阻塞的池直接返回失敗 -
PanicHandler
:panic 處理。遇到 panic 會調用這裏設置的處理函數 -
Logger
:指定日誌記錄器
NewPool()
部分源碼:
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
}
使用預分配時,創建loopQueueType
類型的結構,反之創建stackType
類型。這是ants
定義的兩種管理worker
的數據結構。
ants
定義了一些With*
函數來設置這些選項:
func WithOptions(options Options) Option {
return func(opts *Options) {
*opts = options
}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {
return func(opts *Options) {
opts.ExpiryDuration = expiryDuration
}
}
func WithPreAlloc(preAlloc bool) Option {
return func(opts *Options) {
opts.PreAlloc = preAlloc
}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {
return func(opts *Options) {
opts.MaxBlockingTasks = maxBlockingTasks
}
}
func WithNonblocking(nonblocking bool) Option {
return func(opts *Options) {
opts.Nonblocking = nonblocking
}
}
func WithPanicHandler(panicHandler func(interface{})) Option {
return func(opts *Options) {
opts.PanicHandler = panicHandler
}
}
func WithLogger(logger Logger) Option {
return func(opts *Options) {
opts.Logger = logger
}
}
這裏使用了 Go 語言中非常常見的一種模式,我稱之爲選項模式,非常方便地構造有大量參數,且大部分有默認值或一般不需要顯式設置的對象。
我們來驗證幾個選項。
最大等待隊列長度
ants
池設置容量之後,如果所有的 goroutine 都在處理任務。這時提交的任務默認會進入等待隊列,WithMaxBlockingTasks(maxBlockingTasks int)
可以設置等待隊列的最大長度。超過這個長度,提交任務直接返回錯誤:
func wrapper(i int, wg *sync.WaitGroup) func() {
return func() {
fmt.Printf("hello from task:%d\n", i)
time.Sleep(1 * time.Second)
wg.Done()
}
}
func main() {
p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
defer p.Release()
var wg sync.WaitGroup
wg.Add(8)
for i := 1; i <= 8; i++ {
go func(i int) {
err := p.Submit(wrapper(i, &wg))
if err != nil {
fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()
}
}(i)
}
wg.Wait()
}
上面代碼中,我們設置 goroutine 池的容量爲 4,最大阻塞隊列長度爲 2。然後一個 for 提交 8 個任務,期望結果是:4 個任務在執行,2 個任務在等待,2 個任務提交失敗。運行結果:
hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2
我們看到提交任務失敗,打印too many goroutines blocked ...
。
代碼中有 4 點需要注意:
-
提交任務必須並行進行。如果是串行提交,第 5 個任務提交時由於池中沒有空閒的 goroutine 處理該任務,
Submit()
方法會被阻塞,後續任務就都不能提交了。也就達不到驗證的目的了 -
由於任務可能提交失敗,失敗的任務不會實際執行,所以實際上
wg.Done()
次數會小於 8。因而在err != nil
分支中我們需要調用一次wg.Done()
。否則wg.Wait()
會永遠阻塞 -
爲了避免任務執行過快,空出了 goroutine,觀察不到現象,每個任務中我使用
time.Sleep(1 * time.Second)
休眠 1s -
由於 goroutine 之間的執行順序未顯式同步,故每次執行的順序不確定
由於簡單起見,前面的例子中Submit()
方法的返回值都被我們忽略了。實際開發中一定不要忽略。
非阻塞
ants
池默認是阻塞的,我們可以使用WithNonblocking(nonblocking bool)
設置其爲非阻塞。非阻塞的ants
池中,在所有 goroutine 都在處理任務時,提交新任務會直接返回錯誤:
func main() {
p, _ := ants.NewPool(2, ants.WithNonblocking(true))
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 3; i++ {
err := p.Submit(wrapper(i, &wg))
if err != nil {
fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()
}
}
wg.Wait()
}
使用上個例子中的wrapper()
函數,ants
池容量設置爲 2。連續提交 3 個任務,期望結果前兩個任務正常執行,第 3 個任務提交時返回錯誤:
hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
panic 處理器
一個魯棒性強的庫一定不會忽視錯誤的處理,特別是宕機相關的錯誤。在 Go 語言中就是 panic,也被稱爲運行時恐慌,在程序運行的過程中產生的嚴重性錯誤,例如索引越界,空指針解引用等,都會觸發 panic。如果不處理 panic,程序會直接意外退出,可能造成數據丟失的嚴重後果。
ants
中如果 goroutine 在執行任務時發生panic
,會終止當前任務的執行,將發生錯誤的堆棧輸出到os.Stderr
。注意,該 goroutine 還是會被放回池中,下次可以取出執行新的任務。
func wrapper(i int, wg *sync.WaitGroup) func() {
return func() {
fmt.Printf("hello from task:%d\n", i)
if i%2 == 0 {
panic(fmt.Sprintf("panic from task:%d", i))
}
wg.Done()
}
}
func main() {
p, _ := ants.NewPool(2)
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 2; i++ {
p.Submit(wrapper(i, &wg))
}
time.Sleep(1 * time.Second)
p.Submit(wrapper(3, &wg))
p.Submit(wrapper(5, &wg))
wg.Wait()
}
我們讓偶數個任務觸發panic
。提交兩個任務,第二個任務一定會觸發panic
。觸發panic
之後,我們還可以繼續提交任務 3、5。注意這裏沒有 4,提交任務 4 還是會觸發panic
。
上面的程序需要注意 2 點:
-
任務函數中
wg.Done()
是在panic
方法之後,如果觸發了panic
,函數中的其他正常邏輯就不會再繼續執行了。所以我們雖然wg.Add(3)
,但是一共提交了 4 個任務,其中一個任務觸發了panic
,wg.Done()
沒有正確執行。實際開發中,我們一般使用defer
語句來確保wg.Done()
一定會執行 -
在 for 循環之後,我添加了一行代碼
time.Sleep(1 * time.Second)
。如果沒有這一行,後續的兩條Submit()
方法可以直接執行,可能會導致任務很快就完成了,wg.Wait()
直接返回了,這時panic
的堆棧還沒有輸出。你可以嘗試註釋掉這行代碼運行看看結果
除了ants
提供的默認 panic 處理器,我們還可以使用WithPanicHandler(paincHandler func(interface{}))
指定我們自己編寫的 panic 處理器。處理器的參數就是傳給panic
的值:
func panicHandler(err interface{}) {
fmt.Fprintln(os.Stderr, err)
}
p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()
其餘代碼與上面的完全相同,指定了panicHandler
後觸發panic
就會執行它。運行:
hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3
看到輸出了傳給panic
函數的字符串(第二行輸出)。
默認池
爲了方便使用,很多 Go 庫都喜歡提供其核心功能類型的一個默認實現。可以直接通過庫提供的接口調用。例如net/http
,例如ants
。ants
庫中定義了一個默認的池,默認容量爲MaxInt32
。goroutine 池的各個方法都可以直接通過ants
包直接訪問:
// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
func Submit(task func()) error {
return defaultAntsPool.Submit(task)
}
func Running() int {
return defaultAntsPool.Running()
}
func Cap() int {
return defaultAntsPool.Cap()
}
func Free() int {
return defaultAntsPool.Free()
}
func Release() {
defaultAntsPool.Release()
}
func Reboot() {
defaultAntsPool.Reboot()
}
直接使用:
func main() {
defer ants.Release()
var wg sync.WaitGroup
wg.Add(2)
for i := 1; i <= 2; i++ {
ants.Submit(wrapper(i, &wg))
}
wg.Wait()
}
默認池也需要Release()
。
總結
本文介紹了 goroutine 池的由來,並藉由ants
庫介紹了基本的使用方法,和一些細節。ants
源碼不多,去掉測試的核心代碼只有 1k 行左右,建議有時間、感興趣的童鞋深入閱讀。
大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
參考
-
ants GitHub:github.com/valyala/ants
-
Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
歡迎關注我的微信公衆號【GoUpUp】,共同學習,一起進步~
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ysG0q9LIYgWHIoY_LK-W9A