萬字長文解析如何基於 Redis 實現消息隊列

0 前言

去年年底,和大家分享了一個個人項目的實現方案——基於協程池架構實現分佈式定時器 XTimer.

在這次分享中我有提到, xtimer 前身是基於消息隊列架構實現的 workflow timer. 之所以在演進過程中把實現方式改爲協程池,主要是考慮到消息隊列組件存在比較高的使用和維護成本. 然而從技術流程本身出發,基於消息隊列的實現方式實際上對於分佈式定時器的核心流程解耦以及縱向架構擴展都是有利的.

1 消息隊列

要想把本期分享話題聊到位,首先我們需要理清楚,一個合格的消息隊列(MQ,message queue)應該具備哪些核心能力.

1.1 核心能力

談到消息隊列,其最重要的兩項能力就是解耦和削峯.

針對於 mq 的解耦功能而言,這裏舉個生活中的例子來幫助大家進一步理解這個概念.

假設我們在網上購買了一些商品. 在上班開會的時候,快遞小哥打通了我們的電話,告知物品已經送達到我們的家門口.

這個時候,倘若這個快遞小哥是個比較耿直的人,強烈要求我們必須立刻過去當面簽收,這個流程才能結束. 那此時的我們就尬住了. 我們得和領導打個招呼,趕回家去簽收快遞. 在我們到家之前,快遞小哥也必須一直守在原地等我們回去簽收交接. 整個流程是比較僵硬的,在這一次交接動作完成之前,雙方都沒辦法再靈活處理其他事項了.

上述例子就類似於我們在業務流程中基於 http/rpc 發起的一次同步請求,上游(快遞小哥)在發出請求後(打電話),會阻塞等待下游(作爲簽收方的我們)給到反饋(完成簽收操作),否則整個流程會一直阻塞住.

然而在實際場景中,我們知道還有一個叫作 “快遞超市” 的存在. 當快遞到達時,快遞小哥可以將我們的物品先存放在快遞超市中,登記好接收方的個人信息後,並給接收方發完通知短信後,快遞小哥就可以先撤離現場,去忙活其他事情了. 接下來,快遞超市會爲接收方承擔起託管快遞的職責,接收方只需要選擇在合適的時間去快遞超市收取物品即可.

這個流程相比之下就顯得靈活很多,由於有快遞超市這個緩衝區的存在,使得我們和快遞小哥之間的交互流程能夠實現解耦. 在這個流程中,快遞小哥就類似於生產者 producer,我們作爲接收方,類似於消費者 consumer,而負責承上啓下、託管快遞的快遞超市則類似於消息隊列 mq.

聊完了這個生活場景,我們再從技術視角出發,對 mq 所帶來的解耦能力進行一輪闡述:

下面我們再用同樣的例子說明一下消息隊列的另一項核心功能——削峯.

假設現在正值雙十一時期,我們剁手一通買買買,導致同時有大量的快遞在同一個時段到達. 這時候,快遞超市就爲我們起到 “削峯” 的效果. 不論快遞數量的多少,我們不用第一時間立刻進行響應處理,而是能夠選擇在合適的時間到達快遞超市進行取件. 如果快遞數量很大,我們一次拿不完的話,我們也可以量力而行,每次只收取一部分,分成多個批次處理.

這個流程就類似於 mq 所帶來的消息削峯的能力. 在實際的生產環境中,倘若上游請求量很大,而下游都需要第一時間進行同步響應的話,這對於下游系統可能產生很大的負荷. 此時如果能把同步流程轉爲異步,把消息放到 mq 組件中進行一輪緩衝,讓下游可以根據自身的處理能力,按照自己的節奏消化這部分積攢的流量,這對於下游系統來說能起到很好的保護作用.

前面是從宏觀功能的視角出發,聊到了 mq 對應的異步流程所具備的優勢. 下面我們再談一談,作爲 mq 組件需要具備哪些基礎能力:

針對於上述第二點,各 mq 組件在實現上大抵上是基於數據落盤 + 數據備份的方式保證的.

而針對於上述的一、三點,則是通過兩個交互環節中的 ack 機制保證的. 以 producer 投遞 msg 到 mq 的環節爲例,只要 mq 沒有給到投遞成功的 ack 反饋,那麼 producer 就應該把本次投遞流程視爲失敗,執行重新投遞的操作. consumer 的消費流程同樣如此.

因此,mq 交互流程主要通過 ack 機制保證消息投遞以及消費環節做到 at least once(至少一次)的語義,然而無法保證消息不重複的問題. 因此,處於最下游的消費者 consumer 需要能夠具備消息冪等去重的能力,避免流程被重複處理.

另一項能力是支持消息的存儲. 以我們前面提到的取快遞的例子來說,快遞超市需要有一個實體店面,店面具有着一定的容量能夠存放一定數量的快遞. 這樣當下遊 consumer 沒來得及第一時間消費消息時,消息能緩存在 mq 組件中一段時間,讓消費方自由選擇合適的時間過來進行消費.

1.2 流程類型

我們定義 mq 類型時,可以從多個維度出發. 這裏我主要根據 consumer 消費的流程,將 mq 分爲 push 型和 pull 型.

push 型指的是當 producer 將消息投遞到 mq 時,由 mq 負責將消息以推送的形式主動發送給各個建立了訂閱關係的消費方.

pull 型指的是當 mq 中存在消息時,由 consumer 主動執行拉取消息的操作.

關於以上兩種 mq 類型,在這裏個人有一些比較淺顯的認知:

