新一代消息隊列,是否能代替?

乘風破浪 | 直掛雲帆

Redis Stream 是 Redis5.0 之後 數據結構中的一種高級數據結構,用於實現日誌、消息隊列等應用場景。Stream 可以保持對數據的持久化存儲,並支持按時間順序讀取和消費數據。

在 Redis Stream 中,數據是以消息(Message)的形式存儲的,每個消息都有一個唯一的 ID,可以根據 ID 來查找和操作消息。Stream 以消息的產生時間作爲排序依據,新的消息總是追加到 Stream 的末尾。

基本操作

每一個 Stream 都有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID,消息會持久化到 AOF 或 RDB 文件當中,Redis 重啓後還是會存在。

xadd  mystream  *  field1 value1 [field2 value2....]  創建一個名爲 mystream 的 Stream,並將消息插入其中,* 表示自己生成 ID,ID 格式爲 時間戳 - 同一時間戳的序號  

xrange mystream - +   按照消息產生的時間順序讀取全部消息。

xread count streams mystream <last_id>  從指定的 ID 開始按照時間順序讀取指定數量的消息。

xread block 0 streams mystream $  從 mystream 阻塞讀取消息。

xdel  mystream  ID 從 mystream 中刪除指定 id 的一條記錄。

xlen  mystream  返回 mystream 消息的數量。

xtrim  mystream  maxlen  count  刪除 Stream 中多餘的消息,可以指定刪除的數量或者刪除到指定的 ID。

每個 Stream 可以掛多個消費組,每個消費組會有個遊標 last_delivered_id 在 Stream 數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個 Stream 內唯一的名稱。每個消費組 (Consumer Group) 的狀態都是獨立的,相互不受影響。也就是說同一份 Stream 內部的消息會被每個消費組都消費到。

xgroup create mystream mygroup 0-0 爲 mystream 添加一個 mygroup 的消費組,從第一條消息開始消費。

xgroup create mystream mygroup $  爲 mystream 添加一個 mygroup 的消費組,該組只監聽新消息。

xreadgroup group mygroup c1 [block mills]  count 1 streams  mystream > 使用消費組 cgroup-1 中的 c1 消費者讀取 stream-1 中的 一條消息 block mills 表示阻塞等待。

xack mystream mygroup ID [ID...]  通知消費完成 ack,可以傳遞多個 ID。

場景運用

代碼實現

import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.params.XReadGroupParams;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * 實現消費組消費,不考慮單消費者模式
 */
@Component
public class RedisOperationStreamType {
    public final static String RS_STREAM_MQ_NS = "stream:";
    @Resource
    private JedisPool jedisPool;
    /**
     * 發佈消息到Stream
     * @param key
     * @param message
     * @return
     */
    public StreamEntryID produce(String key,Map<String,String> message){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            StreamEntryID id = jedis.xadd(RS_STREAM_MQ_NS+key, StreamEntryID.NEW_ENTRY, message);
            System.out.println("發佈消息到"+RS_STREAM_MQ_NS+key+" 返回消息id="+id.toString());
            return id;
        } catch (Exception e) {
            throw new RuntimeException("發佈消息失敗!");
        } finally {
            jedis.close();
        }
    }
    /**
     * 創建消費羣組,消費羣組不可重複創建
     * @param key
     * @param groupName
     * @param lastDeliveredId
     */
    public void createCustomGroup(String key, String groupName, String lastDeliveredId){
        Jedis jedis = null;
        try {
            StreamEntryID id = null;
            if (lastDeliveredId==null){
                lastDeliveredId = "0-0";
            }
            id = new StreamEntryID(lastDeliveredId);
            jedis = jedisPool.getResource();
            /*makeStream表示沒有時是否自動創建stream,但是如果有,再自動創建會異常*/
            jedis.xgroupCreate(RS_STREAM_MQ_NS+key,groupName,id,false);
            System.out.println("創建消費羣組成功:"+groupName);
        } catch (Exception e) {
            throw new RuntimeException("創建消費羣組失敗!",e);
        } finally {
            jedis.close();
        }
    }
    /**
     * 消息消費
     * @param key
     * @param customerName
     * @param groupName
     * @return
     */
    public List<Map.Entry<String, List<StreamEntry>>> consume(String key, String customerName,String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            /*消息消費時的參數*/
            XReadGroupParams xReadGroupParams = new XReadGroupParams().block(0).count(1);
            Map<String, StreamEntryID> streams = new HashMap<>();
            streams.put(RS_STREAM_MQ_NS+key,StreamEntryID.UNRECEIVED_ENTRY);
            List<Map.Entry<String, List<StreamEntry>>> result
                    = jedis.xreadGroup(groupName, customerName, xReadGroupParams, streams);
            System.out.println(groupName+"從"+RS_STREAM_MQ_NS+key+"接受消息, 返回消息:"+result);
            return result;
        } catch (Exception e) {
            throw new RuntimeException("消息消費失敗!",e);
        } finally {
            jedis.close();
        }
    }
    /**
     * 消息確認
     * @param key
     * @param groupName
     * @param msgId
     */
    public void ackMsg(String key, String groupName,StreamEntryID msgId){
        if (msgId==null) throw new RuntimeException("msgId爲空!");
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            System.out.println(jedis.xack(key,groupName,msgId));
            System.out.println(RS_STREAM_MQ_NS+key+",消費羣組"+groupName+" 消息已確認");
        } catch (Exception e) {
            throw new RuntimeException("消息確認失敗!",e);
        } finally {
            jedis.close();
        }
    }
    /*
    檢查消費者羣組是否存在,輔助方法
    * */
    public boolean checkGroup(String key, String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
            for(StreamGroupInfo groupinfo : xinfoGroupResult) {
                if(groupName.equals(groupinfo.getName())) return true;
            }
            return false;
        } catch (Exception e) {
            throw new RuntimeException("檢查消費羣組失敗!",e);
        } finally {
            jedis.close();
        }
    }
    public final static int MQ_INFO_CONSUMER = 1;
    public final static int MQ_INFO_GROUP = 2;
    public final static int MQ_INFO_STREAM = 0;
    /**
     * 消息隊列信息查看
     * @param type
     */
    public void MqInfo(int type,String key, String groupName){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            if(type==MQ_INFO_CONSUMER){
                List<StreamConsumersInfo> xinfoConsumersResult = jedis.xinfoConsumers(RS_STREAM_MQ_NS+key, groupName);
                System.out.println(RS_STREAM_MQ_NS+key+" 消費者信息:" + xinfoConsumersResult);
                for( StreamConsumersInfo consumersinfo : xinfoConsumersResult) {
                    System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());
                    System.out.println("--Name:" + consumersinfo.getName());
                    System.out.println("--Pending:" + consumersinfo.getPending());
                    System.out.println("--Idle:" + consumersinfo.getIdle());
                }
            }else if (type==MQ_INFO_GROUP){
                List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
                System.out.println(RS_STREAM_MQ_NS+key+"消費者羣組信息:" + xinfoGroupResult);
                for(StreamGroupInfo groupinfo : xinfoGroupResult) {
                    System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());
                    System.out.println("--Name:" + groupinfo.getName());
                    System.out.println("--Consumers:" + groupinfo.getConsumers());
                    System.out.println("--Pending:" + groupinfo.getPending());
                    System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());
                }
            }else{
                StreamInfo xinfoStreamResult = jedis.xinfoStream(RS_STREAM_MQ_NS+key);
                System.out.println(RS_STREAM_MQ_NS+key+"隊列信息:" + xinfoStreamResult);
                System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());
                System.out.println("--Length:" + xinfoStreamResult.getLength());
                System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());
                System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());
                System.out.println("--Groups:" + xinfoStreamResult.getGroups());
                System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());
                System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());
                System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());
            }
        } catch (Exception e) {
            throw new RuntimeException("消息隊列信息檢索失敗!",e);
        } finally {
            jedis.close();
        }
    }
}

