10 分鐘掌握 RocketMQ 的核心知識

前言

Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分佈式消息中間件。

RocketMQ 主要由 Producer、Broker、Consumer 三部分組成,其中 Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個 Topic 的消息,每個 Topic 的消息也可以分片存儲於集羣中的不同的 Broker Group。

快速安裝:https://rocketmq.apache.org/docs/quick-start/

源代碼:https://github.com/apache/rocketmq-spring

主要功能:

1、業務解耦。採用發佈訂閱模式,生產端發送消息到 MQ Server,下游的消費端訂閱接收消息。異步形式,系統解耦,提升系統擴展性

2、削峯限流。由於消息中間件的吞吐量很高,過量的請求會暫時放在 MQ server,下游慢慢消費,避免過量請求沖垮系統

3、億級消息的堆積能力,單個隊列中的百萬級消息的累積容量。

4、高可用性:Broker 服務器支持多 Master 多 Slave 的同步雙寫以及 Master 多 Slave 的異步複製模式,其中同步雙寫可保證消息不丟失。

5、高可靠性:生產者將消息發送到 Broker 端有三種方式,同步、異步和單向。Broker 在對於消息刷盤有兩種策略:同步刷盤和異步刷盤,其中同步刷盤可以保證消息成功的存儲到磁盤中。消費者的消費模式也有集羣消費和廣播消費兩種,默認集羣消費,如果集羣模式中消費者掛了,一個組裏的其他消費者會接替其消費。

6、分佈式事務消息:這裏是採用半消息確認和消息回查機制來保證分佈式事務消息。

7、支持消息過濾:建議採用消費者業務端的 tag 過濾

8、支持順序消息:消息在 Broker 中是採用隊列的FIFO模式存儲的,也就是發送是順序的,只要保證消費的順序性即可。

9、支持定時消息和延遲消息:Broker 中由定時消息的機制,消息發送到 Broker 中,不會立即被 Consumer 消費,會等到一定的時間才被消費。延遲消息也是一樣,延遲一定時間之後纔會被 Consumer 消費。

核心組件:

1、Namesrv

Namesrv 充當路由消息的提供者。Namesrv 是一個幾乎無狀態節點,多個 Namesrv 實例組成集羣,但相互獨立,沒有信息交換。Namesrv 主要作用是:爲 producer 和 consumer 提供關於 topic 的路由信息。管理 broker 節點:監控更新 broker 的實時狀態。路由註冊、路由刪除(故障剔除)。

2、Broker

負責存儲消息、轉發消息。Broker 是以 group 爲單位提供服務。一個 group 裏面分 Master 和 Slave。Master 和 Slave 存儲的數據一樣,slave 從 master 同步數據(同步雙寫或異步複製看配置)。一個 Master 可以對應多個 Slave,一個 Slave 只能對應一個 Master。Master 與 Slave 的對應關係通過指定相同的 BrokerName、不同的 BrokerId 來定義,BrokerId 爲 0 表示 Master,非 0 表示 Slave。

基本概念:

1、主題(Topic) 表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是 RocketMQ 進行消息訂閱的基本單位。每個 topic 可分爲若干個分區(queue)

2、生產者組(Producer Group) 同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之後崩潰,則 Broker 服務器會聯繫同一生產者組的其他生產者實例以提交或回溯消費。

3、消費者組(Consumer Group) 同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的 Topic。RocketMQ 支持兩種消息模式:集羣消費(Clustering)和廣播消費(Broadcasting)。

4、普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

5、嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有序的。

6、消息(Message) 消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ 中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的 Key。系統提供了通過 Message ID 和 Key 查詢消息的功能。

7、標籤(Tag) 爲消息設置的標誌,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標籤。標籤能夠有效地保持代碼的清晰度和連貫性,並優化 RocketMQ 提供的查詢系統。消費者可以根據 Tag 實現對不同子主題的不同消費邏輯,實現更好的擴展性。

RocketMQ 特性:

代碼演示

外部依賴:

spring boot 已經爲 RocketMQ 封裝了 starter 組件,只需在 pom.xml 文件中添加 jar 版本依賴即可:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

配置文件:

在配置文件 application.yaml 中配置 RocketMQ 的相關參數,具體內容如下:

rocketmq:
  name-server: localhost:9876
  consumer:
    topic: maker-order-topic
    group: my-group1
  producer:
    group: p-my-group1

消息生產端:

@Resource
private RocketMQTemplate rocketMQTemplate;
private static String makerOrderTopic = "maker-order-topic";