對於 push 型,存在的優勢是:

劣勢:

對於 pull 型則剛好相反:

1.3 redis 實現 mq 的問題

理清楚 mq 的核心功能和訴求後,下面我們先明確一下基於 redis 實現 mq 存在的一類通用問題:

redis 本身是基於內存實現的緩存組件,因此在存儲消息時總容量相對有限.

此外,redis 存儲消息時會不可避免地存在數據丟失的風險,可以從兩個方面出發考慮:

以上問題,不論是在 redis 緩存數據還是實現 mq 的流程中都是存在的,這個問題我們在選型使用 redis 時需要做到了然於心,這一點在後續 4.3 小節中會進一步展開說明.

2 redis list

基於 redis 實現 mq 的方式之一是使用 redis 中的 list 結構.

redis 中的 list 是一個雙向鏈表,天然契合 mq 中的 queue 隊列模型. 我們在使用 redis list 實現 mq 時,可以生產消息的流程具象化爲一次將數據追加到 list 尾部的操作;同時,我們可以把消費消息的流程具象化爲一次從 list 頭部摘取數據的操作. 如此這般,一個簡易版的消息隊列就實現了.

2.1 操作指令

下面我們對涉及到的幾個指令展開介紹:

首先,在使用 list 充當消息隊列時,list 對應的 key 則對應爲消息的 topic 名稱.

producer 在投遞消息時,可以使用 lpush 指令,對應的時間複雜度爲 O(1). 指令文檔鏈接:https://redis.io/commands/lpush/

127.0.0.1:6379> lpush my_list_topic msg
(integer) 1

consumer 消費消息時,使用 rpop 指令,對應的時間複雜度 O(1). 指令文檔鏈接:https://redis.io/commands/rpop/

127.0.0.1:6379> rpop my_list_topic
"msg"

2.2 消費流程分析

在上述流程中,存在的第一個問題是,consumer 的輪詢消費流程應該如何組織.

這種基於 list 實現的 mq 是屬於 pull 類型. 消費方自行組織流程,並在合適的時機通過 rpop 執行進行消息的主動拉取.

首先,consumer 在消費時,一定是一個類似於 loop thread 的自旋模型,每一輪循環中,通過 rpop 指令嘗試從 list 中讀取消息,如果成功讀取到了消息,則進行相應的邏輯處理.

然而在此處,需要注意的是,redis 的 rpop 指令是非阻塞型的,即在 list 沒有數據時,也會即時返回一個結果爲 nil 的響應,這樣我們在組織這段自旋程序的時候就顯得有些尷尬了:

在這個過程中,最理想的實現方案是,在 list 中有數據到達時,我們令 consumer 即時獲取到對應的結果;倘若 list 數據爲空,則令 consumer 陷入阻塞等待的狀態,直到有數據抵達時程序才被喚醒.

所幸,這個訴求在 redis list 中是可以滿足的. 我們可以使用 redis 中的 brpop 指令替代 rpop 指令,做到在有數據時才返回響應,否則令當前程序陷入阻塞. brpop 指令文檔:https://redis.io/commands/brpop/

127.0.0.1:6379> brpop my_list_topic 0
1) "my_list_topic"
2) "msg"

2.3 侷限性分析

即便我們採用 brpop 解決了 consumer 合理阻塞消費數據的問題,這種基於 redis list 實現的 mq 仍然不能稱爲一個成熟的實現方案,其中主要存在着以下幾項缺陷:

list 中的數據是獨一份的,被 pop 出去後就不復存在了.

因此 redis 中的 list 是無法支持 mq 中的發佈 / 訂閱模式的,即下游倘若存在多個獨立的消費者組 consumer group,各自都需要獨立獲取一份完整的數據,那麼此時通過 redis list 是無法滿足這個訴求的.

consumer 通過 brpop 獲取到數據後,倘若發生宕機或者其他意外錯誤,沒有一種有效的手段能給予 mq 一個消息處理失敗的反饋. 這條消息一旦從 list 中被取走,就不再有機會被重新獲取了,因此在這個場景下,消息就真的丟失了.

3 redis pub/sub

爲解決 redis list 存在的無法支持發佈 / 訂閱模式的問題,redis 提供了 pub/sub 機制,能夠有效地彌補這方面的缺陷.

pub/sub 全稱爲 publish/subscribe,顧名思義,指的正是消息隊列中的發佈 / 訂閱模式.

爲了貼合 pub/sub 的語義,在本章中,我們統一把生產者 producer 稱爲 publisher,消費者 consumer 稱爲 subscriber,大家留意一下.

在實現上,pub/sub 會在 publisher 和 subscriber 之間建立一個用於實時通訊的信道——channel. 在傳遞消息時,會根據 channel 查找到所有建立過訂閱關係的 subscriber,一一將消息送達到它們手中.

3.1 操作指令

下面我們對使用 pub/sub 實現 mq 流程涉及到的幾個核心指令進行介紹.

首先,消費方 subscriber 通過 subscribe 指令建立對某個 channel 的訂閱關係. 指令文檔:https://redis.io/commands/subscribe/

127.0.0.1:6379> subscribe my_channel_topic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel_topic"
3) (integer) 1

每個通過 subscribe 指令建立 channel 訂閱關係的使用方都會被視爲一個獨立的 subscriber,後續 channel 中有消息到達時,會被複製成多份,一一推送到各個 subscriber 手中.

生產方 publisher 通過 publish 指令往對應的 channel 中執行消息投遞操作. 指令文檔:https://redis.io/commands/publish/

