新一代消息隊列,是否能代替?
乘風破浪 | 直掛雲帆
Redis Stream 是 Redis5.0 之後 數據結構中的一種高級數據結構,用於實現日誌、消息隊列等應用場景。Stream 可以保持對數據的持久化存儲,並支持按時間順序讀取和消費數據。
在 Redis Stream 中,數據是以消息(Message)的形式存儲的,每個消息都有一個唯一的 ID,可以根據 ID 來查找和操作消息。Stream 以消息的產生時間作爲排序依據,新的消息總是追加到 Stream 的末尾。
基本操作
每一個 Stream 都有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID,消息會持久化到 AOF 或 RDB 文件當中,Redis 重啓後還是會存在。
xadd mystream * field1 value1 [field2 value2....] 創建一個名爲 mystream 的 Stream,並將消息插入其中,* 表示自己生成 ID,ID 格式爲 時間戳 - 同一時間戳的序號
xrange mystream - + 按照消息產生的時間順序讀取全部消息。
xread count streams mystream <last_id> 從指定的 ID 開始按照時間順序讀取指定數量的消息。
xread block 0 streams mystream $ 從 mystream 阻塞讀取消息。
xdel mystream ID 從 mystream 中刪除指定 id 的一條記錄。
xlen mystream 返回 mystream 消息的數量。
xtrim mystream maxlen count 刪除 Stream 中多餘的消息,可以指定刪除的數量或者刪除到指定的 ID。
每個 Stream 可以掛多個消費組,每個消費組會有個遊標 last_delivered_id 在 Stream 數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個 Stream 內唯一的名稱。每個消費組 (Consumer Group) 的狀態都是獨立的,相互不受影響。也就是說同一份 Stream 內部的消息會被每個消費組都消費到。
xgroup create mystream mygroup 0-0 爲 mystream 添加一個 mygroup 的消費組,從第一條消息開始消費。
xgroup create mystream mygroup $ 爲 mystream 添加一個 mygroup 的消費組,該組只監聽新消息。
xreadgroup group mygroup c1 [block mills] count 1 streams mystream > 使用消費組 cgroup-1 中的 c1 消費者讀取 stream-1 中的 一條消息 block mills 表示阻塞等待。
xack mystream mygroup ID [ID...] 通知消費完成 ack,可以傳遞多個 ID。
場景運用
-
消息隊列:使用 Stream 作爲消息隊列,生產者可以將消息寫入 Stream,而消費者可以從 Stream 中讀取消息並進行處理。Stream 提供了先進先出(FIFO)的消息傳遞順序,還可以對消息進行持久化,以保證消息的可靠性。
-
實時數據處理:Stream 支持多個消費者併發的消費消息,可以用於實時數據處理場景。生產者可以將實時數據寫入 Stream,而多個消費者可以併發地從 Stream 中讀取數據並進行實時的處理,如實時計算、實時監控等。
-
分佈式日誌:Stream 可以用作分佈式日誌系統。多個應用程序可以將日誌消息寫入 Stream,而多個消費者可以從 Stream 中讀取日誌消息並進行分析、存儲或展示。
-
消息發佈與訂閱:Stream 支持多消費者分組,並提供了自動確認消息的機制。這使得它非常適合用作消息發佈與訂閱系統,可用於在系統內部或分佈式系統之間進行消息的發佈和訂閱。
代碼實現
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.params.XReadGroupParams;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 實現消費組消費,不考慮單消費者模式
*/
@Component
public class RedisOperationStreamType {
public final static String RS_STREAM_MQ_NS = "stream:";
@Resource
private JedisPool jedisPool;
/**
* 發佈消息到Stream
* @param key
* @param message
* @return
*/
public StreamEntryID produce(String key,Map<String,String> message){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
StreamEntryID id = jedis.xadd(RS_STREAM_MQ_NS+key, StreamEntryID.NEW_ENTRY, message);
System.out.println("發佈消息到"+RS_STREAM_MQ_NS+key+" 返回消息id="+id.toString());
return id;
} catch (Exception e) {
throw new RuntimeException("發佈消息失敗!");
} finally {
jedis.close();
}
}
/**
* 創建消費羣組,消費羣組不可重複創建
* @param key
* @param groupName
* @param lastDeliveredId
*/
public void createCustomGroup(String key, String groupName, String lastDeliveredId){
Jedis jedis = null;
try {
StreamEntryID id = null;
if (lastDeliveredId==null){
lastDeliveredId = "0-0";
}
id = new StreamEntryID(lastDeliveredId);
jedis = jedisPool.getResource();
/*makeStream表示沒有時是否自動創建stream,但是如果有,再自動創建會異常*/
jedis.xgroupCreate(RS_STREAM_MQ_NS+key,groupName,id,false);
System.out.println("創建消費羣組成功:"+groupName);
} catch (Exception e) {
throw new RuntimeException("創建消費羣組失敗!",e);
} finally {
jedis.close();
}
}
/**
* 消息消費
* @param key
* @param customerName
* @param groupName
* @return
*/
public List<Map.Entry<String, List<StreamEntry>>> consume(String key, String customerName,String groupName){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
/*消息消費時的參數*/
XReadGroupParams xReadGroupParams = new XReadGroupParams().block(0).count(1);
Map<String, StreamEntryID> streams = new HashMap<>();
streams.put(RS_STREAM_MQ_NS+key,StreamEntryID.UNRECEIVED_ENTRY);
List<Map.Entry<String, List<StreamEntry>>> result
= jedis.xreadGroup(groupName, customerName, xReadGroupParams, streams);
System.out.println(groupName+"從"+RS_STREAM_MQ_NS+key+"接受消息, 返回消息:"+result);
return result;
} catch (Exception e) {
throw new RuntimeException("消息消費失敗!",e);
} finally {
jedis.close();
}
}
/**
* 消息確認
* @param key
* @param groupName
* @param msgId
*/
public void ackMsg(String key, String groupName,StreamEntryID msgId){
if (msgId==null) throw new RuntimeException("msgId爲空!");
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
System.out.println(jedis.xack(key,groupName,msgId));
System.out.println(RS_STREAM_MQ_NS+key+",消費羣組"+groupName+" 消息已確認");
} catch (Exception e) {
throw new RuntimeException("消息確認失敗!",e);
} finally {
jedis.close();
}
}
/*
檢查消費者羣組是否存在,輔助方法
* */
public boolean checkGroup(String key, String groupName){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
for(StreamGroupInfo groupinfo : xinfoGroupResult) {
if(groupName.equals(groupinfo.getName())) return true;
}
return false;
} catch (Exception e) {
throw new RuntimeException("檢查消費羣組失敗!",e);
} finally {
jedis.close();
}
}
public final static int MQ_INFO_CONSUMER = 1;
public final static int MQ_INFO_GROUP = 2;
public final static int MQ_INFO_STREAM = 0;
/**
* 消息隊列信息查看
* @param type
*/
public void MqInfo(int type,String key, String groupName){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
if(type==MQ_INFO_CONSUMER){
List<StreamConsumersInfo> xinfoConsumersResult = jedis.xinfoConsumers(RS_STREAM_MQ_NS+key, groupName);
System.out.println(RS_STREAM_MQ_NS+key+" 消費者信息:" + xinfoConsumersResult);
for( StreamConsumersInfo consumersinfo : xinfoConsumersResult) {
System.out.println("-ConsumerInfo:" + consumersinfo.getConsumerInfo());
System.out.println("--Name:" + consumersinfo.getName());
System.out.println("--Pending:" + consumersinfo.getPending());
System.out.println("--Idle:" + consumersinfo.getIdle());
}
}else if (type==MQ_INFO_GROUP){
List<StreamGroupInfo> xinfoGroupResult = jedis.xinfoGroup(RS_STREAM_MQ_NS+key);
System.out.println(RS_STREAM_MQ_NS+key+"消費者羣組信息:" + xinfoGroupResult);
for(StreamGroupInfo groupinfo : xinfoGroupResult) {
System.out.println("-GroupInfo:" + groupinfo.getGroupInfo());
System.out.println("--Name:" + groupinfo.getName());
System.out.println("--Consumers:" + groupinfo.getConsumers());
System.out.println("--Pending:" + groupinfo.getPending());
System.out.println("--LastDeliveredId:" + groupinfo.getLastDeliveredId());
}
}else{
StreamInfo xinfoStreamResult = jedis.xinfoStream(RS_STREAM_MQ_NS+key);
System.out.println(RS_STREAM_MQ_NS+key+"隊列信息:" + xinfoStreamResult);
System.out.println("-StreamInfo:" + xinfoStreamResult.getStreamInfo());
System.out.println("--Length:" + xinfoStreamResult.getLength());
System.out.println("--RadixTreeKeys:" + xinfoStreamResult.getRadixTreeKeys());
System.out.println("--RadixTreeNodes():" + xinfoStreamResult.getRadixTreeNodes());
System.out.println("--Groups:" + xinfoStreamResult.getGroups());
System.out.println("--LastGeneratedId:" + xinfoStreamResult.getLastGeneratedId());
System.out.println("--FirstEntry:" + xinfoStreamResult.getFirstEntry());
System.out.println("--LastEntry:" + xinfoStreamResult.getLastEntry());
}
} catch (Exception e) {
throw new RuntimeException("消息隊列信息檢索失敗!",e);
} finally {
jedis.close();
}
}
}
模擬消息隊列
0****1
Stream 消息太多怎麼辦?
Redis 的 Stream 指令不會刪除消息,它是給消息做了標誌位,當然如果消息太長,在 xadd 的時候可以添加一個 maxlen 的,如果長度達到就會將老的消息幹掉,確保最多不超過指定長度。
0****2
消息忘記 ack 會怎麼樣?
Stream 在每個消費者結構中保存了正在處理中的消息 ID 列表 PEL,如果消費者收到了消息處理完了但是沒有回覆 ack,就會導致 PEL 列表不斷增長,如果有很多消費組的話,那麼這個 PEL 佔用的內存就會放大。所以消息要儘可能的快速消費並確認。
0****3
PEL,如何避免消息丟失?
在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回覆給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了。但是 PEL 裏已經保存了發出去的消息 ID。待客戶端重新連上之後,可以再次收到 PEL 中的消息 ID 列表。不過此時 xreadgroup 的起始消息 ID 不能爲參數 >,而必須是任意有效的消息 ID,一般將參數設爲 0-0,表示讀取所有的 PEL 消息以及自 last_delivered_id 之後的新消息。
0****4
死信問題
如果某個消息,不能被消費者處理,也就是不能被 XACK,這是要長時間處於 Pending 列表中,即使被反覆的轉移給各個消費者也是如此。此時該消息的 delivery counter(通過 XPENDING 可以查詢到)就會累加,當累加到某個我們預設的臨界值時,我們就認爲是壞消息(也叫死信,DeadLetter,無法投遞的消息),由於有了判定條件,我們將壞消息處理掉即可,刪除即可。刪除一個消息,使用 XDEL 語法,注意,這個命令並沒有刪除 Pending 中的消息,因此查看 Pending,消息還會在,可以在執行執行 XDEL 之後,XACK 這個消息標識其處理完畢。
0****5
Stream 的高可用
Stream 的高可用是建立主從複製基礎上的,它和其它數據結構的複製機制沒有區別,也就是說在 Sentinel 和 Cluster 集羣環境下 Stream 是可以支持高可用的。不過鑑於 Redis 的指令複製是異步的,在 failover 發生時,Redis 可能會丟失極小部分數據,這點 Redis 的其它數據結構也是一樣的。
Stream 的消費模型借鑑了 Kafka 的消費分組的概念,它彌補了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同於 kafka,Kafka 的消息可以分 partition,而 Stream 不行。
總的來說,如果是中小項目和企業,在工作中已經使用了 Redis,在業務量不是很大,而又需要消息中間件功能的情況下,可以考慮使用 Redis 的 Stream 功能。但是如果併發量很高,資源足夠支持下,還是以專業的消息中間件,比如 RocketMQ、Kafka 等來支持業務更好。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/_ty8pYo-owgrIe0jAnHugg