基於 Redis 實現特殊的消息隊列

說到消息隊列,首先映入腦海的就是 RocketMQ、Kafka 等,消息隊列在各個領域都發揮了很大的作用。但是,在一些場景下,傳統的消息隊列無法滿足需求,比如以下場景:

本文將介紹一種基於 Redis 實現的消息隊列(Redis message queue, RMQ),RMQ 可以作爲傳統消息隊列的互補選擇,在傳統消息隊列沒有涉及的場景中使用 RMQ。

功能介紹

RMQ 設計爲一個二方庫,可以幫助用戶基於 Redis 快速實現消息隊列的功能,RMQ 消息隊列具有消息合併、區分優先級、支持定時消息等特性。RMQ 消息隊列可以用於異步解耦、削峯填谷,支持億級數據堆積。RMQ 消息隊列目前支持三種類型的消息,分別是 RangeMergeMessage(區間重複合併消息)、PriorityMessage(優先級消息)、FixedTimeMessage(任意定時消息)。

▐****區間重複合併消息

RangeMergeMessage 支持區間重複消息合併,發送消息時需要設置時間區間,消息延遲該時間區間長度後被消費,在該時間區間內如果發送重複的消息,重複消息將會被合併。如果消息在 Redis 服務端發生堆積,重複到來的消息依然會被合併處理。

該類型消息適用於消息重複率較高且希望重複消息合併處理的場景,對重複消息進行合併可以減少下游消費系統的壓力,減少不必要的資源消耗,將有限的資源最大化的利用,提升消費效率。

▐****優先級消息

PriorityMessage 支持給消息設置任意等級的優先級,優先級高的消息會被優先消費,相同優先級的消息被隨機消費。如果消息在 Redis 服務端發生堆積,重複的消息將被合併處理,合併後消息的優先級等於最後存儲的消息的優先級。

該類型消息適用於希望重複消息合併處理且需要設置優先級的場景,下游消費者資源有限時,合併重複消息且優先處理優先級高的消息將可以合理利用有限的資源。

▐****任意定時消息

FixedTimeMessage 支持給消息設置任意消費時間,只有消費時間到了之後消息才被消費,消費時間可精確到秒。消息到期後沒有及時被消費時,消費者將按照時間由遠及近進行消費。如果消息在 Redis 服務端發生堆積,重複的消息將被合併處理,合併後消息的消費時間等於最後存儲的消息的消費時間。

該類型消息適用於希望重複消息合併處理且需要定時消費的場景,定時消息應用場景非常豐富,比如定時打標去標、活動結束後清理動作、訂單超時關閉等。

▐****併發消費控制

使用傳統消息中間件進行集羣消費的時候,爲了避免併發處理同一元數據導致不一致問題,通常需要對元數據加分佈式鎖,頻繁的鎖衝突會導致消費效率低下。加分佈式鎖的最終目的其實就是保障屬於同一元數據的消息被串行消費。加分佈式鎖並不是最好的方案,最好的方案應該是從根上解決併發問題,讓屬於同一元數據的消息串行消費。

RMQ 消息隊列具有併發消費控制能力,屬於同一元數據的消息只會被分配給全局唯一一個線程進行消費,因此屬於同一元數據的消息將被串行消費。使用方如果需要該能力,除了需要提供 Redis,還需要提供 ZooKeeper。

▐****重試次數控制

RMQ 消息隊列支持失敗重試消費 16 次,業務返回消費失敗後,消息會被回滾並等待重試消費,重試 16 次後消息進入死信隊列,消息不再被消費,除非人工干預。

技術原理

▐****總體框架

RMQ 消息隊列由三部分組成,分別爲 ZooKeeper、RMQ 二方庫、Redis。ZooKeeper 負責維護集羣 worker 的信息,將 topic 的所有 slot 分配給全局的 woker。Redis 負責存儲消息,採用 Sorted Set 結構存儲,Store Queue 是消息存放的隊列,Prepare Queue 是採用二階段消費方式正在消費的消息存放隊列,Dead Queue 是死信隊列。RMQ 二方庫由 RmqClient、Consumer、Producer 三部分組成。RmqClient 負責 RMQ 的啓動工作,包括上報 TopicDef、Worker 給 ZooKeeper,分配 Slot 給 Worker,掃描業務定義的 MessageListener Bean。Producer 負責根據不用消息類型將消息按照指定的方式存儲到 Redis。Consumer 負責根據不用消息類型按照指定方式從 Redis 彈出消息並調用業務的 MessageListener。

▐****消息存儲