127.0.0.1:6379> publish my_channel_topic msg
(integer) 1

此時,之前對這個 channel 執行過 subscribe 操作的 subscriber 都會接收到這則消息:

1) "message"
2) "my_channel_topic"
3) "msg"

此外,值得一提的是,消費者通過 subscribe 指令會對 channel 採用阻塞模式進行監聽,只有在有消息到來時,纔會從阻塞狀態中被喚醒.

3.2 實現原理

下面我們對 redis pub/sub 模式的實現原理進行介紹. 在理清原理後,我們才能對這個流程中存在的問題進行更爲通透的認識和分析.

基於這個流程,我們能看出來,pub/sub 對於 channel 以及 subscribers 之間的實時映射關係存在強依賴. 因此在操作的執行順序上,我們需要保證先執行 subscribe 指令,再執行 publish 執行,否則前幾筆 publish 投遞的數據就會因爲不存在 subscriber 而被直接丟棄.

3.3 優缺點分析

理完了 pub/sub 的實現原理,下面我們可以對其存在的優劣勢進行分析了.

首先,pub/sub 模式最大的優勢就是能夠支持發佈 / 訂閱能力,同一份消息會被推送給所有通過 subscribe 操作訂閱了該 channel 的 subscriber.

然而,pub/sub 存在的問題是很顯著的,就是丟消息問題. 這個問題可以從多個維度展開:

這個問題和 redis list 相同, subscriber 在獲取到消息後,沒有對應的 ack 機制,因此倘若處理失敗,想要執行消息的重放操作是無法做到的

我覺得 redis pub/sub 機制就有點類似於 golang 中的無緩衝型 channel. 它相當於只維護了 channel 和 subscribers 的映射關係,但是每條被投遞的消息都是即來即走,並不會停留在 channel 中,於是在以下幾個場景中,都會發生消息丟失問題:

針對最後這一點,subscriber 對應的緩衝區容量閾值可以在 redis.conf 文件中進行配置,其默認值爲:

client-output-buffer-limit pubsub 32mb 8mb 60

對應的含義是,倘若某個 subscriber 的緩衝區 buffer 大小達到 32MB,則 subscriber 會被踢下線;倘若緩衝區內數據量在連續 60s 內達到 8MB 大小,subscriber 也會踢下線.

聊到這裏,我們發現不論是 redis 中的 list 還是 pub/sub 功能,各自都存在着比較明顯的功能缺陷,都是無法被當作一個成熟的 mq 組件來使用的.

不過大家不用氣餒,真正的重頭戲馬上就來. 接下來我們聊到的第三種實現方案 redis streams 纔是真正意義上趨近於成熟的 mq 實現方案.

4 redis streams

從 redis 5.0 中,一個新的數據類型——streams 被推出了. 這種數據類型的目標正是直奔實現 mq 組件的功能而去的.

4.1 操作指令

首先,我們理一下,使用 redis streams 時涉及到的幾個核心操作指令.

首先是用於生產消息的指令,通過 XADD 指令往 topic 中投入一組 kv 對消息. 指令文檔:https://redis.io/commands/xadd/

XADD my_streams_topic * key1 val1
"1638515664470-0"
XADD topic1 * key2 val2
"1638515672769-0"

• 消費消息

接下來是用於消費消息的指令,通過 XREAD 指令從對應 topic 中獲取消息. 指令文檔:https://redis.io/commands/xread/

XREAD STREAMS my_streams_topic 0-0
1) 1) "my_streams_topic"
   2) 1) 1) "1638515664470-0"
         2) 1) "key1"
            2) "val1"
      2) 1) "1638515672769-0"
         2) 1) "key2"
            2) "val2"

streams 支持在消費時,採用阻塞模式進行消費. 倘若存在數據則即時返回處理,否則會阻塞消費流程.

# BLOCK 0 表示阻塞等待時沒有超時時間上限
XREAD BLOCK 0 STREAMS my_streams_topic 1638515672769-0
(nil)

streams 也支持發佈訂閱模式,能保證消息被多個消費者組 consumer group 同時消費到.

首先需要進行消費者組的創建. 指令文檔:https://redis.io/commands/xgroup-create/

XGROUP CREATE my_streams_topic my_group 0-0
OK

同一份數據在同一個消費者組下只會被消費到一次. 不同消費者組各自能獲取到獨立完整的消息數據.

通過 XReadGroup 指令,以消費者組的身份進行消費. 指令文檔:https://redis.io/commands/xreadgroup/

XREADGROUP GROUP my_group consumer BLOCK 0 STREAMS my_streams_topic >
1) 1) "topic1"
   2) 1) 1) "1638515664470-0"
         2) 1) "key1"
            2) "val1"
      2) 1) "1638515672769-0"
         2) 1) "key2"
            2) "val2"

     • my_group: 消費者組名稱

     • consumer:消費者名稱

     • my_streams_topic:topic 名稱

     • block 0: 採用阻塞等待的模式,0 代表沒有超時上限

     • >: 讀最新的消息 (尚未分配給某個 consumer 的消息)

還有另一種消費模式,讀取的是已分配給當前消費者,但是還未經確認的老消息:

XREADGROUP GROUP my_group consumer STREAMS my_streams_topic 0-0
1) 1) "topic1"
   2) 1) 1) "1638515664470-0"
         2) 1) "key1"
            2) "val1"
      2) 1) "1638515672769-0"
         2) 1) "key2"
            2) "val2"

