如何設計一個合適的延時隊列

項目背景

延遲隊列,它是一種帶有延遲功能的消息隊列,目前工作中有幾處需延時處理的應用場景。

可選技術參考

kafka

考慮前提:由於項目代碼與業務方交互大多采用 kafka,所以想是否能自己集成一個 kafka 延遲隊列,直接提供延遲功能,更方便使用。

大致思路:借鑑 rocketMQ 延遲隊列設計思想,創建多個 topic 用於處理不同的延遲消息,例如延遲一分鐘的任務消息,讓 topic 爲 delay-minutes-1 進行處理。

流程圖:

解決問題:如何讓延遲消息等待一段時間才發送到真正的 topic 裏面?

:KafkaConsumer 提供了暫停和恢復的 API 函數,當消費者發現不滿足消費時間條件時,可以先暫停消費者,並把消費偏移量移動到上次位置,進行等待下次消費。

缺點:kafka 內部改造複雜度較高,由於要使 consumer 進行 pause,還需要額外的做一些健康檢查操作,在狀態不對時可以報警或者重啓。另外,不支持靈活設置延時時間。

rocketMQ

考慮前提:底層代碼已經全部封裝好,直接使用,不用關心底層代碼,可以實現與業務進行解藕。

大致原理思路:

流程圖:

缺點:

redis

考慮前提: Redisson 延時隊列,代碼 redis 已經封裝好,可以直接拿來用。redisson.getBlockingQueue() 和 Redission.getDelayQueue()

大致原理思路: https://zhuanlan.zhihu.com/p/343811173

三個核心集合結構:

延時隊列:數據入隊的隊列

目標 blocking 隊列 :到期數據待 consume

timeoutSet 過期時間 zset:分數值爲 timeout,輔佐判斷元素是否過期。 

實現 Timer :

運用了 redis 的 sub/pub 功能,當有數據 put 的時候,先把它放到一個 zset 集合,同時發佈訂閱的 key,發佈內容爲數據到期的 timeout,此時客戶端開啓了一個延時任務(HashedWheelTimer),到了時間,從 zset 分頁取出到期了的數據,放入 blocking 隊列中。

缺點:

基於 Redisson 方案進行改造思路

有讚的延時隊列

https://tech.youzan.com/queuing_delay/

實現邏輯圖

各個組件含義:

job:需要異步處理的任務,是最基本單元,其中屬性包含,自定義唯一 jobid,topic 任務類型,delayTime 任務執行時間,ttrtime 執行超時時間,message 具體消息內容。 

job pool :用來存放 Job 的原信息,是個 map 結構

Delay Bucket :一組以時間爲維度的有序隊列(這裏只存放 job Id),bucket 的數據結構就是 redis 的 zset,將其分爲多個 bucket 是爲了提高掃描速度,降低消息延遲

Timer: 實時掃描各個 Bucket,並將 delay 時間小於等於當前時間的 job 放入到對應的 Ready Queue。

自己實現中,此處的 Ready Queue 替換一個共同的 kafka topic 出口:存放處於 Ready 狀態的 Job,以供客戶端消費程序消費。timer 到時間直接發送到 kafka

對比 Redisson 改動點

個人改動點

整體執行流程:

微服務延時隊列整體架構圖

例子:

kafka 共同入口 delay_entrance_topic 格式:

vOkcIq

kafka 共同出口 delay_exit_topic 格式:

4f2ZGY

擴展點

可能產生的問題

消息持久化問題:*於 Redis 自身的持久化特性,如果 Redis 數據丟失,意味着延遲消息的丟失,不過可以做主備和集羣保證。這個可以考慮後續優化將消息持久化到 MangoDB 中。

其他延時隊列思路

Netty 時間輪

HashedWheelTimer 流程圖

tickDuration: 每個格子的時間大小,每次轉動的時間

ticksPerWheel:時間輪數組大小

HashedWheelBucket: 數組,記錄 header,tail

HashedWheelTimeOut: 延時任務載體,放於 Bucket 數組中,屬性有:前後指針,round 數等

如果把時間輪看作一個 map,那麼 tickPerWheel 就爲 map 的 size,時間輪開始的時候,會設置一個 startTime,即每 ticket 都可算出延時時間,也就是 map 的 key,value 爲 bucket。

核心代碼,線程 for 循環,校驗 此刻的 bucket 的鏈表是否到了執行時間,到了就立即執行,且 ticket+1,往下走。沒有則會 sleep 一會兒。

            long deadline = tickDuration * (tick + 1);
            for (;;) {
                // 相對時間
                final long currentTime = System.nanoTime() - startTime;
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
                // <=0 說明可以撥動時鐘了
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }
                // 這裏是爲了兼容 Windows 平臺
                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                }
                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

kafka 時間輪

在普通時間輪的基礎上,以空間換時間的思路,用 DelayQueue 去存儲每個 Bucket,DelayQueue 內部有個 PriorityQueue,以每個 bucket 的延時時間進行大小排序,隊首的 bucket 就爲將要執行的任務,如果到期了,則可以直接取出執行,未到則阻塞。依次循環取空優先隊列。

其中的比對時間到期,交給底層 api 去做,Condition.awaitNanos() -> parkNanos() 核心代碼:

private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
/*
 * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,
 * waits up to timeoutMs before giving up.
 */
def advanceClock(timeoutMs: Long): Boolean = {
  var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  if (bucket != null) {
    writeLock.lock()
    try {
      while (bucket != null) {
        //驅動時間輪
        timingWheel.advanceClock(bucket.getExpiration())
       //循環buckek也就是任務列表,任務列表一個個繼續添加進時間輪以此來升級或者降級時間輪,把過期任務找出來執行
        bucket.flush(reinsert)
        //這裏就是從延遲隊列取出bucket,bucket是有延遲時間的,取出代表該bucket過期,通過bucket能取到bucket包含的任務列表
        bucket = delayQueue.poll()
      }
    } finally {
      writeLock.unlock()
    }
    true
  } else {
    false
  }
}

小問題

對於時間計算方面的問題,底層系統提供的 api 爲什麼效率更低呢?

它應該也是循環檢查到期時間,看到有的同學說,更推薦使用底層 api,原理是一樣的,它爲什麼就比放在外面要好些呢?如果有知道的同學,也可在評論區告訴作者,感恩!

XXL_JOB

主要有兩個線程:scheduleThread 負責把 5s 之後要執行的任務,從 db 中掃出來,放到 時間輪 容器中。

ringThread 負責把時針指向的每個到期的任務鏈表,交由快慢線程,rpc 調用指給調度器執行。

分佈式任務調度,多個執行器。任務持久化,任務統一先入庫,延時也是用的傳統時間輪。

總 結

兩個非常核心的問題:

PZIjA3

 所以,如果想自己設計一個延時隊列,關鍵是確定這兩個核心問題怎麼解決,其餘的根據自己的業務場景進行調整吧。

巨人肩膀

作者:九尾 1997

來源:juejin.cn/post/7040312801453146142

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hqPy-SOo2qh0WOJOSilIbA