在 go 語言裏用 Redis 如何實現延時任務隊列,如何延時?

在 Go 語言中,使用 Redis 實現延時任務隊列可以通過 Sorted Set(ZSet) 結合 輪詢機制Pub/Sub 實現。

以下是詳細實現方案,以及延時任務與定時任務的區別和應用場景分析。

一、延時任務隊列的實現(Go + Redis)

核心思路

  1. 存儲結構:使用 Redis 的 Sorted Set(ZSet),以任務的 執行時間戳 作爲分數(Score),任務內容作爲成員(Member)。

  2. 消費者邏輯:消費者定期掃描 ZSet 中到期的任務(分數小於當前時間戳),取出並執行。

  3. 原子性操作:通過 ZRANGEBYSCOREZREM 的原子操作(Lua 腳本)保證任務只被消費一次。

代碼實現(Go + go-redis)

1. 生產者(添加延時任務)
package main



import (

	"context"

	"fmt"

	"time"

	"github.com/go-redis/redis/v8"

)



var rdb *redis.Client



func init() {

	rdb = redis.NewClient(&redis.Options{

		Addr:     "localhost:6379",

		Password: "",

		DB:       0,

	})

}



// AddDelayTask 添加延時任務

func AddDelayTask(ctx context.Context, taskID string, delay time.Duration) error {

	// 計算執行時間戳(當前時間 + 延時時間)

	executeTime := time.Now().Add(delay).Unix()

	// 添加到 ZSet

	return rdb.ZAdd(ctx, "delay_tasks", &redis.Z{

		Score:  float64(executeTime),

		Member: taskID,

	}).Err()

}



func main() {

	ctx := context.Background()

	// 添加一個 10 秒後執行的任務

	err := AddDelayTask(ctx, "task_123", 10*time.Second)

	if err != nil {

		fmt.Println("添加任務失敗:", err)

	}

}
2. 消費者(處理到期任務)
// 消費者邏輯

func StartConsumer(ctx context.Context) {

	for {

		// 獲取當前時間戳

		now := time.Now().Unix()

		// 查詢所有到期的任務(分數 <= 當前時間)

		tasks, err := rdb.ZRangeByScore(ctx, "delay_tasks", &redis.ZRangeBy{

			Min:    "0",

			Max:    fmt.Sprintf("%d", now),

			Offset: 0,

			Count:  10, // 一次最多取10個任務

		}).Result()



		if err != nil {

			fmt.Println("查詢任務失敗:", err)

			continue

		}



		if len(tasks) == 0 {

			// 無任務,休眠1秒

			time.Sleep(1 * time.Second)

			continue

		}



		// 使用 Lua 腳本原子化移除任務並返回任務列表

		luaScript := `

			local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1])

			if #tasks > 0 then

				redis.call('ZREM', KEYS[1], unpack(tasks))

			end

			return tasks

		`

		keys := []string{"delay_tasks"}

		vals := []interface{}{now}



		res, err := rdb.Eval(ctx, luaScript, keys, vals).Result()

		if err != nil {

			fmt.Println("執行Lua腳本失敗:", err)

			continue

		}



		// 處理任務

		if res != nil {

			taskIDs := res.([]interface{})

			for _, taskID := range taskIDs {

				fmt.Printf("執行任務: %s\n", taskID)

				// TODO: 執行具體任務邏輯

			}

		}

	}

}



func main() {

	ctx := context.Background()

	go StartConsumer(ctx)

	select {} // 阻塞主協程

}

關鍵優化點

二、延時任務 vs. 定時任務

1. 延時任務

2. 定時任務

對比總結

三、業務場景選擇

1. 何時用延時任務?

2. 何時用定時任務?

四、擴展方案

1. 高可用優化

2. 高級方案

3. 結合 Go 特性

五、總結

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ie5b0v5maLVx7FI2-22MDQ