通過 xack 指令,攜帶上消費者組、topic 名稱以及消息 id,能夠完成對某條消息的確認操作. 文檔鏈接:https://redis.io/commands/xack/

127.0.0.1:6379> XACK my_streams_topic my_group 1638515664470-0
(integer) 1

4.2 優缺點分析

下面針對基於 redis streams 實現 mq 的優劣勢進行分析.

redis streams 引入了消費者組 group 的概念,因此是能夠保證各個消費者組 consumer group 均能夠獲取到一份獨立而完整的消息數據.

redis 中的 streams 和 string、list 等數據類型一樣,都能夠通過 rdb(redis database)、aof( append only file) 的持久化機制進行落盤存儲,能夠在很大程度上降低數據丟失的概率.

redis streams 中另一項非常重要的改進,是支持 consumer 的 ack 能力. consumer 在處理好某條消息後,能通過 xack 指令對該消息進行確認. 這樣對於沒經過 ack 確認的消息,redis streams 還是爲 consumer 保留了重新消費的能力.

和 pub/sub 模式不同的是,redis streams 中會實際開闢內存空間用於存儲 streams 中的數據. 因此哪怕某個 consumer group 是在消息生產之後才完成註冊操作,也能夠進行消息溯源,從 topic 起點開始執行消息的消費操作.

不過這裏需要考慮的問題是,redis 基於內存實現消息數據的存儲,倘若大量的消息數據被堆積在內存中,在資源使用上會存在很大的壓力和很高的成本,嚴重時甚至可能發生 OOM 問題.

基於此,redis streams 支持在每次投遞消息時,顯式設定一個 topic 中能緩存的數據長度,來人爲限制這個緩存空間的容量.

這裏可以通過在 XADD 指令中加上 maxlen 參數,用於指定 topic 中能緩存的數據長度:

XADD topic1 MAXLEN 10000 * key1 val1

這樣倘若 topic 數據容量超限,則新消息的到達會把最老的消息擠出隊列,意味着也可能存在數據丟失的風險,因此大家在使用時需要合理設置 maxlen 參數.

4.3 整體對比分析

到這裏爲止,最後一個實現方案 redis streams 也介紹完畢,最後我們回過頭對今天介紹過的幾類 redis 實現 mq 的方案進行一輪總結.

HieK7S

可以看到,在各項能力上 list 和 pub/sub 互有千秋,而 streams 可以說是兼具了各方面的優勢,稱得上是已經趨近於成熟的 mq 實現方案.

然而,大家應該能注意到,我在此處評價 streams 方案的數據丟失風險時,僅僅是評價爲 “低”,而不是 “無”,這一點我接下來會繼續加以說明.

下面我們再進一步拿 redis streams 和業界專業的 mq 組件進行對比,就以我比較熟悉的 kafka 組件爲例,來看看 redis stream 存在哪些方面的優劣:

YlA10A

可以看到,redis streams 在存儲介質上需要使用內存,因此消息存儲容量相對有限;且同一個 topic 的數據由於對應爲同一個 key,因此會被分發到相同節點,無法實現數據的縱向分治,因此不具備類似於 kafka 縱向分區以提高併發度的能力.

此外,很重要的一個點是,基於 redis 實現的 mq 一定是存在消息丟失的風險的. 儘管在生產端和消費端,producer/consumer 在和 mq 交互時可以通過 ack 機制保證在交互環節不出現消息的丟失,然而在 redis 本身存儲消息數據的環節就可能存在數據丟失問題,原因在於:

與之相對的,kafka 只要合理設置好 ISR(In Sync Replica) 有關參數,理論上在集羣存在多數節點仍能正常運作的情況下,對應的消息數據是不會出現丟失的.

前面我們談到了 redis 相比於傳統 mq 組件的一些劣勢,現在我們再來聊聊它具備的一些優勢:就是相對輕量化,相比於傳統 mq 組件有着更低的使用和運維成本.

因此,在實際的選型過程中,我們可以根據業務訴求進行抉擇. 倘若業務流程對於數據的精度沒有特別嚴格的要求,那此時使用 redis streams 這樣一種輕量化的 mq 實現方案未嘗不是一種好的選擇和嘗試.

5 redmq——純 redis 實現的消息隊列

從第 5 章開始,我們正式進入實戰篇的部分.

遵循本文第 4 章基於 redis stream 實現 mq 的思路,我日前使用 go 語言,開源實現了一款 基於 redis 實現消息隊列的客戶端 sdk:redmq,該項目的開源地址爲:https://github.com/xiaoxuxiansheng/redmq

5.1 實現架構

內容可以分爲三個核心模塊:

5.2 redis 客戶端

5.2.1 redigo

本項目中,redis 客戶端底層是基於開源的 golang redis 客戶端 sdk:redigo 實現的,使用的源碼版本爲 v1.8.9,該項目的開源地址爲:https://github.com/gomodule/redigo

5.2.2 客戶端類

下面是關於客戶端的實現,核心步驟通過代碼註釋加以說明:

// Client Redis 客戶端.
type Client struct {
    // 用戶自定義配置
    opts *ClientOptions
    // redis 連接池
    pool *redis.Pool
}


// 其中網絡協議、redis 地址、redis 密碼爲必填項
func NewClient(network, address, password string, opts ...ClientOption) *Client {
    c := Client{
        opts: &ClientOptions{
            network:  network,
            address:  address,
            password: password,
        },
    }


    // 注入用戶自定義的配置參數
    for _, opt := range opts {
        opt(c.opts)
    }


    // 對非法的配置參數進行修復
    repairClient(c.opts)


    // 創建 redis 連接池
    pool := c.getRedisPool()
    // 返回 redis 客戶端實例
    return &Client{
        pool: pool,
    }
}


