延時消息常見實現方案

1 前言

延時消息(定時消息)指的在分佈式異步消息場景下,生產端發送一條消息,希望在指定延時或者指定時間點被消費端消費到,而不是立刻被消費。

延時消息適用的業務場景非常的廣泛,在分佈式系統環境下,延時消息的功能一般會在下沉到中間件層,通常是 MQ 中內置這個功能或者內聚成一個公共基礎服務。

本文旨在探討常見延時消息的實現方案以及方案設計的優缺點。

2 實現方案

1. 基於外部存儲實現的方案

這裏討論的外部存儲指的是在 MQ 本身自帶的存儲以外又引入的其他的存儲系統。

基於外部存儲的方案本質上都是一個套路,將 MQ 和 延時模塊 區分開來,延時消息模塊是一個獨立的服務 / 進程。延時消息先保留到其他存儲介質中,然後在消息到期時再投遞到 MQ。當然還有一些細節性的設計,比如消息進入的延時消息模塊時已經到期則直接投遞這類的邏輯,這裏不展開討論。

下述方案不同的是,採用了不同的存儲系統。

基於 數據庫(如 MySQL)

基於關係型數據庫(如 MySQL)延時消息表的方式來實現。

CREATE TABLE `delay_msg` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `delivery_time` DATETIME NOT NULL COMMENT '投遞時間',
  `payloads` blob COMMENT '消息內容',
  PRIMARY KEY (`id`),
  KEY `time_index` (`delivery_time`)
)

通過定時線程定時掃描到期的消息,然後進行投遞。定時線程的掃描間隔理論上就是你延時消息的最小時間精度。

優點:

缺點:

基於 RocksDB

RocksDB 的方案其實就是在上述方案上選擇了比較合適的存儲介質。

RocksDB 在筆者之前的文章中有聊過,LSM 樹更適合大量寫入的場景。滴滴開源的 DDMQ 中的延時消息模塊 Chronos 就是採用了這個方案。

DDMQ 這個項目簡單來說就是在 RocketMQ 外面加了一層統一的代理層,在這個代理層就可以做一些功能維度的擴展。延時消息的邏輯就是代理層實現了對延時消息的轉發,如果是延時消息,會先投遞到 RocketMQ 中 Chronos 專用的 topic 中。延時消息模塊 Chronos 消費得到延時消息轉儲到 RocksDB,後面就是類似的邏輯了,定時掃描到期的消息,然後往 RocketMQ 中投遞。

這個方案老實說是一個比較重的方案。因爲基於 RocksDB 來實現的話,從數據可用性的角度考慮,你還需要自己去處理多副本的數據同步等邏輯。

優點:

缺點:

基於 Redis

再來聊聊 Redis 的方案。下面放一個比較完善的方案。

本方案來源於:基於 Redis 實現延時隊列服務

這個方案選用 Redis 存儲在我看來有幾點考慮。

但是這個方案其實也有需要斟酌的地方,上述方案通過創建多個 Delayed Queue 來滿足對於併發性能的要求,但這也帶來了多個 Delayed Queue 如何在多個節點情況下均勻分配,並且很可能出現到期消息併發重複處理的情況,是否要引入分佈式鎖之類的併發控制設計?

在量不大的場景下,上述方案的架構其實可以蛻化成主從架構,只允許主節點來處理任務,從節點只做容災備份。實現難度更低更可控。

定時線程檢查的缺陷與改進

上述幾個方案中,都通過線程定時掃描的方案來獲取到期的消息。

定時線程的方案在消息量較少的時候,會浪費資源,在消息量非常多的時候,又會出現因爲掃描間隔設置不合理導致延時時間不準確的問題。可以藉助 JDK Timer 類中的思想,通過 wait-notify 來節省 CPU 資源。

獲取中最近的延時消息,然後 wait(執行時間 - 當前時間),這樣就不需要浪費資源到達時間時會自動響應,如果有新的消息進入,並且比我們等待的消息還要小,那麼直接 notify 喚醒,重新獲取這個更小的消息,然後又 wait,如此循環。

2. 開源 MQ 中的實現方案

再來講講目前自帶延時消息功能的開源 MQ,它們是如何實現的