@GetMapping("/send_make_order_message")
public Object send_make_order_message() {
    try {
        Long orderId = Long.valueOf(new Random().nextInt(1000000));
        OrderModel orderModel = OrderModel.builder().orderId(orderId).buyerUid(200000L).amount(26.8).shippingAddress("上海").build();
        SendResult sendResult = rocketMQTemplate.syncSend(makerOrderTopic, orderModel);
        System.out.printf("Send message to topic %s , sendResult=%s %n", makerOrderTopic, sendResult);
        return "消息發送成功";
    } catch (Exception e) {
        e.printStackTrace();
        return "消息發送失敗";
    }
}

消息消費端:

@Service
@RocketMQMessageListener(nameServer = "${rocketmq.name-server}"topic = "${rocketmq.consumer.topic}"consumerGroup = "${rocketmq.consumer.group}")
public class OrderConsumer implements RocketMQListener<OrderModel> {

    @Override
    public void onMessage(OrderModel orderModel) {
        System.out.printf("consumer received message: %s \n", JSON.toJSONString(orderModel));
    }
}

操作演示

瀏覽器訪問:http://localhost:9071/send_make_order_message,模擬生產端發送消息到 MQ Server 中。

消費端接收消息日誌:

Send message to topic maker-order-topic , sendResult=SendResult [sendStatus=SEND_OK, msgId=C0A80069816F14DAD5DC73A75B9F0014, offsetMsgId=C0A8006900002A9F0000000000058841, messageQueue=MessageQueue [topic=maker-order-topic, brokerName=192.168.0.105, queueId=2]queueOffset=0] 
consumer received message: {"amount":26.8,"buyerUid":200000,"orderId":895586,"shippingAddress":"上海"}

其他消息類型如何發送

1、同步發送

同步發送是指消息發送方發出一條消息後,在收到服務端返回響應後,線程纔會執行後續代碼

OrderModel orderModel = mockOrderModel();
Message message = new Message(makerOrderTopic, "TageA", JSON.toJSONString(orderModel).getBytes());
SendResult sendResult = rocketMQTemplate.getProducer().send(message);

2、異步發送

異步發送是指發送方發出一條消息後,不需要等服務端返回響應。異步發送,需要實現異步發送回調接口(SendCallback),通過回調接口接收服務端響應,並處理結果

OrderModel orderModel = mockOrderModel();
rocketMQTemplate.asyncSend(makerOrderTopic, orderModel, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("消息發送成功,msgId=" + sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable throwable) {
        System.out.println("發送失敗," + throwable);
    }
});

3、順序消息

對於指定的一個 Topic,所有消息根據Sharding Key分區。同一個分區內的消息按照嚴格的 FIFO 順序進行發佈和消費。Sharding Key是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。

比如:電商的訂單創建,以訂單ID作爲 Sharding Key,那麼同一個訂單相關的消息,如創建訂單、付款、發貨、訂單退款消息、訂單物流消息都會按照發布的先後順序來消費。

for (long orderId = 0; orderId < 20; orderId++) {
    String shardingKey = String.valueOf(orderId % 5);
    OrderModel orderModel = OrderModel.builder().orderId(orderId).build();
    SendResult sendResult = rocketMQTemplate.syncSendOrderly(makerOrderTopic, orderModel, shardingKey);
    if (sendResult != null) {
        System.out.println(orderId + " ,發送成功");
    }
}

4、延時消息

Producer 將消息發送到消息隊列 RocketMQ 服務端,但並不期望立馬投遞這條消息,而是延遲一定時間後才投遞到 Consumer 進行消費,該消息稱爲延時消息。

OrderModel orderModel = mockOrderModel();
org.springframework.messaging.Message message = MessageBuilder.withPayload(JSON.toJSONString(orderModel).getBytes()).build();
//延時等級 3, 這個消息將在10s之後發送,現在只支持固定的幾個時間值
//delayTimeLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
SendResult sendResult = rocketMQTemplate.syncSend(makerOrderTopic, message, 8000, 3);

5、事務消息

RocketMQ 提供類似 X/Open XA 的分佈式事務功能,通過消息隊列 RocketMQ 事務消息能達到分佈式事務的最終一致。

由於網絡閃斷、生產者應用重啓等原因,導致某條事務消息的二次確認丟失,消息隊列 RocketMQ 服務端通過掃描發現某條消息長期處於 “半事務消息” 時,主動向生產者查詢該消息的最終狀態(Commit 或 Rollback),該過程稱之爲消息回查。

典型場景:在電商購物車下單時,涉及到購物車系統和交易系統,這兩個系統之間的數據最終一致性可以通過分佈式事務消息的異步處理實現。在這種場景下,交易系統是最爲核心的系統,需要最大限度地保證下單成功。而購物車系統只需要訂閱消息隊列 RocketMQ 的交易訂單消息,做相應的業務處理,即可保證最終的數據一致性。