// 獲取 redis 連接池
func (c *Client) getRedisPool() *redis.Pool {
    return &redis.Pool{
        // 最大空閒連接數
        MaxIdle:     c.opts.maxIdle,
        // 連接最長空閒時間
        IdleTimeout: time.Duration(c.opts.idleTimeoutSeconds) * time.Second,
        // 創建連接的方法
        Dial: func() (redis.Conn, error) {
            c, err := c.getRedisConn()
            if err != nil {
                return nil, err
            }
            return c, nil
        },
        // 最大活躍連接數
        MaxActive: c.opts.maxActive,
        // 當連接不夠時,是阻塞等待還是立即返回錯誤
        Wait:      c.opts.wait,
        // 測試方法
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            _, err := c.Do("PING")
            return err
        },
    }
}


// 獲取連接
func (c *Client) GetConn(ctx context.Context) (redis.Conn, error) {
    return c.pool.GetContext(ctx)
}


// 獲取 redis 配置
func (c *Client) getRedisConn() (redis.Conn, error) {
    if c.opts.address == "" {
        panic("Cannot get redis address from config")
    }


    // 注入密碼
    var dialOpts []redis.DialOption
    if len(c.opts.password) > 0 {
        dialOpts = append(dialOpts, redis.DialPassword(c.opts.password))
    }
    // 創建新的連接
    return redis.DialContext(context.Background(),
        c.opts.network, c.opts.address, dialOpts...)
}

5.2.3 投遞消息

下面是用於投遞消息的 XADD 指令的實現源碼:

func (c *Client) XADD(ctx context.Context, topic string, maxLen int, key, val string) (string, error) {
    // topic 名稱不能爲空
    if topic == "" {
        return "", errors.New("redis XADD topic can't be empty")
    }
    // 從 redis 連接池中獲取連接
    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return "", err
    }
    // 使用完畢後把連接放回連接池
    defer conn.Close()
    // 執行 XADD 指令,並返回生成的消息 id
    return redis.String(conn.Do("XADD", topic, "MAXLEN", maxLen, "*", key, val))
}

5.2.4 消費新消息

消費消息時使用的是 redis streams 的 XREADGROUP 指令,其中又可以分爲消費新消息(尚未分配給任何 consumer 的消息)以及消費未確認的老消息(分配給當前 consumer,但是還沒經過 XACK 確認的消息).

func (c *Client) XReadGroup(ctx context.Context, groupID, consumerID, topic string, timeoutMiliSeconds int) ([]*MsgEntity, error) {
    return c.xReadGroup(ctx, groupID, consumerID, topic, timeoutMiliSeconds, false)
}

在 xReadGroup 方法中,會根據用戶傳入的 pending 表示是否爲 true,代表當前是消費處理新消息還是未經確認的老消息:

func (c *Client) xReadGroup(ctx context.Context, groupID, consumerID, topic string, timeoutMiliSeconds int, pending bool) ([]*MsgEntity, error) {
    // 消費者組 id、消費者 id、topic 名稱缺一不可
    if groupID == "" || consumerID == "" || topic == "" {
        return nil, errors.New("redis XREADGROUP groupID/consumerID/topic can't be empty")
    }
    // 從 redis 連接池中獲取新連接
    conn, err := c.pool.GetContext(ctx)
    if err != nil {
        return nil, err
    }
    // 使用完畢後返還連接
    defer conn.Close()


    var rawReply interface{}
    // 倘若 pending 爲 true,代表需要消費的是已分配給當前 consumer 但是還未經 xack 確認的老消息. 此時採用非阻塞模式進行處理
    if pending {
        rawReply, err = conn.Do("XREADGROUP""GROUP", groupID, consumerID, "STREAMS", topic, "0-0")
    } else {
    // 倘若 pending 爲 false,代表需要消費的是尚未分配給任何 consumer 的新消息,此時會才用阻塞模式執行操作
        rawReply, err = conn.Do("XREADGROUP""GROUP", groupID, consumerID, "BLOCK", timeoutMiliSeconds, "STREAMS", topic, ">")
    }


    if err != nil {
        return nil, err
    }
    reply, _ := rawReply.([]interface{})
    if len(reply) == 0 {
        return nil, ErrNoMsg
    }




    replyElement, _ := reply[0].([]interface{})
    if len(replyElement) != 2 {
        return nil, errors.New("invalid msg format")
    }


    // 對消費到的數據進行格式化
    var msgs []*MsgEntity
    rawMsgs, _ := replyElement[1].([]interface{})
    for _, rawMsg := range rawMsgs {
        _msg, _ := rawMsg.([]interface{})
        if len(_msg) != 2 {
            return nil, errors.New("invalid msg format")
        }
        msgID := gocast.ToString(_msg[0])
        msgBody, _ := _msg[1].([]interface{})
        if len(msgBody) != 2 {
            return nil, errors.New("invalid msg format")
        }
        msgKey := gocast.ToString(msgBody[0])
        msgVal := gocast.ToString(msgBody[1])
        msgs = append(msgs, &MsgEntity{
            MsgID: msgID,
            Key:   msgKey,
            Val:   msgVal,
        })
    }


    return msgs, nil
}

5.2.5 重複消費未確認消息

XReadGroupPending 方法用於消費未經 XACK 確認的老消息,內部也會調用 xReadGroup 方法,但是會把 pending 標識置爲 true.