模擬消息隊列

0****1

Stream 消息太多怎麼辦?

Redis 的 Stream 指令不會刪除消息,它是給消息做了標誌位,當然如果消息太長,在 xadd 的時候可以添加一個 maxlen 的,如果長度達到就會將老的消息幹掉,確保最多不超過指定長度。

0****2

消息忘記 ack 會怎麼樣?

Stream 在每個消費者結構中保存了正在處理中的消息 ID 列表 PEL,如果消費者收到了消息處理完了但是沒有回覆 ack,就會導致 PEL 列表不斷增長,如果有很多消費組的話,那麼這個 PEL 佔用的內存就會放大。所以消息要儘可能的快速消費並確認。

0****3

PEL,如何避免消息丟失?

在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回覆給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是 PEL 裏已經保存了發出去的消息 ID。待客戶端重新連上之後,可以再次收到 PEL 中的消息 ID 列表。不過此時 xreadgroup 的起始消息 ID 不能爲參數 >,而必須是任意有效的消息 ID,一般將參數設爲 0-0,表示讀取所有的 PEL 消息以及自 last_delivered_id 之後的新消息。

0****4

死信問題

如果某個消息,不能被消費者處理,也就是不能被 XACK,這是要長時間處於 Pending 列表中,即使被反覆的轉移給各個消費者也是如此。此時該消息的 delivery counter(通過 XPENDING 可以查詢到)就會累加,當累加到某個我們預設的臨界值時,我們就認爲是壞消息(也叫死信,DeadLetter,無法投遞的消息),由於有了判定條件,我們將壞消息處理掉即可,刪除即可。刪除一個消息,使用 XDEL 語法,注意,這個命令並沒有刪除 Pending 中的消息,因此查看 Pending,消息還會在,可以在執行執行 XDEL 之後,XACK 這個消息標識其處理完畢。

0****5

Stream 的高可用

Stream 的高可用是建立主從複製基礎上的,它和其它數據結構的複製機制沒有區別,也就是說在 Sentinel 和 Cluster 集羣環境下 Stream 是可以支持高可用的。不過鑑於 Redis 的指令複製是異步的,在 failover 發生時,Redis 可能會丟失極小部分數據,這點 Redis 的其它數據結構也是一樣的。

Stream 的消費模型借鑑了 Kafka 的消費分組的概念,它彌補了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同於 kafka,Kafka 的消息可以分 partition,而 Stream 不行。

總的來說,如果是中小項目和企業,在工作中已經使用了 Redis,在業務量不是很大,而又需要消息中間件功能的情況下,可以考慮使用 Redis 的 Stream 功能。但是如果併發量很高,資源足夠支持下,還是以專業的消息中間件,比如 RocketMQ、Kafka 等來支持業務更好。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_ty8pYo-owgrIe0jAnHugg