發送步驟:

回查步驟:

發送半事務消息,示例代碼如下:

OrderModel orderModel = mockOrderModel();
org.springframework.messaging.Message message = MessageBuilder.withPayload(JSON.toJSONString(orderModel)).build();
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("tx_order_message", makerOrderTopic, message, null);
SendStatus sendStatus = transactionSendResult.getSendStatus();
LocalTransactionState localTransactionState = transactionSendResult.getLocalTransactionState();
System.out.println("send message status: " + sendStatus + " ,  localTransactionState: " + localTransactionState);

編寫RocketMQLocalTransactionListener接口實現類,實現執行本地事務和事務回查兩個方法。

@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_message")
public class TXProducerListener implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 執行本地事務
        System.out.println("TXProducerListener 開始執行本地事務。。。");
        RocketMQLocalTransactionState result;
        try {
            // 模擬業務處理( 如:創建訂單 )
            // int i = 1 / 0;  //模擬異常
            result = RocketMQLocalTransactionState.COMMIT;  // 成功
        } catch (Exception e) {
            System.out.println("本地事務執行失敗。。。");
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 檢查本地事務( 例如檢查下訂單是否成功 )
        System.out.println("檢查本地事務。。。");
        RocketMQLocalTransactionState result;
        try {
            //模擬業務處理( 根據檢查結果,決定是COMMIT或ROLLBACK )
            result = RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 異常就回滾
            System.out.println("檢查本地事務 error");
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }

}

演示代碼地址

https://github.com/aalansehaiyang/spring-boot-bulking  

模塊:spring-boot-bulking-rocketmq

面試官一般喜歡考察哪些知識點

1、如何保證順序消息?

順序由 producer 發送到 broker 的消息隊列是滿足 FIFO 的,所以發送是順序的,單個 queue 裏的消息是順序的。多個 Queue 同時消費是無法絕對保證消息的有序性的。所以,同一個 topic,同一個 queue,發消息的時候一個線程發送消息,消費的時候一個線程去消費一個 queue 裏的消息。

2、怎麼保證消息發到同一個 queue 裏?

RocketMQ 給我們提供了 MessageQueueSelector 接口,可以重寫裏面的接口,實現自己的算法,比如判斷 i%2==0,那就發送消息到 queue1 否則發送到 queue2。

3、如何實現消息過濾?

有兩種方案,一種是在 broker 端按照 Consumer 的去重邏輯進行過濾,這樣做的好處是避免了無用的消息傳輸到 Consumer 端,缺點是加重了 Broker 的負擔,實現起來相對複雜。另一種是在 Consumer 端過濾,比如按照消息設置的 tag 去重,這樣的好處是實現起來簡單,缺點是有大量無用的消息到達了 Consumer 端只能丟棄不處理。

4、如果由於網絡等原因,多條重複消息投遞到了 Consumer 端,你怎麼進行消息去重?

這個得先說下消息的冪等性原則:就是用戶對於同一種操作發起的多次請求的結果是一樣的,不會因爲操作了多次就產生不一樣的結果。只要保持冪等性,不管來多少條消息,最後處理結果都一樣,需要 Consumer 端自行實現。

去重的方案:因爲每個消息都有一個 MessageId, 保證每個消息都有一個唯一鍵,可以是數據庫的主鍵或者唯一約束,也可以是 Redis 緩存中的鍵,當消費一條消息前,先檢查數據庫或緩存中是否存在這個唯一鍵,如果存在就不再處理這條消息,如果消費成功,要保證這個唯一鍵插入到去重表中。

5、RocketMQ 是怎麼實現分佈式事務消息的?

6、從 Producer 角度分析,如何確保消息成功發送到了 Broker?

7、從 Broker 角度分析,如何確保消息持久化?

8、從 Consumer 角度分析,如何保證消息被成功消費?

Consumer 自身維護了個持久化的 offset(對應 Message Queue 裏的 min offset),用來標記已經成功消費且已經成功發回 Broker 的消息下標。如果 Consumer 消費失敗,它會向 Broker 發回消費失敗的狀態,發回成功纔會更新自己的 offset。如果發回給 broker 時 broker 掛掉了,Consumer 會定時重試,如果 Consumer 和 Broker 一起掛掉了,消息還在 Broker 端存儲着,Consumer 端的 offset 也是持久化的,重啓之後繼續拉取 offset 之前的消息進行消費。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/C97cxpf1Re1EcdBbNCGBBA