Topic 的定義有三部分組成,topic 表示主題名稱,slotAmount 表示消息存儲劃分的槽數量,topicType 表示消息的類型。主題名稱是一個 Topic 的唯一標示,相同主題名稱 Topic 的 slotAmount 和 topicType 一定是一樣的。

消息存儲採用 Redis 的 Sorted Set 結構,爲了支持大量消息的堆積,需要把消息分散存儲到很多個槽中,slotAmount 表示該 Topic 消息存儲共使用的槽數量,槽數量一定需要是 2 的 n 次冪。在消息存儲的時候,採用對指定數據或者消息體哈希求餘得到槽位置。

上圖中 topic 劃分了 8 個槽位,編號 0-7。如果發送方指定了消息的 slotBasis,則計算 slotBasis 的 CRC32 值,CRC32 值對槽數量進行取模得到槽序號,SlotKey 設計爲 #{topic}_#{index}(也即 Redis 的鍵),其中 #{} 表示佔位符。

發送方需要保證相同內容的消息的 slotBasis 相同,如果沒有指定 slotBasis 則採用消息內容計算 SlotKey,這樣內容相同的消息體就會落在同一個 Sorted Set 裏面,所以內容相同的消息會進行合併。

Redis 的 Sorted Set 中的數據按照分數排序,實現不同類型的消息的關鍵就在於如何利用分數、如何添加消息到 Sorted Set、如何從 Sorted Set 中彈出消息。優先級消息將優先級作爲分數,消費時每次彈出分數最大的消息。任意定時消息將時間戳作爲分數,消費時每次彈出分數大於當前時間戳的一個消息。

區間重複合併消息將時間戳作爲分數,添加消息時將(當前時間戳 + 時間區間)作爲分數,消費時每次彈出分數大於當前時間戳的一個消息。

爲了保障 RMQ 消息隊列的可用性,做到每條消息至少消費一次,消費者不是直接 pop 有序集合中的元素,而是將元素從 StoreQueue 移動到 PrepareQueue 並返回消息給消費者,等消費成功後再從 PrepareQueue 從刪除,或者消費失敗後從 PreapreQueue 重新移動到 StoreQueue,這便是根據二階段提交的思想實現的二階段消費。

