基於 Redis 實現延時隊列服務
一、背景
在業務發展過程中,會出現一些需要延時處理的場景,比如:
-
訂單下單之後超過 30 分鐘用戶未支付,需要取消訂單
-
訂單一些評論,如果 48h 用戶未對商家評論,系統會自動產生一條默認評論
-
點我達訂單下單後,超過一定時間訂單未派出,需要超時取消訂單等。。
處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數據量不大的場景下是完全沒問題,但是當數據量大的時候高頻的輪訓數據庫就會比較的耗資源,導致數據庫的慢查或者查詢超時。所以在處理這類需求時候,採用了延時隊列來完成。
二、幾種延時隊列
延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:
1.Java 中 java.util.concurrent.DelayQueue
優點:JDK 自身實現,使用方便,量小適用
缺點:隊列消息處於 jvm 內存,不支持分佈式運行和消息持久化
2.Rocketmq 延時隊列
優點:消息持久化,分佈式
缺點:不支持任意時間精度,只支持特定 level 的延時消息
3.Rabbitmq 延時隊列(TTL+DLX 實現)
優點:消息持久化,分佈式
缺點:延時相同的消息必須扔在同一個隊列
根據自身業務和公司情況,如果實現一個自己的延時隊列服務需要考慮一下幾點:
- 消息存儲
- 過期延時消息實時獲取
- 高可用性
三、 基於 Redis 實現
1.0 版本
-
功能特性
-
消息可靠性,消息持久化,消息至少被消費一次
-
實時性:存在一定的時間誤差(定時任務間隔)
-
支持指定消息 remove
-
高可用性
-
整體結構
- Messages Pool 所有的延時消息存放,結構爲 KV 結構,key 爲消息 ID,value 爲一個具體的 message(這裏選擇 Redis Hash 結構主要是因爲 hash 結構能存儲較大的數據量,數據較多時候會進行漸進式 rehash 擴容,並且對於 HSET 和 HGET 命令來說時間複雜度都是 O(1))
- Delayed Queue 是 16 個有序隊列(隊列支持水平擴展),結構爲 ZSET,value 爲 messages pool 中消息 ID,score 爲過期時間(分爲多個隊列是爲了提高掃描的速度)
- Timed Task 定時任務,負責掃描處理每個隊列過期消息
-
消息結構
每個延時消息必須包括以下參數:
-
tags:消息過期之後發送 mq 的 tags
-
keys:消息過期之後發送 mq 的 keys
-
body:消息過期之後發送 mq 的 body,提供給消費這做具體的消息處理
-
delayTime:延時發送時間(默認,delayTime、expectDate 有一個即可)
-
expectDate:期望發送時間
-
流程
注:上圖 1、2、3 或者 2、3 是一個事務操作
取出過期消息過程是通過一個外部定時任務每隔 1min 分鐘去查詢隊列中過期的消息,然後發送 mq && remove
2.0 版本
1.0 上有一個可改進的地方就是隊列中過期的消息是通過定時任務觸發查詢。所有有了 2.0
2.0 版本在 1.0 上做了一個優化,廢棄掉了 1min 定時任務觸發過期消息發送,採用了 java Lock await/singlal 方式實現過期消息的實時發送低延時
-
多節點部署結構:
- pull job:這裏分別爲每一個隊列創建了一個 pull job thread,功能很簡單,就是負責去隊列中拉取過期的消息數據(這裏保證一個隊列有且只有一個 pull job)
- worker:pull job 拉取到的過期消息會交給一個 worker thread 去處理,這樣的好處是處理過期的消息實時性更高(pull job 不必等去除過期消息全部處理完成在繼續去拉取新的過期數據)
- zookeeper coordinate:通過 zk 的操作來完成對隊列的重新分配工作,daemon thread 監聽 zk 節點的創建和刪除
- 主要流程:
服務啓動會註冊 zk,獲取分配處理的 queues,啓動後臺線程監聽 zk 。
爲每個分配 queue 創建一個 pull job 。
pull job 首先會去 queue 中查詢是否有過期消息:
Y:將取出消息交給 worker 處理
N:查詢 queue 中最後一個成員(zset 結構默認按 score 遞增排序),如果爲空,則 await;不爲空則 await(成員 score-System.currentTimeMillis())
由於過期消息發送成功纔會從隊列中 remove,所以 pull job 會記錄上一次查詢隊列的一個 offset,每次獲取到過期消息會將 offset 向前偏移,過期消息交給 worker 處理,當 worker 由於某些異常原因處理失敗會重置 pull job 中 offset,這樣可以避免消息發送一次失敗之後沒辦法在繼續處理(除了新節點 add || remove 時候)。
當部署服務有新增,延時隊列服務會重新計算得到當前處理隊列,並將之前創建 pull job cancel,爲新處理隊列重新創建 pull job。刪除同理。
作者:Simple
來源:www.cnblogs.com/lylife/p/7881950.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/2U94GPOYGZqA95zps4afFA