redis 靈魂拷問:如何使用 stream 實現消息隊列
redis 在很早之前就支持消息隊列了,使用的是 PUB/SUB 功能來實現的。PUB/SUB 有一個缺點就是消息不能持久化,如果 redis 發生宕機,或者客戶端發生網絡斷開,歷史消息就丟失了。
redis5.0 開始引入了 stream 這個數據結構,stream 可以很好地用於消息隊列,它支持消息持久化,同時可以記錄消費者的位置,即使客戶端斷開重連,也不會丟失消息。
本篇文章我們就來聊一聊基於 stream 的消息隊列使用。
stream 隊列簡介
基於 stream 實現的消息隊列有 4 個角色,我們來看一下:
stream: 消息隊列
last delivered ID:消費者組在消息隊列中的 offset
consumer group:消費者組,可以包含多個消費者,同時有一個 last delivered ID
pending entries list (PEL):消費者已經讀取但是沒有 ACK 的消息
根據上面的描述,stream 的消息隊列結構如下圖:
注意:消費者組內的消費者是不會重複消費消息的,比如一個 stream 包括 1、2、3、4 這 4 條消息,消費者組內有 2 個消費者,如果其中一個消費者消費了 1、2,則第二個消費者就只能消費 3、4 了。
命令介紹
本文使用測試環境如下:
redis 版本: 6.0.7
springboot-redis 版本:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
裏面使用到的 spring-data-redis 版本:2.1.9.RELEASE
裏面使用到的 lettuce 連接池版本:5.1.7.RELEASE
本文使用的 redis 客戶端是 lettuce,lettuce 提供了 RedisStreamCommands 和 RedisStreamAsyncCommands 支持 stream 操作,本文只介紹 RedisStreamCommands。
- 創建 Commands
我們創建 lettuce 中的 RedisStreamCommands,代碼如下:
RedisURI redisURI = RedisURI.builder()
.withSentinelMasterId("master")
.withPassword("foobared")
.withSentinelMasterId("master")
.withSentinel("192.168.59.146",26379)
.withSentinel("192.168.59.141",26379)
.withSentinel("192.168.59.141",26389).build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands streamCommands = connection.sync();
- 創建 stream
當我們使用 XADD 命令往 stream 裏面寫數據時,如果 stream 不存在,就會創建一個,命令如下:
192.168.59.146:6379> XADD mystream * name Sara surname OConnor
"1607996267360-0"
我們看下 Java 示例代碼:
/**
* 命令:XADD
*
* 時間複雜度:O(1)
* @param streamKey 隊列名稱
* @param data 數據
*/
public void xadd(String streamKey, Map<String, String> data){
streamCommands.xadd(streamKey, data);
}
- 創建消費組
要消費 stream 隊列中的數據,首先我們需要創建一個消費組,命令如下:
XGROUP CREATE mystream mygroup $ MKSTREAM
上面的 $ 表示 group 的 offset 是隊列中的最後一個元素,MKSTREAM 這個參數會判斷 stream 是否存在,如果不存在會創建一個我們指定名稱的 stream,不加這個參數,stream 不存在會報錯。
java 代碼如下:
/**
* XGROUP
*
* 時間複雜度:O(1)
* @param streamKey 隊列名稱
* @param groupName 消費組名稱
*/
public void createGroup(String streamKey, String groupName){
streamCommands.xgroupCreate(XReadArgs.StreamOffset.latest(streamKey), groupName);
}
- 刪除消費組
刪除消費組我們使用下面的命令:
XGROUP DESTROY mystream consumer-group-name
java 示例代碼如下:
/**
* XGROUP
*
* 時間複雜度:O(M),M是pending entries list長度
* @param streamKey 隊列名稱
* @param groupName 消費組名稱
*/
public void deleteGroup(String streamKey, String groupName){
streamCommands.xgroupDestroy(streamKey, groupName);
}
- 消費消息
消息的消費有 2 種方式,XREAD 和 XREADGROUP:
XREAD是消費組讀取消息,我們看下面這個命令:
XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
注意:上面這個示例是從 mystream 和 writers 這 2 個 stream 中讀取消息,offset 都是 0,COUNT 參數指定了每個隊列中讀取的消息數量不多餘 2 個。
下面的 java 代碼是從 testStream 這個 stream 中讀取消息,offset 是 0:
//複雜度 O(N),N是要返回的消息個數
List<StreamMessage<String, String>> list4 = streamCommands.xread(XReadArgs.StreamOffset.from("testStream", "0-0"))
XREADGROUP 使用消費者來消費消息,我們看下面這個命令:
XREADGROUP GROUP mygroup Alice BLOCK 2000 COUNT 1 STREAMS mystream >
這個命令是使用消費組 mygroup 的 Alice 這個消費者從 mystream 這個 stream 中讀取 1 條消息。
注意:
-
上面使用了 BLOCK,表示是阻塞讀取,如果讀不到數據,會阻塞等待 2s,不加這個條件默認是不阻塞的
-
">" 表示只接受其他消費者沒有消費過的消息
-
如果沒有 ">", 消費者會消費比指定 id 偏移量大並且沒有被自己確認過的消息,這樣就不用關係是否 ACK 過或者是否 BLOCK 了。
java 示例代碼如下:
//複雜度 O(N),N是要返回的消息個數
List<StreamMessage<String, String>> list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"),
XReadArgs.StreamOffset.lastConsumed("testStream"));
這段代碼是使用 group1 消費組中的 consumer1 這個消費者從 testStream 這個 stream 中使用最後一個元素作爲 offset 來消費消息。
- 確認消息
使用 XACK 命令可以對消息進行確認,命令如下:
XACK mystream mygroup 1526569495631-0
這裏表示消費組 mygroup 確認 mystream 這個 stream 中 1526569495631-0 這條消息
下面的 java 代碼是對收到的消息依次打印後進行確認,如下:
for (StreamMessage<String, String> message : list1){
System.out.println(message.getId() + ":" + message.getBody());
streamCommands.xack("streamTest", "group1", message.getId());
}
使用案例
這裏我提供一個案例,生產者每隔 1s 向隊列中寫入 1 條消息,代碼如下:
public void xgroupCreate(){
//調用上面的createGroup方法
createGroup("testStream", "group2");
System.out.println("----------------------------");
int i = 0;
for (;;){
Map<String, String> body = Collections.singletonMap("message" + i, "value" + i);
String key = streamCommands.xadd("testStream", body);
System.out.println(key);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
啓動程序後打印如下:
----------------------------
1608014871896-0
1608014872899-0
1608014873900-0
1608014874903-0
1608014875907-0
1608014876910-0
1608014877920-0
1608014878923-0
1608014879925-0
1608014880930-0
消費者每隔 2s 從隊列中拉取一次消息,打印後執行 XACK,代碼如下:
public void xgroupRead(){
List<StreamMessage<String, String>> list1;
while (true){
list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"),
XReadArgs.StreamOffset.lastConsumed("testStream"));
if (list1.isEmpty()) {
System.out.println("==============================");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
for (StreamMessage<String, String> message : list1){
System.out.println(message.getId() + ":" + message.getBody());
streamCommands.xack("streamTest", "group2", message.getId());
}
}
}
啓動任務後打印如下:
==============================
1608014871896-0:{message0=value0}
1608014872899-0:{message0=value0}
==============================
1608014873900-0:{message0=value0}
1608014874903-0:{message0=value0}
==============================
1608014875907-0:{message0=value0}
1608014876910-0:{message0=value0}
==============================
1608014877920-0:{message0=value0}
1608014878923-0:{message0=value0}
==============================
1608014879925-0:{message0=value0}
1608014880930-0:{message0=value0}
==============================
使用場景
redis 的特點是讀寫速度快,所以對於實時性高要求效率的場景是一個不錯的選擇。
但是 AOF+RDB 的數據持久化方案可能會丟失 1s 的數據 (AOF 持久化策略使用 everysec),所以對於數據一致性要求高的場景要跳過。
在物聯網場景中,有大規模的傳感器數據需要採集,這些數據對實時性的要求高過了一致性,使用 redis 是一個很好的選擇
總結
使用 redis 的 stream 可以實現簡單的隊列,跟 rabbitmq 等非常成熟的消息隊列相比,功能還是比較薄弱的,比如不支持 exchange。
redis 讀寫速度快的特點對實時性要求高的場景還是一個不錯的選擇,但是如果對數據一致性要求很高,需要繞過。
redis 的 stream 還提供了其他很多的命令,本文並沒有全部介紹,感興趣的同學可以參考官網鏈接:
https://redis.io/commands/xack
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/jh-D_W7whwT_Vi-P6am6Qw