消息隊列技術選型:這 7 種消息場景一定要考慮!

大家好,我是君哥。

我們在做消息隊列的技術選型時,往往會結合業務場景進行考慮。今天來聊一聊消息隊列可能會用到的 7 種消息場景。

1 普通消息

消息隊列最基礎的功能就是生產者發送消息、Broker 保存消息,消費者來消費消息,以此實現系統解耦、削峯填谷的作用。

普通消息是消息隊列必備的消息類型,也是系統使用場景最多的一種消息。

2 順序消息

順序消息是指生產者發送消息的順序和消費者消費消息的順序是一致的。比如在一個電商場景,同一個用戶提交訂單、訂單支付、訂單出庫,這三個消息消費者需要按照順序來進行消費。如下圖:

順序消息的實現並不容易,原因如下:

要保證消息有序,需要滿足兩個條件:

如下圖:

上面第二個條件是比較容易實現的,一個分區綁定一個消費者就可以,主要是第一個條件。

在主流消息隊列的實現中,Kafka 和 Pulsar 的實現方式類似,生產者給消息賦值一個 key,對 key 做 Hash 運算來指定消息發送到哪一個分區。比如上面電商的例子,對同一個用戶的一筆訂單,提交訂單、訂單支付、訂單出庫這三個消息賦值同一個 key,就可以把這三條消息發送到同一個分區。

對於 RocketMQ,生產者在發送消息的時候,可以通過 MessageQueueSelector 指定把消息投遞到那個 MessageQueue,如下圖:

示例代碼如下:

public static void main(String[] args) throws UnsupportedEncodingException {
 try {
  DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  producer.start();

  String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
  for (int i = 0; i < 100; i++) {
   int orderId = i % 10;
   Message msg =
    new Message("TopicTestjjj", tags[i % tags.length]"KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
     Integer id = (Integer) arg;
     int index = id % mqs.size();
     return mqs.get(index);
    }
   }, orderId);

   System.out.printf("%s%n", sendResult);
  }

  producer.shutdown();
 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  e.printStackTrace();
 }
}

RabbitMQ 的實現是 Exchange 根據設置好的 Route Key 將數據路由到不同的 Queue 中。示例代碼如下:

@Resource
private AmqpTemplate rabbitTemplate;

public void send1(String message) {
 rabbitTemplate.convertAndSend("testExchange""testRoutingKey", message);
}

3 延時消息

或者也叫定時消息,是指消息發送後不會立即被消費,而是指定一個時間,到時間後再消費。經典的場景比如電商購物時,30 分鐘未支付訂單,讓訂單自動失效。

3.1 RocketMQ 實現

RocketMQ 定義了 18 個延時級別,每個延時級別對應一個延時時間。下面如果延遲級別是 3,則消息會延遲 10s 纔會拉取。

//MessageStoreConfig類
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ 的延時消息如下圖:

生產者把消費發送到 Broker 後,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 這個 Topic,然後調度任務會判斷是否到期,如果到期,會把消息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,這樣消費者就可以消費到了。

RocketMQ 的延時消息只支持最大兩個小時的延時,不過 RocketMQ5.0 基於時間輪算法實現了定時消息,解決了這個問題。

3.2 Pulsar 實現

Pulsar 的實現如下圖:

Pulsar 的延時消息首先會寫入一個 Delayed Message Tracker 的數據結構中,Delayed Message Tracker 根據延時時間構建 delayed index 優先級隊列。消費者拉取消息時,首先去 Delayed Message Tracker 檢查是否有到期的消息。如果有則直接拉取進行消費。

3.3 RabbitMQ 實現

RabbitMQ 的實現方式有兩種,一種是投遞到普通隊列都不消費,等消息過期後被投遞到死信隊列,消費者消費死信隊列。如下圖:

第二種方式是生產者發送消息時,先發送到本地 Mnesia 數據庫,消息到期後定時器再將消息投遞到 broker。

3.4 Kafka 實現

Kafka 本身並沒有延時隊列,不過可以通過生產者攔截器來實現消息延時發送,也可以定義延時 Topic,利用類似 RocketMQ 的方案來實現延時消息。

4 事務消息

事務消息是指生產消息和消費消息滿足事務的特性。

RabbitMQ 和 Kafka 的事務消息都是隻支持生產消息的事務特性,即一批消息要不全部發送成功,要不全部發送失敗。

RabbitMQ 通過 Channel 來開啓事務消息,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
//開啓事務
channel.txSelect();
channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));
//提交事務 或者 channel.txRollback()回滾事務
channel.txCommit();

Kafka 可以給多個生產者設置同一個事務 ID ,從而把多個 Topic 、多個 Partition 放在一個事務中,實現原子性寫入。

Pulsar 的事務消息對於事務語義的定義是:允許事件流應用將消費、處理、生產消息整個過程定義爲一個原子操作。可見,Pulsar 的事務消息可以覆蓋消息流整個過程。

RocketMQ 的事務消息是通過 half 消息來實現的。以電商購物場景來看,賬戶服務扣減賬戶金額後,發送消息給 Broker,庫存服務來消費這條消息進行扣減庫存。如下圖:

可見,RocketMQ 只能保證生產者發送消息和本地事務的原子性,並不能保證消費消息的原子性。

5 軌跡消息

軌跡消息主要用於跟蹤消息的生命週期,當消息丟失時可以很方便地找出原因。

軌跡消息也跟普通消息一樣,也需要存儲和查詢,也會佔用消息隊列的資源,所以選擇軌跡消息要考慮下面幾點:

RabbitMQ Broker 實現了軌跡消息的功能,打開 Trace 開關,就可以把軌跡消息發送到 amq.rabbitmq.trace 這個 exchange,但是要考慮軌跡消息會不會給 Broker 造成 壓力進而導致消息積壓。RabbitMQ 的生產者和消費者都沒有實現軌跡消息,需要開發者自己來實現。

RocketMQ 生產者、Broker 和消費者都實現了軌跡消息,不過默認是關閉的,需要手工開啓。

使用軌跡消息,需要考慮記錄哪些節點、存儲介質、性能、查詢方式等問題。

6 死信隊列

在消息隊列中,死信隊列主要應對一些異常的情況,如下圖:

RocketMQ 實現了消費端的死信隊列,當消費者消費失敗時,會進行重試,如果重試 16 次還是失敗,則這條消息會被髮送到死信隊列。

RabbitMQ 實現了生產者和 Broker 的死信隊列,下面三種情況,消息會被髮送到死信隊列:

RabbitMQ 消息變成死信消息後,會被髮送到死信交換機(Dead-Letter-Exchange)。

7 優先級消息

有一些業務場景下,我們需要優先處理一些消息,比如銀行裏面的金卡客戶、銀卡客戶優先級高於普通客戶,他們的業務需要優先處理。如下圖:

主流消息隊列中,RabbitMQ 是支持優先級隊列的,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
Map<String, Object> args = new HashMap<String, Object>();
//設置優先級爲 5
args.put("x-max-priority", 5);
channel.queueDeclare("my-priority-queue", true, false, false, args);

8 總結

消息隊列技術選型,要考慮的因素很多,本文主要從業務場景來分析需要考慮的因素,同時技術上也需要考慮運維複雜度、業務規模、社區活躍度、學習成本等因素。希望本文對你使用消息隊列有所幫助。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/RBvaqp6gen7CgeTqu9fQKQ