Golang 中的異步任務隊列
【導讀】本文介紹了 Go 移步任務隊列的實現。
在一些常見的場景中,如果遇到了某些請求特別耗時間,爲了不影響其它用戶的請求以及節約服務器資源,我們通常會考慮使用異步任務隊列去解決,這樣可以快速地處理請求、只返回給用戶任務創建結果,等待任務完成之後,我們再告知用戶任務的完成情況。
對於 Golang,我們可以通過 Worker pool 異步處理任務,在大多數情況下,如果不在意數據丟失以及服務器性能足夠,我們就沒有必要考慮別的方案,畢竟這樣實現非常簡單。
接下來我們先來說說如何用 Worker pool 解決異步任務的問題。
Worker pool
Worker pool,也有叫做 Goroutine pool 的,都是指用 Go channel 以及 Goroutine 來實現的任務隊列。Go channel 本質上就是一個隊列,因此用作任務隊列是很自然的。
在我們不用 Go channel 的時候,我們也許會使用這樣的方式來處理異步任務:
for i := 0;i < 100;i++ {
go func() {
// process job
}()
}
這樣的方式是不推薦的,因爲在請求量到達一定程度,系統處理不過來的時候,會造成 Goroutine 的爆炸,拖慢整個系統的運行,甚至程序的崩潰,數據也就完全丟失了。
如果我們用簡單的方式,可以看看接下來的例子:一個發送者(也叫做生產者),一個接受者(也叫做消費者,或者 Worker):
type Job struct {...}
jobChan := make(chan Job)
quit := make(chan bool)
go func() {
select {
case job := <-jobChan:
case <- quit:
return
}
}()
for i := 0;i < 100;i++ {
jobChan <- Job{...}
}
close(jobChan)
quit <- true
如果 Worker 不夠,我們可以增加,這樣可以並行處理任務:
for i := 0;i < 10;i++ {
go func() {
for job := range jobChan {
// process job
}
}()
}
這樣,一個非常簡單的 Worker pool 就完成了,只是,它對任務的處理還會有問題,比如無法設置超時、無法處理 panic 錯誤等。
實際上,目前已經有很多的開源庫可以幫你實現了,以 worker pool 爲關鍵詞在 GitHub 上可以搜到一大堆:
-
GitHub - Jeffail/tunny: A goroutine pool for Go
-
GitHub - gammazero/workerpool: Concurrency limiting goroutine pool
那麼,它們的缺點呢?
很明顯,它們的缺點就在於缺乏管理,可以說是完全不管任務的結果,即使我們加日誌輸出也只是爲了簡單監控,更要命的就是進程重啓的時候,比如進程掛了,或者程序更新,都會導致數據丟失,畢竟生產者與消費者在一個進程中的時候,會互相影響(搶佔 CPU 與內存資源)。因此前面我也說了,在不管這兩個問題的時候,可以考慮用。
如果數據很重要(實際上,我認爲用戶上傳的業務數據都重要,不能丟失),爲了解決這些問題,我們必須換一種解決方案。
分佈式異步任務隊列
接下來再說說異步的分佈式任務隊列,要用到這個工具的時候,我們大致有以下幾個需求:
-
分佈式:生產者與消費者隔離;
-
數據持久化:在程序重啓的時候,不丟失已有的數據;
-
任務重試:會有任務偶然失敗的場景,重試是最簡單的方式,但需要保證任務的執行時是冪等的;
-
任務延時:延遲執行,比如 5 分鐘後給用戶發紅包;
-
任務結果的臨時存儲,可用於儲存;
-
任務處理情況監控:及時發現任務執行出錯情況;
對於 Python 來說,有個大名鼎鼎的 Celery(https://github.com/celery/celery),它完全包含上面的功能。它包含兩個比較重要的組件:一個是消息隊列,比如 Redis/RabbitMQ 等,Celery 中叫做 Broker,然後還需要有數據庫,用於存儲任務狀態,叫做 Result Backend。
顯然對於 Go 也有很多不錯的開源庫,其中一個學 Celery 的是 Machinery(github.com/RichardKnop/machinery),它目前能滿足大部分需求,而且一直在積極維護,也是我們團隊目前在用的。
它目前支持的 Broker 有 AMQP(RabbitMQ)、Redis、AWS SQS、GCP Pub/Sub,目前對國內同行來說,RabbitMQ 或者 Redis 會相對比較合適。
另外它還支持幾個高級用法:
-
Groups:允許你定義多個並行的任務,在最後取任務結果的時候,可以一起返回;
-
Chords:允許你定義一個回調任務,在 Group 任務執行完畢後執行對應的回調任務;
-
Chains:允許你定義串行執行的任務,任務將會被串行執行;
說了優點,再說說它的缺點:
-
任務監控支持不夠,目前只有分佈式追蹤 opentracing 的支持,假如我要使用 prometheus,會比較困難,它的自定義錯誤處理過於簡單,連上下文都不給你;
-
傳入的參數目前只支持非常簡單的參數,不支持 struct、map,還得定義參數的類型,這樣的方式會將這個庫限制在 Golang 世界中,而無法拓展適用於其它語言;
P.S.
其實對於 Goroutine 的方案,在以下兩種情況下,可以考慮使用:
-
必須同步返回給用戶請求結果;
-
服務器資源足夠,僅僅用 Worker pool 就能降低請求的響應時長到可接受範圍;
這兩種方案都會返回請求結果,失敗的情況下靠客戶端重新請求來解決數據丟失的問題。
轉自:
blog.xizhibei.me/2019/07/15/asynchronous-task-queue-in-golang/
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ux4377znicI0kD_TmtXW0Q