萬字長文解析如何基於 Redis 實現消息隊列
0 前言
去年年底,和大家分享了一個個人項目的實現方案——基於協程池架構實現分佈式定時器 XTimer.
在這次分享中我有提到, xtimer 前身是基於消息隊列架構實現的 workflow timer. 之所以在演進過程中把實現方式改爲協程池,主要是考慮到消息隊列組件存在比較高的使用和維護成本. 然而從技術流程本身出發,基於消息隊列的實現方式實際上對於分佈式定時器的核心流程解耦以及縱向架構擴展都是有利的.
1 消息隊列
要想把本期分享話題聊到位,首先我們需要理清楚,一個合格的消息隊列(MQ,message queue)應該具備哪些核心能力.
1.1 核心能力
談到消息隊列,其最重要的兩項能力就是解耦和削峯.
針對於 mq 的解耦功能而言,這裏舉個生活中的例子來幫助大家進一步理解這個概念.
假設我們在網上購買了一些商品. 在上班開會的時候,快遞小哥打通了我們的電話,告知物品已經送達到我們的家門口.
這個時候,倘若這個快遞小哥是個比較耿直的人,強烈要求我們必須立刻過去當面簽收,這個流程才能結束. 那此時的我們就尬住了. 我們得和領導打個招呼,趕回家去簽收快遞. 在我們到家之前,快遞小哥也必須一直守在原地等我們回去簽收交接. 整個流程是比較僵硬的,在這一次交接動作完成之前,雙方都沒辦法再靈活處理其他事項了.
上述例子就類似於我們在業務流程中基於 http/rpc 發起的一次同步請求,上游(快遞小哥)在發出請求後(打電話),會阻塞等待下游(作爲簽收方的我們)給到反饋(完成簽收操作),否則整個流程會一直阻塞住.
然而在實際場景中,我們知道還有一個叫作 “快遞超市” 的存在. 當快遞到達時,快遞小哥可以將我們的物品先存放在快遞超市中,登記好接收方的個人信息後,並給接收方發完通知短信後,快遞小哥就可以先撤離現場,去忙活其他事情了. 接下來,快遞超市會爲接收方承擔起託管快遞的職責,接收方只需要選擇在合適的時間去快遞超市收取物品即可.
這個流程相比之下就顯得靈活很多,由於有快遞超市這個緩衝區的存在,使得我們和快遞小哥之間的交互流程能夠實現解耦. 在這個流程中,快遞小哥就類似於生產者 producer,我們作爲接收方,類似於消費者 consumer,而負責承上啓下、託管快遞的快遞超市則類似於消息隊列 mq.
聊完了這個生活場景,我們再從技術視角出發,對 mq 所帶來的解耦能力進行一輪闡述:
-
在有了 mq 後,producer 不需要過分關心 consumer 的身份信息,只需要把消息按照指定的協議投遞到對應的 topic 即可
-
producer 在處理請求時,只需要把消息投遞到 mq 即可認爲流程處理結束,相比於同步請求下游,整個流程會更加輕便靈活,擁有更高的吞吐量
-
因爲有 mq 作爲緩衝層. 下游 consumer 可以設定好合適的消費限流參數,按照指定的速率進行消費,能夠在很大程度上對 consumer 起到保護作用
下面我們再用同樣的例子說明一下消息隊列的另一項核心功能——削峯.
假設現在正值雙十一時期,我們剁手一通買買買,導致同時有大量的快遞在同一個時段到達. 這時候,快遞超市就爲我們起到 “削峯” 的效果. 不論快遞數量的多少,我們不用第一時間立刻進行響應處理,而是能夠選擇在合適的時間到達快遞超市進行取件. 如果快遞數量很大,我們一次拿不完的話,我們也可以量力而行,每次只收取一部分,分成多個批次處理.
這個流程就類似於 mq 所帶來的消息削峯的能力. 在實際的生產環境中,倘若上游請求量很大,而下游都需要第一時間進行同步響應的話,這對於下游系統可能產生很大的負荷. 此時如果能把同步流程轉爲異步,把消息放到 mq 組件中進行一輪緩衝,讓下游可以根據自身的處理能力,按照自己的節奏消化這部分積攢的流量,這對於下游系統來說能起到很好的保護作用.
前面是從宏觀功能的視角出發,聊到了 mq 對應的異步流程所具備的優勢. 下面我們再談一談,作爲 mq 組件需要具備哪些基礎能力:
-
消息不丟失
-
producer 將 msg 投遞到 mq 時不出現丟失
-
msg 存放在 mq 時不出現丟失
-
consumer 從 mq 消費 msg 時不出現丟失
針對於上述第二點,各 mq 組件在實現上大抵上是基於數據落盤 + 數據備份的方式保證的.
而針對於上述的一、三點,則是通過兩個交互環節中的 ack 機制保證的. 以 producer 投遞 msg 到 mq 的環節爲例,只要 mq 沒有給到投遞成功的 ack 反饋,那麼 producer 就應該把本次投遞流程視爲失敗,執行重新投遞的操作. consumer 的消費流程同樣如此.
因此,mq 交互流程主要通過 ack 機制保證消息投遞以及消費環節做到 at least once(至少一次)的語義,然而無法保證消息不重複的問題. 因此,處於最下游的消費者 consumer 需要能夠具備消息冪等去重的能力,避免流程被重複處理.
- 支持消息存儲
另一項能力是支持消息的存儲. 以我們前面提到的取快遞的例子來說,快遞超市需要有一個實體店面,店面具有着一定的容量能夠存放一定數量的快遞. 這樣當下遊 consumer 沒來得及第一時間消費消息時,消息能緩存在 mq 組件中一段時間,讓消費方自由選擇合適的時間過來進行消費.
1.2 流程類型
我們定義 mq 類型時,可以從多個維度出發. 這裏我主要根據 consumer 消費的流程,將 mq 分爲 push 型和 pull 型.
- push 型:
push 型指的是當 producer 將消息投遞到 mq 時,由 mq 負責將消息以推送的形式主動發送給各個建立了訂閱關係的消費方.
- pull 型:
pull 型指的是當 mq 中存在消息時,由 consumer 主動執行拉取消息的操作.
關於以上兩種 mq 類型,在這裏個人有一些比較淺顯的認知:
對於 push 型,存在的優勢是:
-
流程實時性比較強,消息來了就執行推送
-
比較契合發佈 / 訂閱的模型
劣勢:
- 對下游 consumer 的保護力度不夠. mq 的核心功能是解耦、削峯,本質上是提供了一個緩衝的空間,讓 consumer 能根據自己的消費能力在合適的時機進行消息處理. 所以 push 型在這方面體現的優勢不夠明顯,消息到達後就需要向各個 consumer 發起推送. 不過這個問題可以在一定程度上通過消費限流的方式加以彌補.
對於 pull 型則剛好相反:
-
優勢是:下游握有消費操作的主動權,能選擇在合適的時機執行消費操作
-
劣勢是:實時性會弱一些,和主動 pull 的輪詢機制有關
1.3 redis 實現 mq 的問題
理清楚 mq 的核心功能和訴求後,下面我們先明確一下基於 redis 實現 mq 存在的一類通用問題:
- 存儲昂貴
redis 本身是基於內存實現的緩存組件,因此在存儲消息時總容量相對有限.
- 數據丟失
此外,redis 存儲消息時會不可避免地存在數據丟失的風險,可以從兩個方面出發考慮:
-
內存是易失性存儲. 即便 redis 中有 rdb/aof 之類的持久化機制加以彌補,但這個持久化流程是異步執行的,無法提供百分百的保證力度
-
redis 走的是 ap 高可用流派,數據的主從複製流程是異步執行的,主從切換時數據存在弱一致問題
以上問題,不論是在 redis 緩存數據還是實現 mq 的流程中都是存在的,這個問題我們在選型使用 redis 時需要做到了然於心,這一點在後續 4.3 小節中會進一步展開說明.
2 redis list
基於 redis 實現 mq 的方式之一是使用 redis 中的 list 結構.
redis 中的 list 是一個雙向鏈表,天然契合 mq 中的 queue 隊列模型. 我們在使用 redis list 實現 mq 時,可以生產消息的流程具象化爲一次將數據追加到 list 尾部的操作;同時,我們可以把消費消息的流程具象化爲一次從 list 頭部摘取數據的操作. 如此這般,一個簡易版的消息隊列就實現了.
2.1 操作指令
下面我們對涉及到的幾個指令展開介紹:
首先,在使用 list 充當消息隊列時,list 對應的 key 則對應爲消息的 topic 名稱.
producer 在投遞消息時,可以使用 lpush 指令,對應的時間複雜度爲 O(1). 指令文檔鏈接:https://redis.io/commands/lpush/
127.0.0.1:6379> lpush my_list_topic msg
(integer) 1
-
• my_list_topic: topic 名稱
-
• msg:投遞的消息內容
consumer 消費消息時,使用 rpop 指令,對應的時間複雜度 O(1). 指令文檔鏈接:https://redis.io/commands/rpop/
127.0.0.1:6379> rpop my_list_topic
"msg"
-
my_list_topic:topic 名稱
-
msg:獲取到的消息
2.2 消費流程分析
在上述流程中,存在的第一個問題是,consumer 的輪詢消費流程應該如何組織.
這種基於 list 實現的 mq 是屬於 pull 類型. 消費方自行組織流程,並在合適的時機通過 rpop 執行進行消息的主動拉取.
首先,consumer 在消費時,一定是一個類似於 loop thread 的自旋模型,每一輪循環中,通過 rpop 指令嘗試從 list 中讀取消息,如果成功讀取到了消息,則進行相應的邏輯處理.
然而在此處,需要注意的是,redis 的 rpop 指令是非阻塞型的,即在 list 沒有數據時,也會即時返回一個結果爲 nil 的響應,這樣我們在組織這段自旋程序的時候就顯得有些尷尬了:
-
倘若我們在 rpop 捕捉到 nil 時,立即開啓下一輪循環,則這個輪詢行爲可能是沒有意義的,因爲 list 中可能仍然不存在數據. 這樣的高頻率自旋,對於 cpu 資源是一種無謂的損耗
-
倘若我們選擇讓 consumer 休眠一段時間進行循環,這個休眠的時長又具有一定的人爲誤判性. 倘若我們把時長設得太短,仍然會存在 cpu 浪費的問題;倘若設得太長,則可能會導致消息處理不及時的問題
在這個過程中,最理想的實現方案是,在 list 中有數據到達時,我們令 consumer 即時獲取到對應的結果;倘若 list 數據爲空,則令 consumer 陷入阻塞等待的狀態,直到有數據抵達時程序才被喚醒.
所幸,這個訴求在 redis list 中是可以滿足的. 我們可以使用 redis 中的 brpop 指令替代 rpop 指令,做到在有數據時才返回響應,否則令當前程序陷入阻塞. brpop 指令文檔:https://redis.io/commands/brpop/
127.0.0.1:6379> brpop my_list_topic 0
1) "my_list_topic"
2) "msg"
-
my_list_topic: topic 名稱
-
0:阻塞等待的超時時長,達到此閾值仍未獲取數據時會返回 nil. 如果設置爲 0 ,則代表沒有這個超時限制.
2.3 侷限性分析
即便我們採用 brpop 解決了 consumer 合理阻塞消費數據的問題,這種基於 redis list 實現的 mq 仍然不能稱爲一個成熟的實現方案,其中主要存在着以下幾項缺陷:
- 無法支持發佈 / 訂閱模式
list 中的數據是獨一份的,被 pop 出去後就不復存在了.
因此 redis 中的 list 是無法支持 mq 中的發佈 / 訂閱模式的,即下游倘若存在多個獨立的消費者組 consumer group,各自都需要獨立獲取一份完整的數據,那麼此時通過 redis list 是無法滿足這個訴求的.
- 無法支持消費端 ack 機制
consumer 通過 brpop 獲取到數據後,倘若發生宕機或者其他意外錯誤,沒有一種有效的手段能給予 mq 一個消息處理失敗的反饋. 這條消息一旦從 list 中被取走,就不再有機會被重新獲取了,因此在這個場景下,消息就真的丟失了.
3 redis pub/sub
爲解決 redis list 存在的無法支持發佈 / 訂閱模式的問題,redis 提供了 pub/sub 機制,能夠有效地彌補這方面的缺陷.
pub/sub 全稱爲 publish/subscribe,顧名思義,指的正是消息隊列中的發佈 / 訂閱模式.
爲了貼合 pub/sub 的語義,在本章中,我們統一把生產者 producer 稱爲 publisher,消費者 consumer 稱爲 subscriber,大家留意一下.
在實現上,pub/sub 會在 publisher 和 subscriber 之間建立一個用於實時通訊的信道——channel. 在傳遞消息時,會根據 channel 查找到所有建立過訂閱關係的 subscriber,一一將消息送達到它們手中.
3.1 操作指令
下面我們對使用 pub/sub 實現 mq 流程涉及到的幾個核心指令進行介紹.
首先,消費方 subscriber 通過 subscribe 指令建立對某個 channel 的訂閱關係. 指令文檔:https://redis.io/commands/subscribe/
127.0.0.1:6379> subscribe my_channel_topic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel_topic"
3) (integer) 1
- • my_channel_topic: topic 名稱
每個通過 subscribe 指令建立 channel 訂閱關係的使用方都會被視爲一個獨立的 subscriber,後續 channel 中有消息到達時,會被複製成多份,一一推送到各個 subscriber 手中.
生產方 publisher 通過 publish 指令往對應的 channel 中執行消息投遞操作. 指令文檔:https://redis.io/commands/publish/
127.0.0.1:6379> publish my_channel_topic msg
(integer) 1
此時,之前對這個 channel 執行過 subscribe 操作的 subscriber 都會接收到這則消息:
1) "message"
2) "my_channel_topic"
3) "msg"
此外,值得一提的是,消費者通過 subscribe 指令會對 channel 採用阻塞模式進行監聽,只有在有消息到來時,纔會從阻塞狀態中被喚醒.
3.2 實現原理
下面我們對 redis pub/sub 模式的實現原理進行介紹. 在理清原理後,我們才能對這個流程中存在的問題進行更爲通透的認識和分析.
-
首先,消費方 subscriber 通過 subscribe 指令建立和指定 channel 之間的訂閱關係. 這時在 redis 中會維護好 channel 和對應 subscriber 列表的映射關係,並在內存中爲每個在線活躍的 subscriber 分配好一個緩衝區 buffer,用以承載後續到來的消息數據
-
接下來隨着 publisher 執行 publish 指令,往對應 channel 中投遞消息後,此時 redis 會實時查看 channel 對應 subscriber 名單,往每個 subscriber 的緩衝區 buffer 中推送這條數據
-
各執行了 subscribe 指令的 subscriber 會處於阻塞監聽緩衝區 buffer 的狀態,隨着新數據到達,subscriber 會獲取到這筆數據
基於這個流程,我們能看出來,pub/sub 對於 channel 以及 subscribers 之間的實時映射關係存在強依賴. 因此在操作的執行順序上,我們需要保證先執行 subscribe 指令,再執行 publish 執行,否則前幾筆 publish 投遞的數據就會因爲不存在 subscriber 而被直接丟棄.
3.3 優缺點分析
理完了 pub/sub 的實現原理,下面我們可以對其存在的優劣勢進行分析了.
首先,pub/sub 模式最大的優勢就是能夠支持發佈 / 訂閱能力,同一份消息會被推送給所有通過 subscribe 操作訂閱了該 channel 的 subscriber.
然而,pub/sub 存在的問題是很顯著的,就是丟消息問題. 這個問題可以從多個維度展開:
- 缺乏 ack 機制
這個問題和 redis list 相同, subscriber 在獲取到消息後,沒有對應的 ack 機制,因此倘若處理失敗,想要執行消息的重放操作是無法做到的
- 缺乏消息存儲能力
我覺得 redis pub/sub 機制就有點類似於 golang 中的無緩衝型 channel. 它相當於只維護了 channel 和 subscribers 的映射關係,但是每條被投遞的消息都是即來即走,並不會停留在 channel 中,於是在以下幾個場景中,都會發生消息丟失問題:
- subscriber 宕機:倘若某個 subscriber 中途宕機,則會被踢出名單,在恢復前的這段時間內,到達的消息都會徹底與這個 subscriber 無緣
-
redis 宕機:每條 publish 的消息都會第一時間分發到 subscriber 對應的內存緩衝區中,而這個緩衝區是完全基於內存實現的易失性存儲. 一旦 redis 服務端宕機,緩衝區中的數據就完全丟失且不可恢復了. 此外,pub/sub 模式下的消息數據不屬於 redis 中的基本數據類型,因此 redis 中的持久化機制 rdb 和 aof 對於 pub/sub 中的數據是完全不生效的,數據丟失的可能性大幅度提高
-
subscriber 消息積壓:由於消息數據會被放在 redis 側各 subscriber 的緩衝區 buffer 中,這部分空間是相對有限的,一旦某個 subscriber 因爲消費能力弱,導致 buffer 中的的數據發生積壓,此時 redis 很可能會自動把 subscriber 踢除下線,於是這部分數據也丟失了
針對最後這一點,subscriber 對應的緩衝區容量閾值可以在 redis.conf 文件中進行配置,其默認值爲:
client-output-buffer-limit pubsub 32mb 8mb 60
對應的含義是,倘若某個 subscriber 的緩衝區 buffer 大小達到 32MB,則 subscriber 會被踢下線;倘若緩衝區內數據量在連續 60s 內達到 8MB 大小,subscriber 也會踢下線.
聊到這裏,我們發現不論是 redis 中的 list 還是 pub/sub 功能,各自都存在着比較明顯的功能缺陷,都是無法被當作一個成熟的 mq 組件來使用的.
不過大家不用氣餒,真正的重頭戲馬上就來. 接下來我們聊到的第三種實現方案 redis streams 纔是真正意義上趨近於成熟的 mq 實現方案.
4 redis streams
從 redis 5.0 中,一個新的數據類型——streams 被推出了. 這種數據類型的目標正是直奔實現 mq 組件的功能而去的.
4.1 操作指令
首先,我們理一下,使用 redis streams 時涉及到的幾個核心操作指令.
- 生產消息
首先是用於生產消息的指令,通過 XADD 指令往 topic 中投入一組 kv 對消息. 指令文檔:https://redis.io/commands/xadd/
XADD my_streams_topic * key1 val1
"1638515664470-0"
XADD topic1 * key2 val2
"1638515672769-0"
-
my_streams_topic:topic 名稱
-
*:消息自動生成唯一標識 id,基於時間戳 + 自增序號生成
-
key1/val1、key2/val2:消息數據 kv 對
• 消費消息
接下來是用於消費消息的指令,通過 XREAD 指令從對應 topic 中獲取消息. 指令文檔:https://redis.io/commands/xread/
XREAD STREAMS my_streams_topic 0-0
1) 1) "my_streams_topic"
2) 1) 1) "1638515664470-0"
2) 1) "key1"
2) "val1"
2) 1) "1638515672769-0"
2) 1) "key2"
2) "val2"
-
my_streams_topic: topic 名稱
-
0-0:從頭開始消費. 倘若這裏填爲某條消息 id,則代表從這條消息之後(不包含這條消息)開始消費
-
阻塞模式消費消息:
streams 支持在消費時,採用阻塞模式進行消費. 倘若存在數據則即時返回處理,否則會阻塞消費流程.
# BLOCK 0 表示阻塞等待時沒有超時時間上限
XREAD BLOCK 0 STREAMS my_streams_topic 1638515672769-0
(nil)
-
BLOCK:阻塞消費模式
-
0:阻塞等待超時時間,超過這個時長會返回 nil. 設置爲 0 則表示不設置超時閾值
-
創建消費者組
streams 也支持發佈訂閱模式,能保證消息被多個消費者組 consumer group 同時消費到.
首先需要進行消費者組的創建. 指令文檔:https://redis.io/commands/xgroup-create/
XGROUP CREATE my_streams_topic my_group 0-0
OK
-
my_streams_topic:topic 名稱
-
my_group:消費者組名稱
-
0-0:從頭開始消費
-
基於消費者組消費消息
同一份數據在同一個消費者組下只會被消費到一次. 不同消費者組各自能獲取到獨立完整的消息數據.
通過 XReadGroup 指令,以消費者組的身份進行消費. 指令文檔:https://redis.io/commands/xreadgroup/
XREADGROUP GROUP my_group consumer BLOCK 0 STREAMS my_streams_topic >
1) 1) "topic1"
2) 1) 1) "1638515664470-0"
2) 1) "key1"
2) "val1"
2) 1) "1638515672769-0"
2) 1) "key2"
2) "val2"
• my_group: 消費者組名稱
• consumer:消費者名稱
• my_streams_topic:topic 名稱
• block 0: 採用阻塞等待的模式,0 代表沒有超時上限
• >: 讀最新的消息 (尚未分配給某個 consumer 的消息)
還有另一種消費模式,讀取的是已分配給當前消費者,但是還未經確認的老消息:
XREADGROUP GROUP my_group consumer STREAMS my_streams_topic 0-0
1) 1) "topic1"
2) 1) 1) "1638515664470-0"
2) 1) "key1"
2) "val1"
2) 1) "1638515672769-0"
2) 1) "key2"
2) "val2"
-
• 0-0:標識讀取已分配給當前 consumer ,但是還沒經過 xack 指令確認的消息
-
• 確認消息:
通過 xack 指令,攜帶上消費者組、topic 名稱以及消息 id,能夠完成對某條消息的確認操作. 文檔鏈接:https://redis.io/commands/xack/
127.0.0.1:6379> XACK my_streams_topic my_group 1638515664470-0
(integer) 1
-
my_streams_topic:topic 名稱
-
my_group:消費者組名稱
-
1638515664470-0:消息 id
4.2 優缺點分析
下面針對基於 redis streams 實現 mq 的優劣勢進行分析.
- 支持發佈 / 訂閱模式
redis streams 引入了消費者組 group 的概念,因此是能夠保證各個消費者組 consumer group 均能夠獲取到一份獨立而完整的消息數據.
- 數據可持久化
redis 中的 streams 和 string、list 等數據類型一樣,都能夠通過 rdb(redis database)、aof( append only file) 的持久化機制進行落盤存儲,能夠在很大程度上降低數據丟失的概率.
- 支持消費端 ack 機制
redis streams 中另一項非常重要的改進,是支持 consumer 的 ack 能力. consumer 在處理好某條消息後,能通過 xack 指令對該消息進行確認. 這樣對於沒經過 ack 確認的消息,redis streams 還是爲 consumer 保留了重新消費的能力.
- 支持消息緩存
和 pub/sub 模式不同的是,redis streams 中會實際開闢內存空間用於存儲 streams 中的數據. 因此哪怕某個 consumer group 是在消息生產之後才完成註冊操作,也能夠進行消息溯源,從 topic 起點開始執行消息的消費操作.
不過這裏需要考慮的問題是,redis 基於內存實現消息數據的存儲,倘若大量的消息數據被堆積在內存中,在資源使用上會存在很大的壓力和很高的成本,嚴重時甚至可能發生 OOM 問題.
基於此,redis streams 支持在每次投遞消息時,顯式設定一個 topic 中能緩存的數據長度,來人爲限制這個緩存空間的容量.
這裏可以通過在 XADD 指令中加上 maxlen 參數,用於指定 topic 中能緩存的數據長度:
XADD topic1 MAXLEN 10000 * key1 val1
- MAXLEN 10000:最多緩存 10000 條數據
這樣倘若 topic 數據容量超限,則新消息的到達會把最老的消息擠出隊列,意味着也可能存在數據丟失的風險,因此大家在使用時需要合理設置 maxlen 參數.
4.3 整體對比分析
到這裏爲止,最後一個實現方案 redis streams 也介紹完畢,最後我們回過頭對今天介紹過的幾類 redis 實現 mq 的方案進行一輪總結.
可以看到,在各項能力上 list 和 pub/sub 互有千秋,而 streams 可以說是兼具了各方面的優勢,稱得上是已經趨近於成熟的 mq 實現方案.
然而,大家應該能注意到,我在此處評價 streams 方案的數據丟失風險時,僅僅是評價爲 “低”,而不是 “無”,這一點我接下來會繼續加以說明.
下面我們再進一步拿 redis streams 和業界專業的 mq 組件進行對比,就以我比較熟悉的 kafka 組件爲例,來看看 redis stream 存在哪些方面的優劣:
可以看到,redis streams 在存儲介質上需要使用內存,因此消息存儲容量相對有限;且同一個 topic 的數據由於對應爲同一個 key,因此會被分發到相同節點,無法實現數據的縱向分治,因此不具備類似於 kafka 縱向分區以提高併發度的能力.
此外,很重要的一個點是,基於 redis 實現的 mq 一定是存在消息丟失的風險的. 儘管在生產端和消費端,producer/consumer 在和 mq 交互時可以通過 ack 機制保證在交互環節不出現消息的丟失,然而在 redis 本身存儲消息數據的環節就可能存在數據丟失問題,原因在於:
-
redis 數據基於內存存儲:哪怕通過最嚴格 aof 等級設置,由於持久化流程本身是異步執行的,也無法保證數據絕對不丟失
-
redis 走的是 ap 高可用流派:爲保證可用性,redis 會在一定程度上犧牲數據一致性. 在主從複製時,採用的是異步流程,倘若主節點宕機,從節點的數據可能存在滯後,這樣在主從切換時消息就可能丟失
與之相對的,kafka 只要合理設置好 ISR(In Sync Replica) 有關參數,理論上在集羣存在多數節點仍能正常運作的情況下,對應的消息數據是不會出現丟失的.
前面我們談到了 redis 相比於傳統 mq 組件的一些劣勢,現在我們再來聊聊它具備的一些優勢:就是相對輕量化,相比於傳統 mq 組件有着更低的使用和運維成本.
因此,在實際的選型過程中,我們可以根據業務訴求進行抉擇. 倘若業務流程對於數據的精度沒有特別嚴格的要求,那此時使用 redis streams 這樣一種輕量化的 mq 實現方案未嘗不是一種好的選擇和嘗試.
5 redmq——純 redis 實現的消息隊列
從第 5 章開始,我們正式進入實戰篇的部分.
遵循本文第 4 章基於 redis stream 實現 mq 的思路,我日前使用 go 語言,開源實現了一款 基於 redis 實現消息隊列的客戶端 sdk:redmq,該項目的開源地址爲:https://github.com/xiaoxuxiansheng/redmq
5.1 實現架構
內容可以分爲三個核心模塊:
-
redis client:封裝了 redis streams 相關指令,包括 XADD、XREADGROUP、XACK
-
producer:生產者,內置了 redis 客戶端,通過 XADD 指令實現消息的生產投遞
-
consumer:消費者,內置了 redis 客戶端,通過 XREADGROUP 指令實現消息的消費,通過 XACK 指令實現消息的確認
5.2 redis 客戶端
5.2.1 redigo
本項目中,redis 客戶端底層是基於開源的 golang redis 客戶端 sdk:redigo 實現的,使用的源碼版本爲 v1.8.9,該項目的開源地址爲:https://github.com/gomodule/redigo
5.2.2 客戶端類
下面是關於客戶端的實現,核心步驟通過代碼註釋加以說明:
// Client Redis 客戶端.
type Client struct {
// 用戶自定義配置
opts *ClientOptions
// redis 連接池
pool *redis.Pool
}
// 其中網絡協議、redis 地址、redis 密碼爲必填項
func NewClient(network, address, password string, opts ...ClientOption) *Client {
c := Client{
opts: &ClientOptions{
network: network,
address: address,
password: password,
},
}
// 注入用戶自定義的配置參數
for _, opt := range opts {
opt(c.opts)
}
// 對非法的配置參數進行修復
repairClient(c.opts)
// 創建 redis 連接池
pool := c.getRedisPool()
// 返回 redis 客戶端實例
return &Client{
pool: pool,
}
}
// 獲取 redis 連接池
func (c *Client) getRedisPool() *redis.Pool {
return &redis.Pool{
// 最大空閒連接數
MaxIdle: c.opts.maxIdle,
// 連接最長空閒時間
IdleTimeout: time.Duration(c.opts.idleTimeoutSeconds) * time.Second,
// 創建連接的方法
Dial: func() (redis.Conn, error) {
c, err := c.getRedisConn()
if err != nil {
return nil, err
}
return c, nil
},
// 最大活躍連接數
MaxActive: c.opts.maxActive,
// 當連接不夠時,是阻塞等待還是立即返回錯誤
Wait: c.opts.wait,
// 測試方法
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
// 獲取連接
func (c *Client) GetConn(ctx context.Context) (redis.Conn, error) {
return c.pool.GetContext(ctx)
}
// 獲取 redis 配置
func (c *Client) getRedisConn() (redis.Conn, error) {
if c.opts.address == "" {
panic("Cannot get redis address from config")
}
// 注入密碼
var dialOpts []redis.DialOption
if len(c.opts.password) > 0 {
dialOpts = append(dialOpts, redis.DialPassword(c.opts.password))
}
// 創建新的連接
return redis.DialContext(context.Background(),
c.opts.network, c.opts.address, dialOpts...)
}
5.2.3 投遞消息
下面是用於投遞消息的 XADD 指令的實現源碼:
func (c *Client) XADD(ctx context.Context, topic string, maxLen int, key, val string) (string, error) {
// topic 名稱不能爲空
if topic == "" {
return "", errors.New("redis XADD topic can't be empty")
}
// 從 redis 連接池中獲取連接
conn, err := c.pool.GetContext(ctx)
if err != nil {
return "", err
}
// 使用完畢後把連接放回連接池
defer conn.Close()
// 執行 XADD 指令,並返回生成的消息 id
return redis.String(conn.Do("XADD", topic, "MAXLEN", maxLen, "*", key, val))
}
5.2.4 消費新消息
消費消息時使用的是 redis streams 的 XREADGROUP 指令,其中又可以分爲消費新消息(尚未分配給任何 consumer 的消息)以及消費未確認的老消息(分配給當前 consumer,但是還沒經過 XACK 確認的消息).
func (c *Client) XReadGroup(ctx context.Context, groupID, consumerID, topic string, timeoutMiliSeconds int) ([]*MsgEntity, error) {
return c.xReadGroup(ctx, groupID, consumerID, topic, timeoutMiliSeconds, false)
}
在 xReadGroup 方法中,會根據用戶傳入的 pending 表示是否爲 true,代表當前是消費處理新消息還是未經確認的老消息:
func (c *Client) xReadGroup(ctx context.Context, groupID, consumerID, topic string, timeoutMiliSeconds int, pending bool) ([]*MsgEntity, error) {
// 消費者組 id、消費者 id、topic 名稱缺一不可
if groupID == "" || consumerID == "" || topic == "" {
return nil, errors.New("redis XREADGROUP groupID/consumerID/topic can't be empty")
}
// 從 redis 連接池中獲取新連接
conn, err := c.pool.GetContext(ctx)
if err != nil {
return nil, err
}
// 使用完畢後返還連接
defer conn.Close()
var rawReply interface{}
// 倘若 pending 爲 true,代表需要消費的是已分配給當前 consumer 但是還未經 xack 確認的老消息. 此時採用非阻塞模式進行處理
if pending {
rawReply, err = conn.Do("XREADGROUP", "GROUP", groupID, consumerID, "STREAMS", topic, "0-0")
} else {
// 倘若 pending 爲 false,代表需要消費的是尚未分配給任何 consumer 的新消息,此時會才用阻塞模式執行操作
rawReply, err = conn.Do("XREADGROUP", "GROUP", groupID, consumerID, "BLOCK", timeoutMiliSeconds, "STREAMS", topic, ">")
}
if err != nil {
return nil, err
}
reply, _ := rawReply.([]interface{})
if len(reply) == 0 {
return nil, ErrNoMsg
}
replyElement, _ := reply[0].([]interface{})
if len(replyElement) != 2 {
return nil, errors.New("invalid msg format")
}
// 對消費到的數據進行格式化
var msgs []*MsgEntity
rawMsgs, _ := replyElement[1].([]interface{})
for _, rawMsg := range rawMsgs {
_msg, _ := rawMsg.([]interface{})
if len(_msg) != 2 {
return nil, errors.New("invalid msg format")
}
msgID := gocast.ToString(_msg[0])
msgBody, _ := _msg[1].([]interface{})
if len(msgBody) != 2 {
return nil, errors.New("invalid msg format")
}
msgKey := gocast.ToString(msgBody[0])
msgVal := gocast.ToString(msgBody[1])
msgs = append(msgs, &MsgEntity{
MsgID: msgID,
Key: msgKey,
Val: msgVal,
})
}
return msgs, nil
}
5.2.5 重複消費未確認消息
XReadGroupPending 方法用於消費未經 XACK 確認的老消息,內部也會調用 xReadGroup 方法,但是會把 pending 標識置爲 true.
func (c *Client) XReadGroupPending(ctx context.Context, groupID, consumerID, topic string) ([]*MsgEntity, error) {
return c.xReadGroup(ctx, groupID, consumerID, topic, 0, true)
}
5.3 生產者
5.3.1 類定義
下面是關於生產者 producer 模塊的實現:
// 生產者 producer 類定義
type Producer struct {
// 內置的 redis 客戶端
client *redis.Client
// 用戶自定義的生產者配置參數
opts *ProducerOptions
}
// 生產者 producer 的構造器函數
func NewProducer(client *redis.Client, opts ...ProducerOption) *Producer {
p := Producer{
client: client,
opts: &ProducerOptions{},
}
// 注入用戶自定義的配置參數
for _, opt := range opts {
opt(p.opts)
}
// 對非法的配置參數進行修復
repairProducer(p.opts)
// 返回 producer 實例
return &p
}
有關於 producer 的一些配置項:
type ProducerOptions struct {
// topic 可以緩存的消息長度,單位:條. 當消息條數超過此數值時,會把老消息踢出隊列
msgQueueLen int
}
type ProducerOption func(opts *ProducerOptions)
func WithMsgQueueLen(len int) ProducerOption {
return func(opts *ProducerOptions) {
opts.msgQueueLen = len
}
}
// 默認爲每個 topic 保留 500 條消息
func repairProducer(opts *ProducerOptions) {
if opts.msgQueueLen <= 0 {
opts.msgQueueLen = 500
}
}
5.3.2 投遞消息
producer 投遞消息時,會使用到 redis 客戶端中的 XADD 方法:
// 生產一條消息
func (p *Producer) SendMsg(ctx context.Context, topic, key, val string) (string, error) {
return p.client.XADD(ctx, topic, p.opts.msgQueueLen, key, val)
}
5.3.3 使用示例
下面展示一個使用 producer 進行消息投遞的單測代碼示例:
import (
"context"
"testing"
"github.com/xiaoxuxiansheng/redmq"
"github.com/xiaoxuxiansheng/redmq/redis"
)
const (
// 連接 redis 的傳輸層協議,默認爲 tcp
network = "tcp"
// redis 的地址(ip:port)
address = "請輸入 redis 地址"
// redis 服務器的密碼,如果沒設密碼可以傳爲空串
password = "請輸入 redis 密碼"
// 投遞到哪個 topic
topic = "請輸入 topic 名稱"
)
func Test_Producer(t *testing.T) {
// 創建一個 redis 客戶端
client := redis.NewClient(network, address, password)
// 構造生產者 producer 實例,並限制一個 topic 最多保留 10 條消息
producer := redmq.NewProducer(client, redmq.WithMsgQueueLen(10))
ctx := context.Background()
// 投遞消息
msgID, err := producer.SendMsg(ctx, topic, "test_k", "test_v")
if err != nil {
t.Error(err)
return
}
// 打印這條消息對應的 msg id
t.Log(msgID)
}
5.4 消費者
5.4.1 類定義
下面是關於消費者 consumer 的類定義:
// 消費者 consumer 類定義
type Consumer struct {
// consumer 生命週期管理
ctx context.Context
// 停止 consumer 的控制器
stop context.CancelFunc
// 接收到 msg 時執行的回調函數,由使用方定義
callbackFunc MsgCallback
// redis 客戶端,基於 redis 實現 message queue
client *redis.Client
// 消費的 topic
topic string
// 所屬的消費者組
groupID string
// 當前節點的消費者 id
consumerID string
// 各消息累計失敗次數
failureCnts map[redis.MsgEntity]int
// 一些用戶自定義的配置
opts *ConsumerOptions
}
// 消費者 consumer 構造器函數
func NewConsumer(client *redis.Client, topic, groupID, consumerID string, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error) {
// cancel context,用於提供停止 consumer 的控制器
ctx, stop := context.WithCancel(context.Background())
// 構造 consumer 實例
c := Consumer{
client: client,
ctx: ctx,
stop: stop,
callbackFunc: callbackFunc,
topic: topic,
groupID: groupID,
consumerID: consumerID,
opts: &ConsumerOptions{},
failureCnts: make(map[redis.MsgEntity]int),
}
// 校驗 consumer 中的參數,包括 topic、groupID、consumerID 都不能爲空,且 callbackFunc 不能爲空
if err := c.checkParam(); err != nil {
return nil, err
}
// 注入用戶自定義的配置項
for _, opt := range opts {
opt(c.opts)
}
// 修復非法的配置參數
repairConsumer(c.opts)
// 啓動 consumer 守護 goroutine,負責輪詢消費消息
go c.run()
// 返回 consumer 實例
return &c, nil
}
關於 consumer 的一些自定義配置項:
type ConsumerOptions struct {
// 每輪阻塞消費新消息時等待超時時長
receiveTimeout time.Duration
// 處理消息的最大重試次數,超過此次數時,消息會被投遞到死信隊列
maxRetryLimit int
// 死信隊列,可以由使用方自定義實現
deadLetterMailbox DeadLetterMailbox
// 投遞死信流程超時閾值
deadLetterDeliverTimeout time.Duration
// 處理消息流程超時閾值
handleMsgsTimeout time.Duration
}
type ConsumerOption func(opts *ConsumerOptions)
func WithReceiveTimeout(timeout time.Duration) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.receiveTimeout = timeout
}
}
func WithMaxRetryLimit(maxRetryLimit int) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.maxRetryLimit = maxRetryLimit
}
}
func WithDeadLetterMailbox(mailbox DeadLetterMailbox) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.deadLetterMailbox = mailbox
}
}
func WithDeadLetterDeliverTimeout(timeout time.Duration) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.deadLetterDeliverTimeout = timeout
}
}
func WithHandleMsgsTimeout(timeout time.Duration) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.handleMsgsTimeout = timeout
}
}
// 修復非法的配置參數
func repairConsumer(opts *ConsumerOptions) {
// 默認阻塞消費的超時時長爲 2 s
if opts.receiveTimeout < 0 {
opts.receiveTimeout = 2 * time.Second
}
// 默認同一條消息最多處理 3 次,超過此次數,則進入死信
if opts.maxRetryLimit < 0 {
opts.maxRetryLimit = 3
}
// 如果用戶沒傳入死信隊列,則使用默認的死信隊列,只會打印一下消息
if opts.deadLetterMailbox == nil {
opts.deadLetterMailbox = NewDeadLetterLogger()
}
// 投遞消息進入死信隊列的流程的超時時間
if opts.deadLetterDeliverTimeout <= 0 {
opts.deadLetterDeliverTimeout = time.Second
}
// 處理消息執行 callback 回調的超時時間
if opts.handleMsgsTimeout <= 0 {
opts.handleMsgsTimeout = time.Second
}
}
關於死信隊列的定義:
// 死信隊列,當消息處理失敗達到指定次數時,會被投遞到此處
type DeadLetterMailbox interface {
Deliver(ctx context.Context, msg *redis.MsgEntity) error
}
// 默認使用的死信隊列,僅僅對消息失敗的信息進行日誌打印
type DeadLetterLogger struct{}
func NewDeadLetterLogger() *DeadLetterLogger {
return &DeadLetterLogger{}
}
func (d *DeadLetterLogger) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
log.ErrorContextf(ctx, "msg fail execeed retry limit, msg id: %s", msg.MsgID)
return nil
}
接下來是 consumer 在消費消息後執行的回調函數:
// 接收到消息後執行的回調函數
type MsgCallback func(ctx context.Context, msg *redis.MsgEntity) error
5.4.2 消費消息
接下來是 consumer goroutine 輪詢消費消息的流程,主要包括三個步驟:
-
• 消費新消息
-
• 投遞失敗次數超限的老消息進入死信
-
• 消費未確認老消息
對應流程示意如下:
下面是源碼部分:
// 該方法會在 consumer 構造器函數中被異步啓動
func (c *Consumer) run() {
// 通過 for 循環實現自旋模型
for {
// select 多路複用,保證在 Consumer stop 方法被執行時,該 goroutine 能及時退出
select {
case <-c.ctx.Done():
return
default:
}
// 接收處理新消息
msgs, err := c.receive()
if err != nil {
log.ErrorContextf(c.ctx, "receive msg failed, err: %v", err)
continue
}
// 接收到新消息後,執行對應的 callback 回調方法
tctx, _ := context.WithTimeout(c.ctx, c.opts.handleMsgsTimeout)
c.handlerMsgs(tctx, msgs)
// 把失敗次數超限的老消息投遞到死信隊列
tctx, _ = context.WithTimeout(c.ctx, c.opts.deadLetterDeliverTimeout)
c.deliverDeadLetter(tctx)
// 接收之前就已分配給當前 consumer,但是還未得到 xack 確認的老消息
pendingMsgs, err := c.receivePending()
if err != nil {
log.ErrorContextf(c.ctx, "pending msg received failed, err: %v", err)
continue
}
// 接收到老消息後,執行對應的 callback 回調方法
tctx, _ = context.WithTimeout(c.ctx, c.opts.handleMsgsTimeout)
c.handlerMsgs(tctx, pendingMsgs)
}
}
consumer 在消費新消息時,使用到 redis 客戶端的 XReadGroup 方法:
func (c *Consumer) receive() ([]*redis.MsgEntity, error) {
msgs, err := c.client.XReadGroup(c.ctx, c.groupID, c.consumerID, c.topic, int(c.opts.receiveTimeout.Milliseconds()))
if err != nil && !errors.Is(err, redis.ErrNoMsg) {
return nil, err
}
return msgs, nil
}
consumer 在消費未經確認的老消息時,使用到 redis 客戶端的 XReadGroupPending 方法:
func (c *Consumer) receivePending() ([]*redis.MsgEntity, error) {
pendingMsgs, err := c.client.XReadGroupPending(c.ctx, c.groupID, c.consumerID, c.topic)
if err != nil && !errors.Is(err, redis.ErrNoMsg) {
return nil, err
}
return pendingMsgs, nil
}
5.4.3 執行回調
在 consumer 消費到消息後,需要執行對應的 callback 函數,此時倘若 callback 執行成功了,則需要調用 xack 方法進行確認答覆;倘若 callback 執行失敗,則需要對失敗次數進行累加,倘若失敗次數達到上限,則這條消息最終會被投遞到死信隊列中:
func (c *Consumer) handlerMsgs(ctx context.Context, msgs []*redis.MsgEntity) {
for _, msg := range msgs {
if err := c.callbackFunc(ctx, msg); err != nil {
// 失敗計數器累加
c.failureCnts[*msg]++
continue
}
// callback 執行成功,進行 ack
if err := c.client.XACK(ctx, c.topic, c.groupID, msg.MsgID); err != nil {
// ack 失敗的情況需要關注
log.ErrorContextf(ctx, "msg ack failed, msg id: %s, err: %v", msg.MsgID, err)
continue
}
// ack 成功了,從 map中清零對應消息的失敗次數
delete(c.failureCnts, *msg)
}
}
5.4.4 投遞死信
倘若某條消息失敗次數達到上限,則會被投遞到死信隊列中,對應源碼如下:
func (c *Consumer) deliverDeadLetter(ctx context.Context) {
// 對於失敗達到指定次數的消息,投遞到死信中,然後執行 ack
for msg, failureCnt := range c.failureCnts {
if failureCnt < c.opts.maxRetryLimit {
continue
}
// 投遞死信隊列
if err := c.opts.deadLetterMailbox.Deliver(ctx, &msg); err != nil {
log.ErrorContextf(c.ctx, "dead letter deliver failed, msg id: %s, err: %v", msg.MsgID, err)
}
// 對於被投遞到死信隊列的消息,需要執行 ack 操作,後續不需要再通過常規流程重複處理了
if err := c.client.XACK(ctx, c.topic, c.groupID, msg.MsgID); err != nil {
log.ErrorContextf(c.ctx, "msg ack failed, msg id: %s, err: %v", msg.MsgID, err)
continue
}
// 對於 ack 成功的消息,將其從 failure map 中刪除
delete(c.failureCnts, msg)
}
}
5.4.5 停止消費
倘若使用方需要提前終止 consumer 的消費流程,則可以調用 Consumer.Stop 方法. 方法內部會執行 consumer 中內置的 stop 函數,本質上是 context 中的 cancel 函數,會通過停止 consumer 中 context 的方式,保證 consumer 的運行 goroutine 能正常退出.
// 停止 consumer
func (c *Consumer) Stop() {
c.stop()
}
5.4.6 使用示例
在進行 consumer 消費之前,有兩個操作是需要前置執行的:
-
對應的 topic 需要提前創建好. topic 創建時機是通過首次執行 XADD 指令完成的
-
對應的 group 需要提前創建好. 可以通過 XGROUP CREATE 指令完成創建:
XGROUP CREATE my_streams_topic my_group 0-0
OK
下面給出使用 consumer 進行消費的單測示例代碼.
import (
"context"
"testing"
"time"
"github.com/xiaoxuxiansheng/redmq"
"github.com/xiaoxuxiansheng/redmq/redis"
)
const (
// redis 傳輸層協議
network = "tcp"
// redis 服務器地址:ip:port
address = "請輸入 redis 地址"
// redis 服務器密碼,沒有密碼則設爲空串
password = "請輸入 redis 密碼"
// topic 名稱
topic = "請輸入 topic 名稱"
// 消費者組 id
consumerGroup = "請輸入消費者組名稱"
// 消費者 id
consumerID = "請輸入消費者名稱"
)
// 用戶自定義實現的死信隊列
type DemoDeadLetterMailbox struct {
do func(msg *redis.MsgEntity)
}
func NewDemoDeadLetterMailbox(do func(msg *redis.MsgEntity)) *DemoDeadLetterMailbox {
return &DemoDeadLetterMailbox{
do: do,
}
}
// 死信隊列接收消息的處理方法
func (d *DemoDeadLetterMailbox) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
d.do(msg)
return nil
}
func Test_Consumer(t *testing.T) {
// 創建 redis 客戶端
client := redis.NewClient(network, address, password)
// consumer 接收到消息後的執行的 callback 回調處理函數
callbackFunc := func(ctx context.Context, msg *redis.MsgEntity) error {
t.Logf("receive msg, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
return nil
}
// 創建自定義實現的死信隊列實例
demoDeadLetterMailbox := NewDemoDeadLetterMailbox(func(msg *redis.MsgEntity) {
t.Logf("receive dead letter, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
})
// 構造並啓動消費者 consumer 實例
consumer, err := redmq.NewConsumer(client, topic, consumerGroup, consumerID, callbackFunc,
// 每條消息最多重試處理 2 次
redmq.WithMaxRetryLimit(2),
// 每輪接收消息的阻塞等待超時時間爲 2 s
redmq.WithReceiveTimeout(2*time.Second),
// 注入自定義實現的死信隊列
redmq.WithDeadLetterMailbox(demoDeadLetterMailbox))
if err != nil {
t.Error(err)
return
}
// 程序退出前停止 consumer
defer consumer.Stop()
// 十秒後退出單測程序
<-time.After(10 * time.Second)
}
至此,正文內容全部結束.
6 總結
本期和大家一起探討了如何基於 redis 實現消息隊列,其中實現方案包括三類:
-
redis list:最簡單粗暴的實現,存在問題包括:不支持發佈 / 訂閱模式、消費端缺少 ack 機制
-
redis pub/sub:支持發佈 / 訂閱模式,有較高的丟數據風險,消費端同樣不支持 ack 機制
-
redis streams:趨近於成熟的 mq 實現方式. 支持發佈 / 訂閱模式,消費端能支持 ack 機制. 但是受限於 redis 自身的特性,仍無法杜絕丟失數據的可能性
最後,本人使用 go 語言,基於 redis streams 開源實現了一款 mq 客戶端 sdk,開源地址:https://github.com/xiaoxuxiansheng/redmq,歡迎大家批評指正.
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/MSmipbE5cyK2_m5iiKv7pw