使用 Asynq 實現 Go 異步任務處理
1. 介紹
Asynq 是一個 Go 庫,用於對任務進行排隊並與工作人員異步處理它們。
它的工作原理:
-
客戶端將任務放入隊列
-
服務器從隊列中拉出任務併爲每個任務啓動一個工作 goroutine
-
多個工作人員同時處理任務
倉庫鏈接:https://github.com/hibiken/asynq)
2. 快速開始
2.1 準備工作
確保已安裝並運行了 redis
redis-server
安裝 asynq 軟件包
go get -u github.com/hibiken/asynq
創建項目 asynq_task,目錄結構:
2.2 Redis 連接項
Asynq 使用 Redis 作爲消息代理。
client.go 和 main.go 都需要連接到 Redis 進行寫入和讀取。
我們將使用 RedisClientOpt 指定如何連接到本地 Redis 實例。
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "",
DB: 2,
}
2.3 Task 任務
type Task struct {
// 一個簡單的字符串值,表示要執行的任務的類型.
typename string
// 有效載荷保存執行任務所需的數據,有效負載值必須是可序列化的.
payload []byte
// 保存任務的選項.
opts []Option
// 任務的結果編寫器.
w *ResultWriter
}
2.4 編寫程序
test_delivery.go : 一個封裝任務創建和任務處理的包
package test_delivery
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
"log"
)
const (
TypeEmailDelivery = "email:deliver"
)
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
if err != nil {
fmt.Println(err)
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, payload), nil
}
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
//邏輯處理start...
log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
return nil
}
client.go: 在應用程序代碼中,導入上述包並用於 Client 將任務放入隊列中。
package client
import (
"asynq_task/test_delivery"
"github.com/hibiken/asynq"
"log"
)
func EmailDeliveryTaskAdd() {
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "",
DB: 2,
})
defer client.Close()
task, err := test_delivery.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
main.go: 異步任務服務入口文件
接下來,啓動一個工作服務器以在後臺處理這些任務。要啓動後臺工作人員,使用 Server 並提供您 Handler 來處理任務。可以選擇使用 ServeMux 來創建處理程序,就像使用 net/httpHandler 一樣。
package main
import (
"asynq_task/test_delivery"
"github.com/hibiken/asynq"
"log"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "",
DB: 2,
},
asynq.Config{
// 每個進程併發執行的worker數量
Concurrency: 5,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
// See the godoc for other configuration options
},
)
mux := asynq.NewServeMux()
mux.HandleFunc(test_delivery.TypeEmailDelivery, test_delivery.HandleEmailDeliveryTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
Asynq 是一個 Go 庫 (https://github.com/hibiken/asynq),用於對任務進行排隊並與工作人員異步處理它們。用來分發異步任務
package main
import (
"asynq_task/test_delivery/client"
"time"
)
func main() {
for i := 0; i < 3; i++ {
client.EmailDeliveryTaskAdd()
time.Sleep(time.Second * 3)
}
}
5. 運行查看結果
-
首先,我們要先把異步任務啓動起來準備好接收,也就是啓動 cmd/main.go
-
啓動 test.go 文件向異步任務服務添加任務隊列
結果如下:
go run main.go
3. 細節
3.1 關於 asynq 的優雅退出
如果異步服務突然被暫停,正在執行的異步任務會 push 到隊列中,下次啓動的時候自動執行。
我們可以將一個異步任務中途 sleep 幾秒,發送一個異步任務,任務沒執行完中途停掉任務測試出結果:
再次啓動異步任務服務,發現這個任務被重新執行。
3.2 client 中 client.Enqueue 的使用
1) 立即處理任務
client.Enqueue(t1, time.Now())
2)延時處理任務, 兩小時後處理
client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))
3) 任務重試,最大重試次數爲 25 次。
client.Enqueue(task, asynq.MaxRetry(5))
4)確保任務的唯一性
4-1:使用TaskID
選項:自行生成唯一的任務 ID
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// Second task will fail, err is ErrTaskIDConflict (assuming that the first task didn't get processed yet)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
4-2:使用Unique
選項:讓 Asynq 爲任務創建唯一性鎖
err := c.Enqueue(t1, asynq.Unique(time.Hour))
另外,asynq 異步任務提供了命令行工具和 Asynqmon 用於監控和管理 Asynq 異步任務和隊列。WebUI 可以通過傳遞兩個標誌來啓用與 Prometheus 的集成。
作者:sweey_lff
原文鏈接:https://huaweicloud.csdn.net/637ef508df016f70ae4ca586.html?
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/qCnjHo0uleLDr4Sukh77zw