如何設計一個合適的延時隊列
項目背景
延遲隊列,它是一種帶有延遲功能的消息隊列,目前工作中有幾處需延時處理的應用場景。
可選技術參考
kafka
考慮前提:由於項目代碼與業務方交互大多采用 kafka,所以想是否能自己集成一個 kafka 延遲隊列,直接提供延遲功能,更方便使用。
大致思路:借鑑 rocketMQ 延遲隊列設計思想,創建多個 topic 用於處理不同的延遲消息,例如延遲一分鐘的任務消息,讓 topic 爲 delay-minutes-1 進行處理。
-
發送延遲消息時不直接發到目標 topic,而是發到一個用於處理延遲消息的 topic,例如 delay-minutes-1
-
寫一段代碼定時拉取 delay-minutes-1 中的消息,將滿足的消息發到真正的目標主題裏。
流程圖:
解決問題:如何讓延遲消息等待一段時間才發送到真正的 topic 裏面?
答:KafkaConsumer 提供了暫停和恢復的 API 函數,當消費者發現不滿足消費時間條件時,可以先暫停消費者,並把消費偏移量移動到上次位置,進行等待下次消費。
缺點:kafka 內部改造複雜度較高,由於要使 consumer 進行 pause,還需要額外的做一些健康檢查操作,在狀態不對時可以報警或者重啓。另外,不支持靈活設置延時時間。
rocketMQ
考慮前提:底層代碼已經全部封裝好,直接使用,不用關心底層代碼,可以實現與業務進行解藕。
大致原理思路:
-
RocketMQ 將延時隊列的延時時間分爲 18 個級別,在發送 MQ 消息的時候只需要設置 delayLevel,把每種延遲時間段的消息放到同一個隊列中
-
通過一個定時器進行輪詢這些隊列,查看消息是否到期
流程圖:
缺點:
-
使用中間件,儘可能的需要熟讀底層源碼,以便後續出現問題,快速跟蹤定位。還有能找到適合的擴展點。
-
定時器採用的 timer 是單線程運行,如果延遲消息數量很大的話,可能造成消息到期也沒有發送出去的情況。
redis
考慮前提: Redisson 延時隊列,代碼 redis 已經封裝好,可以直接拿來用。redisson.getBlockingQueue() 和 Redission.getDelayQueue()
大致原理思路: https://zhuanlan.zhihu.com/p/343811173
三個核心集合結構:
延時隊列:數據入隊的隊列
目標 blocking 隊列 :到期數據待 consume
timeoutSet 過期時間 zset:分數值爲 timeout,輔佐判斷元素是否過期。
實現 Timer :
運用了 redis 的 sub/pub 功能,當有數據 put 的時候,先把它放到一個 zset 集合,同時發佈訂閱的 key,發佈內容爲數據到期的 timeout,此時客戶端開啓了一個延時任務(HashedWheelTimer),到了時間,從 zset 分頁取出到期了的數據,放入 blocking 隊列中。
缺點:
-
採用 sub/pub 機制的時候,可能會造成多個客戶端同時開啓一個時間段的延時任務,重複執行,也會有併發的安全問題,因爲涉及的要數據加入阻塞隊列,和將當前數據從 zset 移除操作。
-
默認是數據量小的時候比較穩定,數據量一大就需要構建 cluster 模式,這一塊需要自己開發
基於 Redisson 方案進行改造思路
有讚的延時隊列
https://tech.youzan.com/queuing_delay/
實現邏輯圖
各個組件含義:
job:需要異步處理的任務,是最基本單元,其中屬性包含,自定義唯一 jobid,topic 任務類型,delayTime 任務執行時間,ttrtime 執行超時時間,message 具體消息內容。
job pool :用來存放 Job 的原信息,是個 map 結構
Delay Bucket :一組以時間爲維度的有序隊列(這裏只存放 job Id),bucket 的數據結構就是 redis 的 zset,將其分爲多個 bucket 是爲了提高掃描速度,降低消息延遲
Timer: 實時掃描各個 Bucket,並將 delay 時間小於等於當前時間的 job 放入到對應的 Ready Queue。
自己實現中,此處的 Ready Queue 替換一個共同的 kafka topic 出口:存放處於 Ready 狀態的 Job,以供客戶端消費程序消費。timer 到時間直接發送到 kafka
對比 Redisson 改動點
-
去除原有 redisson 延時隊列 sub/pub 實現 timer 思路,採用輪詢 zset 頭部節點,判斷是否已到過期時間進行判斷。
-
加入線程池概念,加快消息處理,減少延時消息時間誤差。
-
cluster 模式,可用 redis 的 setnx 命令實現簡單的分佈式鎖,以保證集羣中每次只有一個 timer thread 執行。
個人改動點
- 做成通用性服務,提供統一的 push topic,和統一的 pull topic
整體執行流程:
-
各個業務方把任務發給入口 topic,生成延遲任務,放入某個桶
-
定時器時刻輪詢各個桶,當時間到達,發送消息任務到 Kafka
-
消費端可以從 Kafka 共同出口中取到任務,做相應的業務邏輯
-
出口 topic 接收到消息,Kafka 確認應答一次,保證消息不丟失
微服務延時隊列整體架構圖
例子:
kafka 共同入口 delay_entrance_topic 格式:
kafka 共同出口 delay_exit_topic 格式:
擴展點
-
減少延時時間誤差,使用線程池加快輪訓判斷時間到期
-
cluster 模式,防止其中一臺服務器掛了無法使用,高可用設計,使用定時器維護路由
-
cluseter 模式中,timer 代碼邏輯需要設置分佈式鎖,防止多臺服務器同時執行
-
消息可靠性:保證至少被消費一次,消費不成功,未應答,會重新投遞一次。
可能產生的問題
消息持久化問題:*於 Redis 自身的持久化特性,如果 Redis 數據丟失,意味着延遲消息的丟失,不過可以做主備和集羣保證。這個可以考慮後續優化將消息持久化到 MangoDB 中。
其他延時隊列思路
Netty 時間輪
HashedWheelTimer 流程圖
tickDuration: 每個格子的時間大小,每次轉動的時間
ticksPerWheel:時間輪數組大小
HashedWheelBucket: 數組,記錄 header,tail
HashedWheelTimeOut: 延時任務載體,放於 Bucket 數組中,屬性有:前後指針,round 數等
如果把時間輪看作一個 map,那麼 tickPerWheel 就爲 map 的 size,時間輪開始的時候,會設置一個 startTime,即每 ticket 都可算出延時時間,也就是 map 的 key,value 爲 bucket。
核心代碼,線程 for 循環,校驗 此刻的 bucket 的鏈表是否到了執行時間,到了就立即執行,且 ticket+1,往下走。沒有則會 sleep 一會兒。
long deadline = tickDuration * (tick + 1);
for (;;) {
// 相對時間
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// <=0 說明可以撥動時鐘了
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// 這裏是爲了兼容 Windows 平臺
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
kafka 時間輪
在普通時間輪的基礎上,以空間換時間的思路,用 DelayQueue 去存儲每個 Bucket,DelayQueue 內部有個 PriorityQueue,以每個 bucket 的延時時間進行大小排序,隊首的 bucket 就爲將要執行的任務,如果到期了,則可以直接取出執行,未到則阻塞。依次循環取空優先隊列。
其中的比對時間到期,交給底層 api 去做,Condition.awaitNanos() -> parkNanos() 核心代碼:
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
/*
* Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
* waits up to timeoutMs before giving up.
*/
def advanceClock(timeoutMs: Long): Boolean = {
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
//驅動時間輪
timingWheel.advanceClock(bucket.getExpiration())
//循環buckek也就是任務列表,任務列表一個個繼續添加進時間輪以此來升級或者降級時間輪,把過期任務找出來執行
bucket.flush(reinsert)
//這裏就是從延遲隊列取出bucket,bucket是有延遲時間的,取出代表該bucket過期,通過bucket能取到bucket包含的任務列表
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}
小問題
對於時間計算方面的問題,底層系統提供的 api 爲什麼效率更低呢?
它應該也是循環檢查到期時間,看到有的同學說,更推薦使用底層 api,原理是一樣的,它爲什麼就比放在外面要好些呢?如果有知道的同學,也可在評論區告訴作者,感恩!
XXL_JOB
主要有兩個線程:scheduleThread 負責把 5s 之後要執行的任務,從 db 中掃出來,放到 時間輪 容器中。
ringThread 負責把時針指向的每個到期的任務鏈表,交由快慢線程,rpc 調用指給調度器執行。
分佈式任務調度,多個執行器。任務持久化,任務統一先入庫,延時也是用的傳統時間輪。
總 結
兩個非常核心的問題:
-
一定先給所有的延時任務排序
-
比對時間問題,到了任務執行時間取出來
所以,如果想自己設計一個延時隊列,關鍵是確定這兩個核心問題怎麼解決,其餘的根據自己的業務場景進行調整吧。
巨人肩膀
-
https://juejin.cn/post/6845166891225317384
-
https://juejin.cn/post/6910068006244581390
-
https://juejin.cn/post/6976412313981026318
作者:九尾 1997
來源:juejin.cn/post/7040312801453146142
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/hqPy-SOo2qh0WOJOSilIbA