redis 靈魂拷問:如何使用 stream 實現消息隊列

redis 在很早之前就支持消息隊列了,使用的是 PUB/SUB 功能來實現的。PUB/SUB 有一個缺點就是消息不能持久化,如果 redis 發生宕機,或者客戶端發生網絡斷開,歷史消息就丟失了。

redis5.0 開始引入了 stream 這個數據結構,stream 可以很好地用於消息隊列,它支持消息持久化,同時可以記錄消費者的位置,即使客戶端斷開重連,也不會丟失消息。

本篇文章我們就來聊一聊基於 stream 的消息隊列使用。

stream 隊列簡介

基於 stream 實現的消息隊列有 4 個角色,我們來看一下:

stream: 消息隊列

last delivered ID:消費者組在消息隊列中的 offset

consumer group:消費者組,可以包含多個消費者,同時有一個 last delivered ID

pending entries list (PEL):消費者已經讀取但是沒有 ACK 的消息

根據上面的描述,stream 的消息隊列結構如下圖:

注意:消費者組內的消費者是不會重複消費消息的,比如一個 stream 包括 1、2、3、4 這 4 條消息,消費者組內有 2 個消費者,如果其中一個消費者消費了 1、2,則第二個消費者就只能消費 3、4 了。

命令介紹

本文使用測試環境如下

redis 版本: 6.0.7

springboot-redis 版本:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>

裏面使用到的 spring-data-redis 版本:2.1.9.RELEASE

裏面使用到的 lettuce 連接池版本:5.1.7.RELEASE

本文使用的 redis 客戶端是 lettuce,lettuce 提供了 RedisStreamCommands 和 RedisStreamAsyncCommands 支持 stream 操作,本文只介紹 RedisStreamCommands。

我們創建 lettuce 中的 RedisStreamCommands,代碼如下:

RedisURI redisURI = RedisURI.builder()
                .withSentinelMasterId("master")
                .withPassword("foobared")
                .withSentinelMasterId("master")
                .withSentinel("192.168.59.146",26379)
                .withSentinel("192.168.59.141",26379)
                .withSentinel("192.168.59.141",26389).build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands streamCommands = connection.sync();

當我們使用 XADD 命令往 stream 裏面寫數據時,如果 stream 不存在,就會創建一個,命令如下:

192.168.59.146:6379> XADD mystream * name Sara surname OConnor
"1607996267360-0"

我們看下 Java 示例代碼:

/**
 * 命令:XADD
 *
 * 時間複雜度:O(1)
 * @param streamKey 隊列名稱
 * @param data 數據
 */
public void xadd(String streamKey, Map<String, String> data){
    streamCommands.xadd(streamKey, data);
}

要消費 stream 隊列中的數據,首先我們需要創建一個消費組,命令如下:

XGROUP CREATE mystream mygroup $ MKSTREAM

上面的 $ 表示 group 的 offset 是隊列中的最後一個元素,MKSTREAM 這個參數會判斷 stream 是否存在,如果不存在會創建一個我們指定名稱的 stream,不加這個參數,stream 不存在會報錯。

java 代碼如下:

/**
 * XGROUP
 *
 * 時間複雜度:O(1)
 * @param streamKey 隊列名稱
 * @param groupName 消費組名稱
 */
public void createGroup(String streamKey, String groupName){
    streamCommands.xgroupCreate(XReadArgs.StreamOffset.latest(streamKey), groupName);
}

刪除消費組我們使用下面的命令:

XGROUP DESTROY mystream consumer-group-name

java 示例代碼如下:

/**
 * XGROUP
 *
 * 時間複雜度:O(M),M是pending entries list長度
 * @param streamKey 隊列名稱
 * @param groupName 消費組名稱
 */
public void deleteGroup(String streamKey, String groupName){
    streamCommands.xgroupDestroy(streamKey, groupName);
}

消息的消費有 2 種方式,XREAD 和 XREADGROUP:

XREAD是消費組讀取消息,我們看下面這個命令:
XREAD COUNT 2 STREAMS mystream writers 0-0 0-0

注意:上面這個示例是從 mystream 和 writers 這 2 個 stream 中讀取消息,offset 都是 0,COUNT 參數指定了每個隊列中讀取的消息數量不多餘 2 個。