func (c *Client) XReadGroupPending(ctx context.Context, groupID, consumerID, topic string) ([]*MsgEntity, error) {
    return c.xReadGroup(ctx, groupID, consumerID, topic, 0, true)
}

5.3 生產者

5.3.1 類定義

下面是關於生產者 producer 模塊的實現:

// 生產者 producer 類定義
type Producer struct {
    // 內置的 redis 客戶端
    client *redis.Client
    // 用戶自定義的生產者配置參數
    opts   *ProducerOptions
}


// 生產者 producer 的構造器函數
func NewProducer(client *redis.Client, opts ...ProducerOption) *Producer {
    p := Producer{
        client: client,
        opts:   &ProducerOptions{},
    }


    // 注入用戶自定義的配置參數
    for _, opt := range opts {
        opt(p.opts)
    }
    
    // 對非法的配置參數進行修復
    repairProducer(p.opts)


    // 返回 producer 實例
    return &p
}

有關於 producer 的一些配置項:

type ProducerOptions struct {
    // topic 可以緩存的消息長度,單位:條. 當消息條數超過此數值時,會把老消息踢出隊列
    msgQueueLen int
}


type ProducerOption func(opts *ProducerOptions)


func WithMsgQueueLen(len int) ProducerOption {
    return func(opts *ProducerOptions) {
        opts.msgQueueLen = len
    }
}


// 默認爲每個 topic 保留 500 條消息
func repairProducer(opts *ProducerOptions) {
    if opts.msgQueueLen <= 0 {
        opts.msgQueueLen = 500
    }
}

5.3.2 投遞消息

producer 投遞消息時,會使用到 redis 客戶端中的 XADD 方法:

// 生產一條消息
func (p *Producer) SendMsg(ctx context.Context, topic, key, val string) (string, error) {
    return p.client.XADD(ctx, topic, p.opts.msgQueueLen, key, val)
}

5.3.3 使用示例

下面展示一個使用 producer 進行消息投遞的單測代碼示例:

import (
    "context"
    "testing"


    "github.com/xiaoxuxiansheng/redmq"
    "github.com/xiaoxuxiansheng/redmq/redis"
)




const ( 
    // 連接 redis 的傳輸層協議,默認爲 tcp
    network  = "tcp"
    // redis 的地址(ip:port)
    address  = "請輸入 redis 地址"
    // redis 服務器的密碼,如果沒設密碼可以傳爲空串
    password = "請輸入 redis 密碼"
    // 投遞到哪個 topic
    topic    = "請輸入 topic 名稱"
)


func Test_Producer(t *testing.T) {
    // 創建一個 redis 客戶端
    client := redis.NewClient(network, address, password)
    // 構造生產者 producer 實例,並限制一個 topic 最多保留 10 條消息
    producer := redmq.NewProducer(client, redmq.WithMsgQueueLen(10))
    ctx := context.Background()
    // 投遞消息
    msgID, err := producer.SendMsg(ctx, topic, "test_k""test_v")
    if err != nil {
        t.Error(err)
        return
    }
    // 打印這條消息對應的 msg id
    t.Log(msgID)
}

5.4 消費者

5.4.1 類定義

下面是關於消費者 consumer 的類定義:

// 消費者 consumer 類定義
type Consumer struct {
    // consumer 生命週期管理
    ctx  context.Context
    // 停止 consumer 的控制器
    stop context.CancelFunc


    // 接收到 msg 時執行的回調函數,由使用方定義
    callbackFunc MsgCallback


    // redis 客戶端,基於 redis 實現 message queue
    client *redis.Client


    // 消費的 topic
    topic string
    // 所屬的消費者組
    groupID string
    // 當前節點的消費者 id
    consumerID string


    // 各消息累計失敗次數
    failureCnts map[redis.MsgEntity]int


    // 一些用戶自定義的配置
    opts *ConsumerOptions
}


// 消費者 consumer 構造器函數
func NewConsumer(client *redis.Client, topic, groupID, consumerID string, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error) {


    // cancel context,用於提供停止 consumer 的控制器
    ctx, stop := context.WithCancel(context.Background())
    // 構造 consumer 實例
    c := Consumer{
        client:       client,
        ctx:          ctx,
        stop:         stop,
        callbackFunc: callbackFunc,
        topic:        topic,
        groupID:      groupID,
        consumerID:   consumerID,


        opts: &ConsumerOptions{},




        failureCnts: make(map[redis.MsgEntity]int),
    }


    // 校驗 consumer 中的參數,包括 topic、groupID、consumerID 都不能爲空,且 callbackFunc 不能爲空
    if err := c.checkParam(); err != nil {
        return nil, err
    }


    // 注入用戶自定義的配置項
    for _, opt := range opts {
        opt(c.opts)
    }


    // 修復非法的配置參數
    repairConsumer(c.opts)


    // 啓動 consumer 守護 goroutine,負責輪詢消費消息
    go c.run()
    // 返回 consumer 實例
    return &c, nil
}

關於 consumer 的一些自定義配置項:

type ConsumerOptions struct {
    // 每輪阻塞消費新消息時等待超時時長
    receiveTimeout time.Duration
    // 處理消息的最大重試次數,超過此次數時,消息會被投遞到死信隊列
    maxRetryLimit int
    // 死信隊列,可以由使用方自定義實現
    deadLetterMailbox DeadLetterMailbox
    // 投遞死信流程超時閾值
    deadLetterDeliverTimeout time.Duration
    // 處理消息流程超時閾值
    handleMsgsTimeout time.Duration
}