RocketMQ

RocketMQ 開源版本支持延時消息,但是隻支持 18 個 Level 的延時,並不支持任意時間。只不過這個 Level 在 RocketMQ 中可以自定義的,所幸來說對普通業務算是夠用的。默認值爲 “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 個 level。

通俗的講,設定了延時 Level 的消息會被暫存在名爲 SCHEDULE_TOPIC_XXXX 的 topic 中,並根據 level 存入特定的 queue,queueId = delayTimeLevel – 1,** 即一個 queue 只存相同延時的消息,保證具有相同發送延時的消息能夠順序消費。**broker 會調度地消費 SCHEDULE_TOPIC_XXXX,將消息寫入真實的 topic。

下面是整個實現方案的示意圖,紅色代表投遞延時消息,紫色代表定時調度到期的延時消息:

優點:

缺點:

Pulsar

Pulsar 支持 “任意時間” 的延時消息,但實現方式和 RocketMQ 不同。

通俗的講,Pulsar 的延時消息會直接進入到客戶端發送指定的 Topic 中,然後在堆外內存中創建一個基於時間的優先級隊列,來維護延時消息的索引信息。延時時間最短的會放在頭上,時間越長越靠後。在進行消費邏輯時候,再判斷是否有到期需要投遞的消息,如果有就從隊列裏面拿出,根據延時消息的索引查詢到對應的消息進行消費。

如果節點崩潰,在這個 broker 節點上的 Topics 會轉移到其他可用的 broker 上,上面提到的這個優先級隊列也會被重建。

下面是 Pulsar 公衆號中對於 Pulsar 延時消息的示意圖。

乍一看會覺得這個方案其實非常簡單,還能支持任意時間的消息。但是這個方案有幾個比較大的問題:

對於前面第一點和第二點的問題,社區也設計瞭解決方案,在隊列中加入時間分區,Broker 只加載當前較近的時間片的隊列到內存,其餘時間片分區持久化磁盤,示例圖如下圖所示:

但是目前,這個方案並沒有對應的實現版本。可以在實際使用時,規定只能使用較小時間跨度的延時消息,來減少前兩點缺陷的影響。

另外,因爲內存中存的並不是延時消息的全量數據,只是索引,所以可能要積壓上百萬條延時消息纔可能對內存造成顯著影響,從這個角度來看,官方暫時沒有完善前兩個問題也可以理解了。

至於第三個問題,估計是比較難解決的,需要在數據存儲層將延時消息和正常消息區分開來,單獨存儲延時消息。

QMQ

QMQ 提供任意時間的延時 / 定時消息,你可以指定消息在未來兩年內 (可配置) 任意時間內投遞。

把 QMQ 放到最後,是因爲我覺得 QMQ 是目前開源 MQ 中延時消息設計最合理的。裏面設計的核心簡單來說就是 多級時間輪 + 延時加載 + 延時消息單獨磁盤存儲。

如果對時間輪不熟悉的可以閱讀筆者的這篇文章 從 Kafka 看時間輪算法設計

QMQ 的延時 / 定時消息使用的是兩層 hash wheel 來實現的。

第一層位於磁盤上,每個小時爲一個刻度 (默認爲一個小時一個刻度,可以根據實際情況在配置裏進行調整),每個刻度會生成一個日誌文件 (schedule log),因爲 QMQ 支持兩年內的延時消息 (默認支持兩年內,可以進行配置修改),則最多會生成 2 * 366 * 24 = 17568 個文件 (如果需要支持的最大延時時間更短,則生成的文件更少)。

第二層在內存中,當消息的投遞時間即將到來的時候,會將這個小時的消息索引 (索引包括消息在 schedule log 中的 offset 和 size) 從磁盤文件加載到內存中的 hash wheel 上,內存中的 hash wheel 則是以 500ms 爲一個刻度。

總結一下設計上的亮點:

總結

本文彙總了目前業界常見的延時消息方案,並且討論了各個方案的優缺點。希望對讀者有所啓發。

作者:Richard_Yi

來源:juejin.cn/post/7052894117105238053

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/oozm501lYckuFEb7mDqE3Q