在 go 語言裏用 Redis 如何實現延時任務隊列,如何延時?
在 Go 語言中,使用 Redis 實現延時任務隊列可以通過 Sorted Set(ZSet) 結合 輪詢機制 或 Pub/Sub 實現。
以下是詳細實現方案,以及延時任務與定時任務的區別和應用場景分析。
一、延時任務隊列的實現(Go + Redis)
核心思路
-
存儲結構:使用 Redis 的 Sorted Set(ZSet),以任務的 執行時間戳 作爲分數(Score),任務內容作爲成員(Member)。
-
消費者邏輯:消費者定期掃描 ZSet 中到期的任務(分數小於當前時間戳),取出並執行。
-
原子性操作:通過
ZRANGEBYSCORE和ZREM的原子操作(Lua 腳本)保證任務只被消費一次。
代碼實現(Go + go-redis)
1. 生產者(添加延時任務)
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
var rdb *redis.Client
func init() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
}
// AddDelayTask 添加延時任務
func AddDelayTask(ctx context.Context, taskID string, delay time.Duration) error {
// 計算執行時間戳(當前時間 + 延時時間)
executeTime := time.Now().Add(delay).Unix()
// 添加到 ZSet
return rdb.ZAdd(ctx, "delay_tasks", &redis.Z{
Score: float64(executeTime),
Member: taskID,
}).Err()
}
func main() {
ctx := context.Background()
// 添加一個 10 秒後執行的任務
err := AddDelayTask(ctx, "task_123", 10*time.Second)
if err != nil {
fmt.Println("添加任務失敗:", err)
}
}
2. 消費者(處理到期任務)
// 消費者邏輯
func StartConsumer(ctx context.Context) {
for {
// 獲取當前時間戳
now := time.Now().Unix()
// 查詢所有到期的任務(分數 <= 當前時間)
tasks, err := rdb.ZRangeByScore(ctx, "delay_tasks", &redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%d", now),
Offset: 0,
Count: 10, // 一次最多取10個任務
}).Result()
if err != nil {
fmt.Println("查詢任務失敗:", err)
continue
}
if len(tasks) == 0 {
// 無任務,休眠1秒
time.Sleep(1 * time.Second)
continue
}
// 使用 Lua 腳本原子化移除任務並返回任務列表
luaScript := `
local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1])
if #tasks > 0 then
redis.call('ZREM', KEYS[1], unpack(tasks))
end
return tasks
`
keys := []string{"delay_tasks"}
vals := []interface{}{now}
res, err := rdb.Eval(ctx, luaScript, keys, vals).Result()
if err != nil {
fmt.Println("執行Lua腳本失敗:", err)
continue
}
// 處理任務
if res != nil {
taskIDs := res.([]interface{})
for _, taskID := range taskIDs {
fmt.Printf("執行任務: %s\n", taskID)
// TODO: 執行具體任務邏輯
}
}
}
}
func main() {
ctx := context.Background()
go StartConsumer(ctx)
select {} // 阻塞主協程
}
關鍵優化點
-
Lua 腳本原子操作:通過 Lua 腳本保證查詢和移除任務的原子性,避免多個消費者重複消費。
-
批量處理:每次最多取 10 個任務,減少 Redis 交互次數。
-
休眠機制:無任務時休眠,避免高頻輪詢浪費資源。
二、延時任務 vs. 定時任務
1. 延時任務
-
定義:任務在指定 延遲時間後執行一次(如 30 分鐘後關閉未支付訂單)。
-
實現:通過 Redis ZSet 或消息隊列(如 RabbitMQ 的死信隊列)。
-
場景:
-
訂單超時關閉。
-
異步任務重試(失敗後延時重試)。
-
消息延遲推送(如用戶註冊後 5 分鐘發送提醒郵件)。
2. 定時任務
-
定義:任務在 固定時間點週期性執行(如每天凌晨統計報表)。
-
實現:通過定時任務框架(如
cron、go-scheduler)或時間輪算法。 -
場景:
-
每日數據統計。
-
定時清理日誌。
-
週期性的緩存刷新。
對比總結
三、業務場景選擇
1. 何時用延時任務?
-
單次延遲觸發:任務只需在未來的某個時間點執行一次。
-
動態時間調整:任務的執行時間可能動態變化(如用戶續期訂單後重置超時時間)。
-
異步解耦:避免阻塞主流程,將耗時操作延後處理。
2. 何時用定時任務?
-
週期性執行:任務需要按固定規則重複執行(如每小時、每天)。
-
無狀態觸發:任務的執行不依賴外部事件,僅依賴時間規則。
-
批處理操作:如批量生成報表、批量清理數據。
四、擴展方案
1. 高可用優化
-
多消費者負載均衡:啓動多個消費者協程,避免單點故障。
-
任務持久化:Redis 開啓 AOF 持久化,防止宕機丟失任務。
2. 高級方案
-
Redis Stream:利用 Redis 5.0+ 的 Stream 結構實現更可靠的消息隊列。
-
分佈式鎖:使用
SETNX或RedLock確保任務處理的冪等性。
3. 結合 Go 特性
-
協程池:使用
ants等庫管理協程池,控制併發量。 -
Context 超時:爲任務執行添加超時控制,避免長時間阻塞。
五、總結
-
延時任務隊列:適合單次延遲觸發的場景,通過 Redis ZSet + Lua 腳本實現高效可靠的任務管理。
-
定時任務:適合週期性任務,直接使用
cron或調度框架更簡單。 -
選型建議:根據觸發規則(單次 vs 週期)和業務複雜度選擇方案。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/ie5b0v5maLVx7FI2-22MDQ