在後面將會詳細介紹二階段消費的實現思路,這裏重點介紹下 PrepareQueue 的存儲設計。StoreQueue 中每一個 Slot 對應 PrepareQueue 中的 Slot,PrepareQueue 的 SlotKey 設計爲 prepare{#{topic}#{index}}。PrepareQueue 採用 Sorted Set 作爲存儲,消息移動到 PrepareQueue 時刻對應的(秒級時間戳 * 1000 + 重試次數)作爲分數,字符串存儲的是消息體內容。這裏分數的設計與重試次數的設計密切相關,所以在重試次數設計章節詳細介紹。

PrepareQueue 的 SlotKey 設計中需要注意的一點,由於消息從 StoreQueue 移動到 PrepareQueue 是通過 Lua 腳本操作的,因此需要保證 Lua 腳本操作的 Slot 在同一個 Redis 節點上,如何保證 PrepareQueue 的 SlotKey 和對應的 StoreQueue 的 SlotKey 被 hash 到同一個 Redis 槽中呢。Redis 的 hash tag 功能可以指定 SlotKey 中只有某一部分參與計算 hash,這一部分採用 {} 包括,因此 PrepareQueue 的 SlotKey 中採用 {} 包括了 StoreQueue 的 SlotKey。

消息重試消費 16 次後,消息將進入 DeadQueue。DeadQueue 的 SlotKey 設計爲 prepare{#{topic}#{index}},這裏同樣採用 hash tag 功能保證 DeadQueue 的 SlotKey 與對應 StoreQueue 的 SlotKey 存儲在同一 Redis 節點。

▐****生產者

生產者的任務就是將消息添加到 Redis 的 Sorted Set 中。首先,需要計算出消息添加到 Redis 的 SlotKey,如果發送方指定了消息的 slotBasis(否則採用 content 代替),則計算 slotBasis 的 CRC32 值,CRC32 值對槽數量進行取模得到槽序號,SlotKey 設計爲 #{topic}_#{index},其中 #{} 表示佔位符。然後,不同類型的消息有不同的添加方式,因此分佈講述三種類型消息的添加過程。

發送該消息時需要設置 timeRange,timeRange 必須大於 0,單位爲毫秒,表示消息將延遲 timeRange 毫秒後被消費,期間到來的重複消息將被合併,合併後的消息依然維持原來的消費時間。

因此在存儲該類型消息的時候,採用(當前時間戳 + timeRange)作爲分數,添加消息採用 Lua 腳本執行,保證操作的原子性,Lua 腳本首先採用 zscore 命令檢查消息是否已經存在,如果已經存在則直接返回,如果不存在則執行 zadd 命令添加。

發送該消息時需要設置 priority,priority 必須大於 16,表示消息的優先級,數值越大表示優先級越高。因此在存儲該類型消息的時候,採用 priority 作爲分數,採用 zadd 命令直接添加。

發送該類型消息時需要設置 fixedTime,fixedTime 必須大於當前時間,表示消費時間戳,當前時間大於該消費時間戳的時候,消息纔會被消費。因此在存儲該類型消息的時候,採用 fixedTime 作爲分數,採用命令 zadd 直接添加。

▐****消費者

三種消費模式

一般消息隊列存在三種消費模式,分別是:最多消費一次、至少消費一次、只消費一次。最多消費一次模式消息可能丟失,一般不怎麼使用。至少消費一次模式消息不會丟失,但是可能存在重複消費,比較常用。只消費一次模式消息被精確只消費一次,實現較困難,一般需要業務記錄冪等 ID 來實現。RMQ 實現了至少消費一次的模式,那麼如何保證消息至少被消費一次呢?

至少消費一次模式實現的難點

從最簡單的消費模式——最多消費一次說起,消費者端只需要從消息隊列服務中取出消息就行,即執行 Redis 的 zpopmax 命令,不倫消費者是否接收到該消息併成功消費,消息隊列服務都認爲消息消費成功。最多一次消費模式導致消息丟失的因素可能有:網絡丟包導致消費者沒有接收到消息,消費者接收到消息但在消費的時候宕機了,消費者接收到消息但消費失敗。針對消費失敗導致消息丟失的情況比較好解決,只需要把消費失敗的消息重新放入消息隊列服務就行,但是網絡丟包和消費系統異常導致的消息丟失問題不好解決。

可能有人會想到,我們不把元素從有序集合中 pop 出來,我們先查詢優先級最高的元素,然後消費,再刪除消費成功的元素,但是這樣消息服務隊列就變成了同步阻塞隊列,性能會很差。

至少消費一次模式的實現

至少消費一次的問題比較類似銀行轉賬問題,A 向 B 賬戶轉賬 100 元,如何保障 A 賬戶扣減 100 同時 B 賬戶增加 100,因此我們可以想到二階段提交的思想。第一個準備階段,A、B 分別進行資源凍結並持久化 undo 和 redo 日誌,A、B 分別告訴協調者已經準備好;第二個提交階段,協調者告訴 A、B 進行提交,A、B 分別提交事務。RMQ 基於二階段提交的思想來實現至少消費一次的模式。

RMQ 存儲設計中 PrepareQueue 的作用就是用來凍結資源並記錄事務日誌,消費者端即是參與者也是協調者。第一個準備階段,消費者端通過執行 Lua 腳本從 StoreQueue 中 Pop 消息並存儲到 PrepareQueue,同時消息傳輸到消費者端,消費者端消費該消息;第二個提交階段,消費者端根據消費結果是否成功協調消息隊列服務是提交還是回滾,如果消費成功則提交事務,該消息從 PrepareQueue 中刪除,如果消費失敗則回滾事務,消費者端將該消息從 PrepareQueue 移動到 StoreQueue,如果因爲各種異常導致 PrepareQueue 中消息滯留超時,超時後將自動執行回滾操作。二階段消費的流程圖如下所示。

實現方案的異常情況分析

我們來分析下采用二階段消費方案可能存在的異常情況,從以下分析來看二階段消費方案可以保障消息至少被消費一次。

  1. 網絡丟包導致消費者沒有接收到消息,這時消息已經記錄到 PrepareQueue,如果到了超時時間,消息被回滾放回 StoreQueue,等待下次被消費,消息不丟失。

  2. 消費者接收到了消息,但是消費者還沒來得及消費完成系統就宕機了,消息消費超時到了後,消息會被重新放入 StoreQueue,等待下次被消費,消息不丟失。

  3. 消費者接收到了消息並消費成功,消費者端在協調事務提交的時候宕機了,消息消費超時到了後,消息會被重新放入 StoreQueue,等待下次被消費,消息被重複消費。

  4. 消費者接收到了消息但消費失敗,消費者端在協調事務提交的時候宕機了,消息消費超時到了後,消息會被重新放入 StoreQueue,等待下次被消費,消息不丟失。

  5. 消費者接收到了消息並消費成功,但是由於 fullgc 等原因使消費時間太長,PrepareQueue 中的消息由於超時已經回滾到 StoreQueue,等待下次被消費,消息被重複消費。

採用二階段消費方式,需要將消息在 StoreQueue 和 PrepareQueue 之間移動,如何實現重試次數控制呢,其關鍵在 StoreQueue 和 PrepareQueue 的分數設計。

PrepareQueue 的分數需要與時間相關,正常情況下,消費者不管消費失敗還是消費成功,都會從 PrepareQueue 刪除消息,當消費者系統發生異常或者宕機的時候,消息就無法從 PrepareQueue 中刪除,我們也不知道消費者是否消費成功,爲保障消息至少被消費一次,我們需要做到超時回滾,因此分數需要與消費時間相關。當 PrepareQueue 中的消息發生超時的時候,將消息從 PrepareQueue 移動到 StoreQueue。

因此 PrepareQueue 的分數設計爲:秒級時間戳 * 1000 + 重試次數。不同類型的消息首次存儲到 StoreQueue 中的分數表示的含義不盡相同,區間重複合併消息和任意定時消息存儲時的分數表示消費時間戳,優先級消息存儲時的分數表示優先級。如果消息消費失敗,消息從 PrepareQueue 回滾到 StoreQueue,所有類型的消息存儲時的分數都表示剩餘重試次數,剩餘重試次數從 16 次不斷降低最後爲 0,消息進入死信隊列。消息在 StoreQueue 和 PrepareQueue 之間移動流程如下:

不同類型的消息在消費的時候 Pop 消息的方式不一樣,因此接下來分別講述三種類型消息的 Pop 方式。

區間重複合併消息

該消息存儲的分數設計爲消費時間戳,當前時間大於消息的消費時間戳時,該消息應該被消費。因此採用 Redis 命令 ZRANGEBYSCORE 彈出分數小於當前時間戳的一條消息。

優先級消息

該消息存儲的分數設計爲優先級,優先級越高分數越大,因此採用 Redis 命令 ZPOPMAX 彈出分數最大的一條消息。

任意定時消息

該消息存儲的分數設計爲消費時間戳,當前時間大於消息的消費時間戳時,該消息應該被消費。因此採用 Redis 命令 ZRANGEBYSCORE 彈出分數小於當前時間戳的一條消息。

相關應用

▐****主圖價格表達項目

在主圖價格表達中需要實現一個功能,商品價格發生變化時將商品價格打印在商品主圖上面,那麼需要在價格發生變動的時候觸發合成一張帶價格的圖片,每一次觸發合圖時計算價格都是獲取當前最新的價格。上游價格變化的因素很多,變化很頻繁,下游合圖消耗 GPU 資源較大,處理容量較低。因此需要儘可能合併觸發合圖消息,減輕下游處理壓力,於是使用了 RMQ 作爲消息隊列來進行削峯填谷、消息合併。不僅如此,還可以根據商家等級劃分觸發合圖消息的等級,使 KA 商家能夠優先得到處理,縮短價格變化的延遲。

在線上實際環境中,集羣共 130 臺機器,RMQ 消息隊列的發送消息能力和消費消息能力均可以達到 5w tps,而且這並不是峯值,理論上可以達到 10w tps。

▐****在線數據圈選引擎

在線數據圈選引擎需要處理各種來源的大量動態數據,需要將一段時間區間內的消息合併處理,減少處理壓力,並且在對同一元數據進行併發處理需要加分佈式鎖,鎖衝突導致消費效率下降。RMQ 的區間重複合併消息和併發消費控制能力可以幫助解決這些問題。目前,在線數據圈選引擎已經採用了 RMQ 消息隊列作爲核心組件,RMQ 消息隊列發揮了很大的作用。

總結

本文提出了一種可實現的基於 Redis 的消息隊列,充分利用 Sorted Set 結構設計了消息合併、優先級、定時等特性,與傳統消息隊列形成互補,彌補傳統消息隊列這方面特性的缺失。爲了實現高可用,本文在二階段提交的思想上進行改進設計了二階段消費方式,保障消息至少被消費一次。

未來將基於 Redis 的特性打造更多獨特的功能,與傳統消息中間件形成互補。在消費控制方面會增加流量自動調控能力,根據消息類型調控消費速度,減少因爲某種類型消息消費瓶頸導致整體消費性能下降。

作者 | 默達

編輯 | 橙子君

出品 | 阿里巴巴新零售淘系技術

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QTDrVEhp4XBFEowgaJcOgw