基於 Redis 實現延時隊列服務

一、背景

在業務發展過程中,會出現一些需要延時處理的場景,比如:

  1. 訂單下單之後超過 30 分鐘用戶未支付,需要取消訂單

  2. 訂單一些評論,如果 48h 用戶未對商家評論,系統會自動產生一條默認評論

  3. 點我達訂單下單後,超過一定時間訂單未派出,需要超時取消訂單等。。

處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數據量不大的場景下是完全沒問題,但是當數據量大的時候高頻的輪訓數據庫就會比較的耗資源,導致數據庫的慢查或者查詢超時。所以在處理這類需求時候,採用了延時隊列來完成。

二、幾種延時隊列

延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:

1.Java 中 java.util.concurrent.DelayQueue 優點:JDK 自身實現,使用方便,量小適用
缺點:隊列消息處於 jvm 內存,不支持分佈式運行和消息持久化
2.Rocketmq 延時隊列
優點:消息持久化,分佈式
缺點:不支持任意時間精度,只支持特定 level 的延時消息
3.Rabbitmq 延時隊列(TTL+DLX 實現)
優點:消息持久化,分佈式
缺點:延時相同的消息必須扔在同一個隊列

根據自身業務和公司情況,如果實現一個自己的延時隊列服務需要考慮一下幾點:

三、 基於 Redis 實現

1.0 版本

每個延時消息必須包括以下參數:


注:上圖 1、2、3 或者 2、3 是一個事務操作
取出過期消息過程是通過一個外部定時任務每隔 1min 分鐘去查詢隊列中過期的消息,然後發送 mq && remove

2.0 版本

1.0 上有一個可改進的地方就是隊列中過期的消息是通過定時任務觸發查詢。所有有了 2.0
2.0 版本在 1.0 上做了一個優化,廢棄掉了 1min 定時任務觸發過期消息發送,採用了 java Lock await/singlal 方式實現過期消息的實時發送低延時

服務啓動會註冊 zk,獲取分配處理的 queues,啓動後臺線程監聽 zk 。
爲每個分配 queue 創建一個 pull job 。
pull job 首先會去 queue 中查詢是否有過期消息:
    Y:將取出消息交給 worker 處理
     N:查詢 queue 中最後一個成員(zset 結構默認按 score 遞增排序),如果爲空,則 await;不爲空則 await(成員 score-System.currentTimeMillis())

由於過期消息發送成功纔會從隊列中 remove,所以 pull job 會記錄上一次查詢隊列的一個 offset,每次獲取到過期消息會將 offset 向前偏移,過期消息交給 worker 處理,當 worker 由於某些異常原因處理失敗會重置 pull job 中 offset,這樣可以避免消息發送一次失敗之後沒辦法在繼續處理(除了新節點 add || remove 時候)。
當部署服務有新增,延時隊列服務會重新計算得到當前處理隊列,並將之前創建 pull job cancel,爲新處理隊列重新創建 pull job。刪除同理。

作者:Simple

來源:www.cnblogs.com/lylife/p/7881950.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2U94GPOYGZqA95zps4afFA