日常工作,MQ 的 8 種常用使用場景
前言
大家好,我是田螺。
我們日常開發中,經常跟 MQ(消息隊列) 打交道。本文田螺哥梳理了 MQ 的 8 種使用場景。
- 異步處理
面試官在問我們 MQ 作用時,很多夥伴馬上想到異步處理、解耦、流量削鋒等等。
MQ 最常見的應用場景之一就是異步處理。
比如,在用戶註冊場景中,當用戶信息保存成功後,系統需要發送一個短信、或者郵箱消息,通知用戶註冊成功。如果這個短信或者郵箱消息發送比較耗時,則可能拖垮註冊接口。又或者如果調用第三方短信、郵件發送接口失敗,也會影響註冊接口。一般我們不希望一個通知類的功能,去影響註冊主流程,這時候,則可以使用 MQ 來實現異步處理。
簡要代碼如下:先保存用戶信息,然後發送註冊成功的 MQ 消息
// 用戶註冊方法
public void registerUser(String username, String email, String phoneNumber) {
// 保存用戶信息(簡化版)
userService.add(buildUser(username,email,phoneNumber))
// 發送消息
String registrationMessage = "User " + username + " has registered successfully.";
// 發送消息到隊列
rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
}
消費者從隊列中讀取消息併發送短信或郵件:
@Service
public class NotificationService {
// 監聽消息隊列中的消息併發送短信/郵件
@RabbitListener(queues = "registrationQueue")
public void handleRegistrationNotification(String message) {
// 這裏可以進行短信或郵件的發送操作
System.out.println("Sending registration notification: " + message);
// 假設這裏是發送短信的操作
sendSms(message);
// 也可以做其他通知(比如發郵件等)
sendEmail(message);
}
}
- 解耦
在微服務架構中,各個服務通常需要進行相互通信。使用 MQ 可以幫助解耦服務,避免直接調用導致的強耦合。
一個電商平臺的庫存服務和支付服務。支付服務在處理支付後,需要向庫存服務發送扣庫存的請求,但不直接調用 API,而是通過 MQ 發送消息,讓庫存服務異步處理。
支付服務在支付成功後將消息發送到 RocketMQ:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class PaymentService {
private DefaultMQProducer producer;
public PaymentService() throws Exception {
producer = new DefaultMQProducer("PaymentProducerGroup");
producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer 地址
producer.start();
}
public void processPayment(String orderId, int quantity) throws Exception {
// 1. 模擬調用支付接口(例如:支付寶、微信支付等)
boolean paymentSuccessful = callPayment(orderId, quantity);
if (paymentSuccessful) {
// 2. 支付成功後,創建支付消息併發送到 RocketMQ
String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
Message message = new Message("paymentTopic", "paymentTag", messageBody.getBytes());
producer.send(message);
}
}
}
庫存服務從 RocketMQ 中消費支付消息,並處理扣庫存的邏輯:
public class InventoryService {
private DefaultMQPushConsumer consumer;
public InventoryService() throws Exception {
consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("paymentTopic", "paymentTag");
// 消息監聽器
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String messageBody = new String(msg.getBody());
// 執行扣庫存操作
reduceStock(messageBody);
}
return null; // 返回消費成功
});
consumer.start();
System.out.println("InventoryService started...");
}
}
- 流量削鋒
在高併發的情況下,有些請求可能會產生瞬時流量峯值,直接處理可能會導致服務過載。比如:
-
春運快到了,12306 的搶票就是這種案例。
-
又或者雙 12 這種大促,訂單壓力會比較大。
-
秒殺的時候,也需要避免流量暴漲,打垮應用系統的風險
這些場景,我們都可以使用 MQ 來進行流量的削峯填谷,確保系統平穩運行。
假設秒殺系統每秒最多可以處理 2k 個請求,每秒卻有 5k 的請求過來,可以引入消息隊列,秒殺系統每秒從消息隊列拉 2k 請求處理得了。
- 延時任務
在電商平臺的訂單處理中,如果用戶下單後一定時間內未支付,需要自動取消訂單。通過 MQ 的延時隊列功能,可以設置消息延遲消費的時間,當消息到達延遲時間後,由消費者處理取消訂單的邏輯。
當用戶下單時,生成一個訂單併發送一條延遲消息到 RocketMQ。延遲時間可以根據訂單的超時時間設置:
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) {
// 保存訂單邏輯(省略)
// 計算延遲時間(單位:毫秒)
long delay = order.getTimeout();
// 發送延遲消息
rocketMQTemplate.syncSend("orderCancelTopic:delay" + delay,
MessageBuilder.withPayload(order).build(),
10000, // 消息發送超時時間(單位:毫秒)
(int) (delay / 1000) // RocketMQ的延遲級別是以秒爲單位的,因此需要轉換爲秒
);
}
}
注意:RocketMQ 的延遲級別是固定的,如 1s、5s、10s 等。如果訂單的延遲時間不是 RocketMQ 支持的延遲級別的整數倍,那麼消息將不會精確地在預期的延遲時間後被消費。爲了解決這個問題,你可以選擇最接近的延遲級別,或者根據業務需求進行適當的調整。
創建一個用來消費延遲消息的消費者,處理取消訂單的邏輯。例如:
@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 取消訂單邏輯
// 檢查訂單狀態,如果訂單仍處於未支付狀態則進行取消
System.out.println("Cancelling order: " + order.getOrderId());
// (省略實際的取消訂單邏輯)
}
}
- 日誌收集
消息隊列常用於日誌系統中,將應用生成的日誌異步地發送到日誌處理系統,進行統一存儲和分析。
假設你有一個微服務架構,每個微服務都會生成日誌。你可以將這些日誌通過消息隊列(如 Kafka)發送到一個集中式的日誌收集系統(如 ELK(Elasticsearch, Logstash, Kibana) 或 Fluentd),從而實現日誌的統一管理。
生產者(發送日誌到 Kafka)
// 配置和發送日誌到 Kafka 主題 "app-logs"
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
String logMessage = "{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>("app-logs", "log-key", logMessage));
消費者(收集日誌信息)
@Service
public class LogConsumer {
// 使用 @KafkaListener 註解來消費 Kafka 中的日誌
@KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
public void consumeLog(String logMessage) {
// 打印或處理收到的日誌
System.out.println("Received log: " + logMessage);
}
}
- 分佈式事務
業界經常使用MQ
來實現分佈式事務。
我舉個下訂單的場景,使用 MQ 實現分佈式事務的例子吧。
我們先來看,一條普通的 MQ 消息,從產生到被消費,大概流程如下:
-
生產者產生消息,發送帶 MQ 服務器
-
MQ 收到消息後,將消息持久化到存儲系統。
-
MQ 服務器返回 ACk 到生產者。
-
MQ 服務器把消息 push 給消費者
-
消費者消費完消息,響應 ACK
-
MQ 服務器收到 ACK,認爲消息消費成功,即在存儲中刪除消息。
回到下訂單這個例子,訂單系統創建完訂單後,再發送消息給下游系統。如果訂單創建成功,然後消息沒有成功發送出去,下游系統就無法感知這個事情,出導致數據不一致。
這時候就可以使用 MQ 實現分佈式事務消息。大家看下這個流程:
-
生產者產生消息,發送一條半事務消息到 MQ 服務器
-
MQ 收到消息後,將消息持久化到存儲系統,這條消息的狀態是待發送狀態。
-
MQ 服務器返回 ACK 確認到生產者,此時 MQ 不會觸發消息推送事件
-
生產者執行本地事務
-
如果本地事務執行成功,即 commit 執行結果到 MQ 服務器;如果執行失敗,發送 rollback。
-
如果是正常的 commit,MQ 服務器更新消息狀態爲可發送;如果是 rollback,即刪除消息。
-
如果消息狀態更新爲可發送,則 MQ 服務器會 push 消息給消費者。消費者消費完就回 ACK。
-
如果 MQ 服務器長時間沒有收到生產者的 commit 或者 rollback,它會反查生產者,然後根據查詢到的結果執行最終狀態。
-
遠程調用
我以前公司(微衆)基於 MQ(RocketMQ),自研了遠程調用框架。
RocketMQ 作爲遠程調用框架,主要就是金融場景的適配性。
-
消息查詢功能:RocketMQ 提供了消息查詢功能,方便微衆銀行在需要時進行消息對賬或問題排查。
-
金融級穩定性:RocketMQ 在金融領域的應用非常廣泛,得到了衆多金融機構的認可。其穩定性和可靠性能夠滿足微衆銀行對金融級消息服務的需求。
還有可以基於 RocketMQ 的定製開發:多中心多活、灰度發佈、流量權重與消息去重、背壓模式(能夠根據後續服務的治理能力決定拉取的消息數量,確保系統的穩定運行。)
- 廣播通知:事件驅動的消息通知
消息隊列(MQ) 可以非常適合用於 廣播通知。在廣播通知場景中,消息隊列可以將消息推送到多個訂閱者,讓不同的服務或者應用接收到通知。
系統通知:向所有用戶廣播應用更新、系統維護、公告通知等。
事件驅動的消息通知:如庫存更新、用戶狀態變化、訂單支付成功等事件通知,多個系統可以訂閱這個事件。
針對事件驅動的消息通知,我們以 訂單支付成功 事件爲例,假設多個系統(如庫存管理系統、用戶積分系統、財務系統等)都需要監聽這個事件來進行相應處理。
當訂單支付成功 事件發生時,系統會通過消息隊列廣播一個事件通知(比如消息內容是訂單 ID、支付金額等),其他系統可以根據這個事件來執行相應的操作,如:
-
庫存系統:根據訂單信息減少庫存。
-
用戶積分系統:增加用戶積分。
-
財務系統:記錄支付流水。
發送訂單支付成功事件:
// 創建訂單支付成功事件消息
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message msg = new Message("order_event_topic", "order_payment_success", orderEventData.getBytes());
// 發送消息
producer.send(msg);
事件消費者(接收並處理訂單支付成功事件):
- 庫存系統:
// 註冊消息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Inventory system received: " + eventData);
// 處理庫存減少邏輯
// 解析消息(假設是 JSON 格式)
// updateInventory(eventData); // 假設調用庫存更新方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
- 積分系統:
// 註冊消息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Points system received: " + eventData);
// 處理用戶積分增加邏輯
// updateUserPoints(eventData); // 假設調用積分更新方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
- 財務系統:
// 註冊消息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Finance system received: " + eventData);
// 處理財務記錄邏輯
// recordPaymentTransaction(eventData); // 假設調用財務記錄方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/sMoAFNsy03zS9OzvpHoAAw