下面的 java 代碼是從 testStream 這個 stream 中讀取消息,offset 是 0:

//複雜度 O(N),N是要返回的消息個數
List<StreamMessage<String, String>> list4 = streamCommands.xread(XReadArgs.StreamOffset.from("testStream", "0-0"))

XREADGROUP 使用消費者來消費消息,我們看下面這個命令:

XREADGROUP GROUP mygroup Alice BLOCK 2000 COUNT 1 STREAMS mystream >

這個命令是使用消費組 mygroup 的 Alice 這個消費者從 mystream 這個 stream 中讀取 1 條消息。

注意:

java 示例代碼如下:

//複雜度 O(N),N是要返回的消息個數
List<StreamMessage<String, String>> list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"),
                XReadArgs.StreamOffset.lastConsumed("testStream"));

這段代碼是使用 group1 消費組中的 consumer1 這個消費者從 testStream 這個 stream 中使用最後一個元素作爲 offset 來消費消息。

使用 XACK 命令可以對消息進行確認,命令如下:

XACK mystream mygroup 1526569495631-0

這裏表示消費組 mygroup 確認 mystream 這個 stream 中 1526569495631-0 這條消息

下面的 java 代碼是對收到的消息依次打印後進行確認,如下:

for (StreamMessage<String, String> message : list1){
    System.out.println(message.getId() + ":" + message.getBody());
    streamCommands.xack("streamTest", "group1", message.getId());
}

使用案例

這裏我提供一個案例,生產者每隔 1s 向隊列中寫入 1 條消息,代碼如下:

public void xgroupCreate(){
    //調用上面的createGroup方法
    createGroup("testStream", "group2");
    System.out.println("----------------------------");
    int i = 0;
    for (;;){
        Map<String, String> body =  Collections.singletonMap("message" + i, "value" + i);
        String key = streamCommands.xadd("testStream", body);
        System.out.println(key);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

啓動程序後打印如下:

----------------------------
1608014871896-0
1608014872899-0
1608014873900-0
1608014874903-0
1608014875907-0
1608014876910-0
1608014877920-0
1608014878923-0
1608014879925-0
1608014880930-0

消費者每隔 2s 從隊列中拉取一次消息,打印後執行 XACK,代碼如下:

public void xgroupRead(){
    List<StreamMessage<String, String>> list1;
    while (true){
        list1 = streamCommands.xreadgroup(Consumer.from("group1", "consumer1"),
                XReadArgs.StreamOffset.lastConsumed("testStream"));
        if (list1.isEmpty()) {
            System.out.println("==============================");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }
        for (StreamMessage<String, String> message : list1){
            System.out.println(message.getId() + ":" + message.getBody());
            streamCommands.xack("streamTest", "group2", message.getId());
        }
    }
}

啓動任務後打印如下:

==============================
1608014871896-0:{message0=value0}
1608014872899-0:{message0=value0}
==============================
1608014873900-0:{message0=value0}
1608014874903-0:{message0=value0}
==============================
1608014875907-0:{message0=value0}
1608014876910-0:{message0=value0}
==============================
1608014877920-0:{message0=value0}
1608014878923-0:{message0=value0}
==============================
1608014879925-0:{message0=value0}
1608014880930-0:{message0=value0}
==============================

使用場景

redis 的特點是讀寫速度快,所以對於實時性高要求效率的場景是一個不錯的選擇。

但是 AOF+RDB 的數據持久化方案可能會丟失 1s 的數據 (AOF 持久化策略使用 everysec),所以對於數據一致性要求高的場景要跳過。

在物聯網場景中,有大規模的傳感器數據需要採集,這些數據對實時性的要求高過了一致性,使用 redis 是一個很好的選擇

總結

使用 redis 的 stream 可以實現簡單的隊列,跟 rabbitmq 等非常成熟的消息隊列相比,功能還是比較薄弱的,比如不支持 exchange。

redis 讀寫速度快的特點對實時性要求高的場景還是一個不錯的選擇,但是如果對數據一致性要求很高,需要繞過。

redis 的 stream 還提供了其他很多的命令,本文並沒有全部介紹,感興趣的同學可以參考官網鏈接:

https://redis.io/commands/xack
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/jh-D_W7whwT_Vi-P6am6Qw