type ConsumerOption func(opts *ConsumerOptions)




func WithReceiveTimeout(timeout time.Duration) ConsumerOption {
    return func(opts *ConsumerOptions) {
        opts.receiveTimeout = timeout
    }
}




func WithMaxRetryLimit(maxRetryLimit int) ConsumerOption {
    return func(opts *ConsumerOptions) {
        opts.maxRetryLimit = maxRetryLimit
    }
}




func WithDeadLetterMailbox(mailbox DeadLetterMailbox) ConsumerOption {
    return func(opts *ConsumerOptions) {
        opts.deadLetterMailbox = mailbox
    }
}




func WithDeadLetterDeliverTimeout(timeout time.Duration) ConsumerOption {
    return func(opts *ConsumerOptions) {
        opts.deadLetterDeliverTimeout = timeout
    }
}




func WithHandleMsgsTimeout(timeout time.Duration) ConsumerOption {
    return func(opts *ConsumerOptions) {
        opts.handleMsgsTimeout = timeout
    }
}


// 修復非法的配置參數
func repairConsumer(opts *ConsumerOptions) {
    // 默認阻塞消費的超時時長爲 2 s
    if opts.receiveTimeout < 0 {
        opts.receiveTimeout = 2 * time.Second
    }


    // 默認同一條消息最多處理 3 次,超過此次數,則進入死信
    if opts.maxRetryLimit < 0 {
        opts.maxRetryLimit = 3
    }


    // 如果用戶沒傳入死信隊列,則使用默認的死信隊列,只會打印一下消息
    if opts.deadLetterMailbox == nil {
        opts.deadLetterMailbox = NewDeadLetterLogger()
    }


    // 投遞消息進入死信隊列的流程的超時時間
    if opts.deadLetterDeliverTimeout <= 0 {
        opts.deadLetterDeliverTimeout = time.Second
    }


    // 處理消息執行 callback 回調的超時時間
    if opts.handleMsgsTimeout <= 0 {
        opts.handleMsgsTimeout = time.Second
    }
}

關於死信隊列的定義:

// 死信隊列,當消息處理失敗達到指定次數時,會被投遞到此處
type DeadLetterMailbox interface {
    Deliver(ctx context.Context, msg *redis.MsgEntity) error
}


// 默認使用的死信隊列,僅僅對消息失敗的信息進行日誌打印
type DeadLetterLogger struct{}


func NewDeadLetterLogger() *DeadLetterLogger {
    return &DeadLetterLogger{}
}


func (d *DeadLetterLogger) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
    log.ErrorContextf(ctx, "msg fail execeed retry limit, msg id: %s", msg.MsgID)
    return nil
}

接下來是 consumer 在消費消息後執行的回調函數:

// 接收到消息後執行的回調函數
type MsgCallback func(ctx context.Context, msg *redis.MsgEntity) error

5.4.2 消費消息

接下來是 consumer goroutine 輪詢消費消息的流程,主要包括三個步驟:

對應流程示意如下:

下面是源碼部分:

// 該方法會在 consumer 構造器函數中被異步啓動
func (c *Consumer) run() {
    // 通過 for 循環實現自旋模型
    for {
        // select 多路複用,保證在 Consumer stop 方法被執行時,該 goroutine 能及時退出
        select {
        case <-c.ctx.Done():
            return
        default:
        }


        // 接收處理新消息
        msgs, err := c.receive()
        if err != nil {
            log.ErrorContextf(c.ctx, "receive msg failed, err: %v", err)
            continue
        }


        // 接收到新消息後,執行對應的 callback 回調方法
        tctx, _ := context.WithTimeout(c.ctx, c.opts.handleMsgsTimeout)
        c.handlerMsgs(tctx, msgs)


        // 把失敗次數超限的老消息投遞到死信隊列
        tctx, _ = context.WithTimeout(c.ctx, c.opts.deadLetterDeliverTimeout)
        c.deliverDeadLetter(tctx)


        // 接收之前就已分配給當前 consumer,但是還未得到 xack 確認的老消息
        pendingMsgs, err := c.receivePending()
        if err != nil {
            log.ErrorContextf(c.ctx, "pending msg received failed, err: %v", err)
            continue
        }
     
        // 接收到老消息後,執行對應的 callback 回調方法
        tctx, _ = context.WithTimeout(c.ctx, c.opts.handleMsgsTimeout)
        c.handlerMsgs(tctx, pendingMsgs)
    }
}

consumer 在消費新消息時,使用到 redis 客戶端的 XReadGroup 方法:

func (c *Consumer) receive() ([]*redis.MsgEntity, error) {
    msgs, err := c.client.XReadGroup(c.ctx, c.groupID, c.consumerID, c.topic, int(c.opts.receiveTimeout.Milliseconds()))
    if err != nil && !errors.Is(err, redis.ErrNoMsg) {
        return nil, err
    }


    return msgs, nil
}

consumer 在消費未經確認的老消息時,使用到 redis 客戶端的 XReadGroupPending 方法:

func (c *Consumer) receivePending() ([]*redis.MsgEntity, error) {
    pendingMsgs, err := c.client.XReadGroupPending(c.ctx, c.groupID, c.consumerID, c.topic)
    if err != nil && !errors.Is(err, redis.ErrNoMsg) {
        return nil, err
    }


    return pendingMsgs, nil
}

5.4.3 執行回調

