如何使用 Redis Streams 實現消息隊列
一 背景
在上一篇文章 Redis Streams 原理介紹,使用場景及基於 Golang 實現的使用示例 講解了關於 Redis Streams 的原理,使用情況,提到過它最主要是用在消息隊列中。這篇文章,就基於 Redis Streams 實現消息隊列以及如何基於 Golang 實現它的消息隊列。
二 Redis Streams 相關操作命令
Redis Streams 實現消息隊列的流程圖如下:
2.1 生產消息
首先是用於生產消息的指令,通過 XADD 指令往 topic 中加入一組 kv 對消息:
XADD test_streams_topic * key1 val1
XADD test_streams_topic * key2 val3
其中,test_streams_topic 是 topic 名稱;'*'表示消息自動生成唯一標識 id,基於時間戳 + 自增序號生成;key1/val1、key2/val2:消息數據 kv 對。
2.2 消費消息
消費消息的指令,是通過 XREAD 指令從對應 topic 中獲取消息
XREAD STREAMS test_streams_topic 0-0
test_streams_topic 是 topic 名稱;0-0 表示消息從頭開始消費,如果這裏爲某條消息 id,則代表從這條消息之後(不包含這條消息)開始消費。
2.3 阻塞模式下的消費消息
Streams 支持在消費時,採用阻塞模式進行消費。如果存在數據則即時返回處理,否則會阻塞消費流程。
XREAD BLOCK 0 STREAMS test_streams_topic 1715522464874-0
在這裏,BLOCK 表示阻塞消費模式;0 表示阻塞等待超時時間,超過這個時長會返回 nil. 設置爲 0 則表示不設置超時閾值。如果此時往 topic 中加入一組 kv 對消息,則會出現如下結果:
2.4 創建消費羣組
Streams 也支持發佈訂閱模式,能保證消息被多個消費者組 consumer group 同時消費到。首先,進行消費羣組的創建:
XGROUP CREATE test_streams_topic test_group 0-0
在這裏,test_streams_topic 是 topic 名稱;test_group 表示消費羣組;0-0 表示從頭開始消費
2.5 基於消費組消費消息
同一份數據在同一個消費者組下只會被消費到一次。不同消費者組各自能獲取到獨立完整的消息數據。在這裏,可以通過 XReadGroup 指令,以消費者組的身份進行消費。
XREADGROUP GROUP test_group consumer BLOCK 0 STREAMS test_streams_topic >
其中,test_group 表示消費者組名稱;consumer 表示消費者名稱;test_streams_topic 名稱;block 0 表示是採用阻塞等待的模式,0 代表沒有超時上限;">" 表示讀取最新的消息(尚未分配給某個 consumer 的消息)。
還有另一種消費模式,讀取的是已分配給當前消費者,但是還未經確認的老消息:
XREADGROUP GROUP test_group consumer BLOCK 0 STREAMS test_streams_topic 0-0
0-0 表示標識讀取已分配給當前 consumer ,但是還沒經過 xack 指令確認的消息。
2.6 確認消息
通過 XACK 指令,帶上消費者組、topic 名稱以及消息 id,能夠完成對某條消息的確認操作。
XACK test_streams_topic test_group 1715522446991-0
1715522446991-0 表示消息的 id(在這裏,是第一條消息的 ID)
通過以上的步驟操作,就可以基於 Redis Streams 實現一個消息隊列。當然,這只是一個最基礎的消息隊列,在實際的應用中,我們可能需要實現更復雜的邏輯,比如錯誤處理、消息重試和消費者狀態管理等等,這個就需要結合業務需求去實現了。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/QApIVf56cqB0PDhQrCNR6g