Go 每日一庫之 tunny
簡介
之前寫過一篇文章介紹了ants
這個 goroutine 池實現。當時在網上查看相關資料的時候,發現了另外一個實現tunny
。趁着時間相近,正好研究一番。也好比較一下這兩個庫。那就讓我們開始吧。
快速開始
本文代碼使用 Go Modules。
創建目錄並初始化:
$ mkdir tunny && cd tunny
$ go mod init github.com/darjun/go-daily-lib/tunny
使用go get
從 GitHub 獲取tunny
庫:
$ go get -u github.com/Jeffail/tunny
爲了方便地和ants
做一個對比,我們將ants
中的示例重新用tunny
實現一遍:還是那個分段求和的例子:
const (
DataSize = 10000
DataPerTask = 100
)
func main() {
numCPUs := runtime.NumCPU()
p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var sum int
for _, n := range payload.([]int) {
sum += n
}
return sum
})
defer p.Close()
// ...
}
使用也非常簡單,首先創建一個Pool
,這裏使用tunny.NewFunc()
。
第一個參數爲池子大小,即同時有多少個 worker (也即 goroutine)在工作,這裏設置成邏輯 CPU 個數,對於 CPU 密集型任務,這個值設置太大無意義,反而有可能導致 goroutine 切換頻繁而降低性能。
第二個參數傳入一個func(interface{})interface{}
的參數作爲任務處理函數。後續傳入數據就會調用這個函數處理。
池子使用完需要關閉,這裏使用defer p.Close()
在程序退出前關閉。
然後,生成測試數據,還是 10000 個隨機數,分成 100 組:
nums := make([]int, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
處理每組數據:
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partialSums := make([]int, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
go func(i int) {
partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
wg.Done()
}(i)
}
wg.Wait()
調用p.Process()
方法,傳入任務數據,池子中會選擇空閒的 goroutine 來處理這個數據。由於我們上面設置了處理函數,goroutine 會直接調用該函數,將這個切片作爲參數傳入。
tunny
與ants
不同的是,tunny
的任務處理是同步的,即調用p.Process()
方法之後,當前 goroutine 會掛起,直到任務處理完成之後纔會被喚醒。由於是同步的,所以p.Process()
方法可以直接返回處理結果。這也是上面程序在分發任務的時候,啓動多個 goroutine 的原因。如果不是每個任務都啓動一個 goroutine,p.Process()
方法會一直等待任務完成,那麼後面的任務要等到前面的任務全部執行完之後才能執行。這樣就發揮不了併發的優勢了。
這裏注意一個小細節,我將for
循環變量作爲參數傳給 goroutine 函數了。如果不這樣做,所有 goroutine 都共用外層的i
,而且 goroutine 開始運行時,for
循環大概率已經結束了,這時i = DataSize/DataPerTask
,索引nums[i*DataPerTask : (i+1)*DataPerTask]
會越界觸發 panic。
最後統計數據,驗證結果:
var sum int
for _, s := range partialSums {
sum += s
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
運行:
$ go run main.go
finish all tasks, result is 5010172 expect:5010172
超時
默認情況下,p.Process()
會一直阻塞直到任務完成,即使當前沒有空閒 worker 也會阻塞。我們也可以使用帶超時的Process()
方法:ProcessTimed()
。傳入一個超時時間間隔,如果超過這個時間還沒有空閒 worker,或者任務還沒有處理完成,就會終止,並返回一個錯誤。
超時有 2 種情況:
-
等不到空閒的 worker:所有 worker 一直處理繁忙狀態,正在處理的任務比較耗時,無法短時間內完成;
-
任務本身比較耗時。
下面我們編寫一個計算斐波那契的函數,使用遞歸這種低效的實現方法:
func fib(n int) int {
if n <= 1 {
return 1
}
return fib(n-1) + fib(n-2)
}
我們先看任務比較耗時的情況,創建Pool
對象。爲了觀察更明顯,在處理函數中添加了time.Sleep()
語句:
p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
n := payload.(int)
result := fib(n)
time.Sleep(5 * time.Second)
return result
})
defer p.Close()
生成與池容量相等的任務數,調用p.ProcessTimed()
方法,設置超時爲 1s:
var wg sync.WaitGroup
wg.Add(numCPUs)
for i := 0; i < numCPUs; i++ {
go func(i int) {
n := rand.Intn(30)
result, err := p.ProcessTimed(n, time.Second)
nowStr := time.Now().Format("2006-01-02 15:04:05")
if err != nil {
fmt.Printf("[%s]task(%d) failed:%v\n", nowStr, i, err)
} else {
fmt.Printf("[%s]fib(%d) = %d\n", nowStr, n, result)
}
wg.Done()
}(i)
}
wg.Wait()
因爲處理函數中 sleep 5s,所以任務在執行過程中就超時了。運行:
$ go run main.go
[2021-06-10 16:36:26]task(7) failed:job request timed out
[2021-06-10 16:36:26]task(4) failed:job request timed out
[2021-06-10 16:36:26]task(1) failed:job request timed out
[2021-06-10 16:36:26]task(6) failed:job request timed out
[2021-06-10 16:36:26]task(5) failed:job request timed out
[2021-06-10 16:36:26]task(0) failed:job request timed out
[2021-06-10 16:36:26]task(3) failed:job request timed out
[2021-06-10 16:36:26]task(2) failed:job request timed out
都在同一秒中超時。
我們將任務數量翻倍,再將處理函數中的 sleep 改爲 990ms,保證前一批任務能順利完成,後續任務或者由於等不到空閒 worker,或者由於執行時間過長而超時返回。運行:
$ go run main.go
[2021-06-10 16:42:46]fib(11) = 144
[2021-06-10 16:42:46]fib(25) = 121393
[2021-06-10 16:42:46]fib(27) = 317811
[2021-06-10 16:42:46]fib(1) = 1
[2021-06-10 16:42:46]fib(18) = 4181
[2021-06-10 16:42:46]fib(29) = 832040
[2021-06-10 16:42:46]fib(17) = 2584
[2021-06-10 16:42:46]fib(20) = 10946
[2021-06-10 16:42:46]task(5) failed:job request timed out
[2021-06-10 16:42:46]task(14) failed:job request timed out
[2021-06-10 16:42:46]task(8) failed:job request timed out
[2021-06-10 16:42:46]task(7) failed:job request timed out
[2021-06-10 16:42:46]task(13) failed:job request timed out
[2021-06-10 16:42:46]task(12) failed:job request timed out
[2021-06-10 16:42:46]task(11) failed:job request timed out
[2021-06-10 16:42:46]task(6) failed:job request timed out
context
context 是協調 goroutine 的工具。tunny
支持帶context.Context
參數的方法:ProcessCtx()
。當前 context 狀態變爲Done
之後,任務也會停止執行。context 會由於超時、取消等原因切換爲Done
狀態。還是拿上面的例子:
go func(i int) {
n := rand.Intn(30)
ctx, cancel := context.WithCancel(context.Background())
if i%2 == 0 {
go func() {
time.Sleep(500 * time.Millisecond)
cancel()
}()
}
result, err := p.ProcessCtx(ctx, n)
if err != nil {
fmt.Printf("task(%d) failed:%v\n", i, err)
} else {
fmt.Printf("fib(%d) = %d\n", n, result)
}
wg.Done()
}(i)
其他代碼都一樣,我們調用p.ProcessCtx()
方法來執行任務。參數是一個可取消的Context
。對於序號爲偶數的任務,我們啓動一個 goroutine 在 500ms 之後cancel()
掉這個Context
。代碼運行結果如下:
$ go run main.go
task(4) failed:context canceled
task(6) failed:context canceled
task(0) failed:context canceled
task(2) failed:context canceled
fib(27) = 317811
fib(25) = 121393
fib(1) = 1
fib(18) = 4181
我們看到偶數序號的任務都被取消了。
源碼
tunny
的源碼更少,除去測試代碼和註釋,連 500 行都不到。那麼就一起來看一下吧。Pool
結構如下:
// src/github.com/Jeffail/tunny.go
type Pool struct {
queuedJobs int64
ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest
workerMut sync.Mutex
}
Pool
結構中有一個ctor
字段,這是一個函數對象,用於返回一個實現Worker
接口的值:
type Worker interface {
Process(interface{}) interface{}
BlockUntilReady()
Interrupt()
Terminate()
}
這個接口不同的方法在任務執行的不同階段調用。最重要的當屬Process(interface{}) interface{}
方法了。這個就是執行任務的函數。tunny
提供New()
方法創建Pool
對象,這個方法需要我們自己構造ctor
函數對象,使用多有不便。tunny
提供了另外兩個默認實現closureWorker
和callbackWorker
:
type closureWorker struct {
processor func(interface{}) interface{}
}
func (w *closureWorker) Process(payload interface{}) interface{} {
return w.processor(payload)
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}
type callbackWorker struct{}
func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
if !ok {
return ErrJobNotFunc
}
f()
return nil
}
func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt() {}
func (w *callbackWorker) Terminate() {}
tunny.NewFunc()
方法使用的就是closureWorker
:
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
return New(n, func() Worker {
return &closureWorker{
processor: f,
}
})
}
創建的closureWorker
直接將參數f
作爲任務處理函數。
tunny.NewCallback()
方法使用callbackWorker
:
func NewCallback(n int) *Pool {
return New(n, func() Worker {
return &callbackWorker{}
})
}
callbackWorker
結構中沒有處理函數,只能給它發送無參無返回值的函數對象作爲任務,它的Process()
方法就是執行這個函數。
創建Pool
對象後,都是調用它的SetSize()
方法,設置 worker 數量。在這個方法中會啓動相應數量的 goroutine:
func (p *Pool) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()
lWorkers := len(p.workers)
if lWorkers == n {
return
}
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// 停止過多的 worker
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// 等待 worker 停止
for i := n; i < lWorkers; i++ {
p.workers[i].join()
// -----------------
}
p.workers = p.workers[:n]
}
SetSize()
其實在擴容和縮容的時候也會調用。對於擴容,它會創建相應數量的 worker。對於縮容,它會將多餘的 worker 停掉。與ants
不同,tunny
的擴容縮容都是即時生效的。
代碼中,我用-----------------
標出來的地方我覺得有點問題。對於縮容,因爲底層的數組沒有變化,workers
切片長度縮小之後,數組中後面的元素實際上就訪問不到了,但是數組還持有它的引用,算是一種內存泄漏吧。所以穩妥起見最好加上p.workers[i] = nil
?
這裏創建的 worker 實際上是包裝了一層的workerWrapper
結構:
// src/github.com/Jeffail/worker.go
type workerWrapper struct {
worker Worker
interruptChan chan struct{}
reqChan chan<- workRequest
closeChan chan struct{}
closedChan chan struct{}
}
func newWorkerWrapper(
reqChan chan<- workRequest,
worker Worker,
) *workerWrapper {
w := workerWrapper{
worker: worker,
interruptChan: make(chan struct{}),
reqChan: reqChan,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}
go w.run()
return &w
}
workerWrapper
結構創建之後會立刻調用run()
方法啓動一個 goroutine:
func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
for {
w.worker.BlockUntilReady()
select {
case w.reqChan <- workRequest{
jobChan: jobChan,
retChan: retChan,
interruptFunc: w.interrupt,
}:
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case <-w.closeChan:
return
}
}
}
每個 worker goroutine 都在嘗試向w.reqChan
通道中發送一個workRequest
結構數據,發送成功之後,從jobChan
中獲取任務數據,然後調用Worker.Process()
方法執行任務,最後將結果發送到retChan
通道中。這裏其實有好幾個交互。需要結合Process()
方法來看才更清晰:
func (p *Pool) Process(payload interface{}) interface{} {
request, open := <-p.reqChan
request.jobChan <- payload
payload, open = <-request.retChan
return payload
}
刪掉無相關的代碼,最後就是上面這樣。我們在調用池對象的Process()
方法時,嘗試從通道reqChan
中接收數據,然後將任務數據發送到jobChan
通道中,最後從retChan
通道中接收結果。與上面的run
流程結合來看,實際上在正常執行一個任務時,Pool
與workerWrapper
有 3 次交互。
觀察Pool
創建到workerWrapper
創建的流程,我們可以看到實際上Pool
結構中的reqChan
與workerWrapper
結構中的reqChan
是同一個通道。即workerWrapper
啓動後,會阻塞在向reqChan
通道發送數據上,直到調用了Pool
的Process*()
方法,從通道reqChan
取出數據。Process()
方法得到workRequest
會向它的jobChan
通道中發送任務數據。而workerWrapper.run()
方法成功發送數據到reqChan
之後就開始等待從jobChan
通道中接收數據,這時接收到Process()
方法發送過來的數據。開始執行w.worker.Process()
方法,然後向retChan
通道發送結果數據,Process()
方法在成功發送數據到jobChan
之後,就開始等待從retChan
通道中接收數據。接收成功之後,Process()
方法返回,workerWrapper.run()
繼續阻塞在w.reqChan <-
這條語句上,等待處理下一個任務。注意jobChan
和retChan
都是workerWrapper.run()
方法中創建的通道。
那麼超時是怎麼實現的呢?看方法ProcessTimed()
的實現:
func (p *Pool) ProcessTimed(
payload interface{},
timeout time.Duration,
) (interface{}, error) {
tout := time.NewTimer(timeout)
var request workRequest
select {
case request, open = <-p.reqChan:
case <-tout.C:
return nil, ErrJobTimedOut
}
select {
case request.jobChan <- payload:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}
select {
case payload, open = <-request.retChan:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}
tout.Stop()
return payload, nil
}
同樣地,刪除不相干的代碼。首先,創建一個timer
,超時時間由傳入參數指定。後面有 3 個select
語句:
-
等待從
p.reqChan
取數據,即等待有 worker 空閒; -
等待發送數據到
jobChan
,即等待 worker 從jobChan
取出任務數據; -
等待從
retChan
取數據,即等待 worker 將結果發送到retChan
。
第一種情況,如果超時了,說明 worker 都處於繁忙狀態,直接返回任務超時。後面兩種情況實際上是任務已經開始執行了,但是在規定的時間內沒有完成。這兩種情況,需要終止任務的執行。我們看到上面調用了workerRequest.interruptFunc()
方法,也就是workerWrapper.interrupt()
方法:
func (w *workerWrapper) interrupt() {
close(w.interruptChan)
w.worker.Interrupt()
}
這個方法就是簡單關閉了interrupteChan
通道,然後調用worker
對象的Interrupt()
方法,默認實現中這個方法都是空的。
interruptChan
通道關閉後,goroutine 中等待從jobChan
接收數據和等待向retChan
發送數據的操作都會取消:
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
ProcessCtx()
實現也是類似的。
最後調用workerWrapper.stop()
會關閉closeChan
通道,這會導致workerWrapper.run()
方法中的for
循環跳出,進而執行defer
函數中的close(retChan)
和close(closedChan)
:
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
這裏需要關閉retChan
通道是爲了防止Process*()
方法在等待retChan
數據。
closedChan
通道關閉後,workerWrapper.join()
方法就返回了。
func (w *workerWrapper) join() {
<-w.closedChan
}
Worker
幾個方法的調用時機:
-
Process()
:執行任務時; -
Interrupt()
:任務因爲超時會被 context 取消時; -
BlockUntilReady()
:每次執行新任務前,可能需要準備一些資源; -
Terminate()
:workerWrapper.run()
中的 defer 函數中,即停止 worker 後。
這些時機在代碼中都能清晰地看到。
基於源碼,我畫了一個流程圖:
圖中省略了中斷的流程。
tunny
vs ants
tunny
設計的思路與ants
有較大的區別:
tunny
只支持同步的方式執行任務,雖然任務在另一個 goroutine 執行,但是提交任務的 goroutine 必須等待結果返回或超時。不能做其他事情。正是由於這一點,導致tunny
的設計稍微一點複雜,而且爲了支持超時和取消,設計了多個通道用於和執行任務的 goroutine 通信。一次任務執行的過程涉及多次通信,性能是有損失的。從另一方面說,同步的編程方式更符合人類的直覺。
ants
完全是異步的任務執行流程,相比tunny
性能是稍高一些的。但是也因爲它的異步特性,導致沒有任務超時、取消這些機制。而且如果需要收集結果,必須要自己編寫額外的代碼。
總結
本文介紹了另一個 goroutine 池的實現tunny
。它以同步的方式來處理任務,編寫代碼更加直觀,對任務的執行流程有更強的控制,如超時、取消等。當然實現也複雜一些。tunny
代碼不走 500 行,非常建議讀一讀。
大家如果發現好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue😄
參考
-
tunny GitHub:https://github.com/Jeffail/tunny
-
ants GitHub:github.com/panjf2000/ants
-
Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Qvh_-TI3Dglp3-ddqC_L1A