在 consumer 消費到消息後,需要執行對應的 callback 函數,此時倘若 callback 執行成功了,則需要調用 xack 方法進行確認答覆;倘若 callback 執行失敗,則需要對失敗次數進行累加,倘若失敗次數達到上限,則這條消息最終會被投遞到死信隊列中:

func (c *Consumer) handlerMsgs(ctx context.Context, msgs []*redis.MsgEntity) {
    for _, msg := range msgs {
        if err := c.callbackFunc(ctx, msg); err != nil {
            // 失敗計數器累加
            c.failureCnts[*msg]++
            continue
        }


        // callback 執行成功,進行 ack
        if err := c.client.XACK(ctx, c.topic, c.groupID, msg.MsgID); err != nil {
            // ack 失敗的情況需要關注
            log.ErrorContextf(ctx, "msg ack failed, msg id: %s, err: %v", msg.MsgID, err)
            continue
        }
 
        // ack 成功了,從 map中清零對應消息的失敗次數
        delete(c.failureCnts, *msg)
    }
}

5.4.4 投遞死信

倘若某條消息失敗次數達到上限,則會被投遞到死信隊列中,對應源碼如下:

func (c *Consumer) deliverDeadLetter(ctx context.Context) {
    // 對於失敗達到指定次數的消息,投遞到死信中,然後執行 ack
    for msg, failureCnt := range c.failureCnts {
        if failureCnt < c.opts.maxRetryLimit {
            continue
        }


        // 投遞死信隊列
        if err := c.opts.deadLetterMailbox.Deliver(ctx, &msg); err != nil {
            log.ErrorContextf(c.ctx, "dead letter deliver failed, msg id: %s, err: %v", msg.MsgID, err)
        }


        // 對於被投遞到死信隊列的消息,需要執行 ack 操作,後續不需要再通過常規流程重複處理了
        if err := c.client.XACK(ctx, c.topic, c.groupID, msg.MsgID); err != nil {
            log.ErrorContextf(c.ctx, "msg ack failed, msg id: %s, err: %v", msg.MsgID, err)
            continue
        }


        // 對於 ack 成功的消息,將其從 failure map 中刪除
        delete(c.failureCnts, msg)
    }
}

5.4.5 停止消費

倘若使用方需要提前終止 consumer 的消費流程,則可以調用 Consumer.Stop 方法. 方法內部會執行 consumer 中內置的 stop 函數,本質上是 context 中的 cancel 函數,會通過停止 consumer 中 context 的方式,保證 consumer 的運行 goroutine 能正常退出.

// 停止 consumer
func (c *Consumer) Stop() {
    c.stop()
}

5.4.6 使用示例

在進行 consumer 消費之前,有兩個操作是需要前置執行的:

XGROUP CREATE my_streams_topic my_group 0-0
OK

下面給出使用 consumer 進行消費的單測示例代碼.

import (
    "context"
    "testing"
    "time"


    "github.com/xiaoxuxiansheng/redmq"
    "github.com/xiaoxuxiansheng/redmq/redis"
)


const (
    // redis 傳輸層協議
    network       = "tcp"
    // redis 服務器地址:ip:port
    address       = "請輸入 redis 地址"
    // redis 服務器密碼,沒有密碼則設爲空串
    password      = "請輸入 redis 密碼"
    // topic 名稱
    topic         = "請輸入 topic 名稱"
    // 消費者組 id
    consumerGroup = "請輸入消費者組名稱"
    // 消費者 id
    consumerID    = "請輸入消費者名稱"
)


// 用戶自定義實現的死信隊列
type DemoDeadLetterMailbox struct {
    do func(msg *redis.MsgEntity)
}


func NewDemoDeadLetterMailbox(do func(msg *redis.MsgEntity)) *DemoDeadLetterMailbox {
    return &DemoDeadLetterMailbox{
        dodo,
    }
}


// 死信隊列接收消息的處理方法
func (d *DemoDeadLetterMailbox) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
    d.do(msg)
    return nil
}


func Test_Consumer(t *testing.T) {
    // 創建 redis 客戶端
    client := redis.NewClient(network, address, password)


    // consumer 接收到消息後的執行的 callback 回調處理函數
    callbackFunc := func(ctx context.Context, msg *redis.MsgEntity) error {
        t.Logf("receive msg, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
        return nil
    }


    // 創建自定義實現的死信隊列實例
    demoDeadLetterMailbox := NewDemoDeadLetterMailbox(func(msg *redis.MsgEntity) {
        t.Logf("receive dead letter, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
    })


    // 構造並啓動消費者 consumer 實例
    consumer, err := redmq.NewConsumer(client, topic, consumerGroup, consumerID, callbackFunc,
        // 每條消息最多重試處理 2 次
        redmq.WithMaxRetryLimit(2),
        // 每輪接收消息的阻塞等待超時時間爲 2 s
        redmq.WithReceiveTimeout(2*time.Second),
        // 注入自定義實現的死信隊列
        redmq.WithDeadLetterMailbox(demoDeadLetterMailbox))
    if err != nil {
        t.Error(err)
        return
    }
    // 程序退出前停止 consumer
    defer consumer.Stop()


    // 十秒後退出單測程序
    <-time.After(10 * time.Second)
}

至此,正文內容全部結束.

6 總結

本期和大家一起探討了如何基於 redis 實現消息隊列,其中實現方案包括三類:

最後,本人使用 go 語言,基於 redis streams 開源實現了一款 mq 客戶端 sdk,開源地址:https://github.com/xiaoxuxiansheng/redmq,歡迎大家批評指正.

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/MSmipbE5cyK2_m5iiKv7pw