3 張圖帶你徹底理解 RocketMQ 事務消息
大家好,我是君哥。
事務消息是分佈式事務的一種解決方案,RocketMQ 有成熟的事務消息模型,今天就來聊一聊 RocketMQ 事務消息實現機制。
假如有一個電商場景,用戶下單後,賬戶服務從用戶賬戶上扣減金額,然後通知庫存服務給用戶發貨,這兩個服務需要在一個分佈式事務內完成。
這時,賬戶服務作爲 Producer,庫存服務作爲 Consumer,見下面消息流程:
-
賬戶服務作爲 Producer 向 Broker 發送一條 half 消息;
-
half 消息發送成功後,執行本地事務,執行成功則向 Broker 發送 commit 請求,否則發送 rollback 請求;
-
如果 Broker 收到的是 rollback 請求,則刪除保存的 half 消息;
-
如果 Broker 收到的是 commit 請求,則保存扣減庫存消息(這裏的處理是把消息從 half 隊列投遞到真實的隊列),然後刪除保存的 half 消息;
-
如果 Broker 沒有收到請求,則會發送請求到 Producer 查詢本地事務狀態,然後根據 Producer 返回的本地狀態做 commit/rollback 相關處理。
1 half 消息
上面電商的案例中,RocketMQ 解決分佈式事務的第一步是賬戶服務發送 half 消息。
首先看官網一個發送事務消息的示例:
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
上面的代碼中 Producer 有一個 TransactionListener 屬性,這個由開發者通過實現這個接口來自己定義。這個接口有兩個方法:
-
提交本地事務 executeLocalTransaction
-
檢查本地事務狀態 checkLocalTransaction
下面代碼是發送事務消息的方法:
//類 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
//省略驗證邏輯
SendResult sendResult = null;
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
//省略發送結果處理
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
//省略封裝屬性
return transactionSendResult;
}
從這段代碼中看到,在發送消息前,給消息封裝了一個屬性 PROPERTY_TRANSACTION_PREPARED,通過這個屬性可以找到 Broker 端的處理。
Broker 保存 half 消息時,把消息 topic 改爲 RMQ_SYS_TRANS_HALF_TOPIC,然後把消息投遞到 queueId 等於 0 的隊列。投遞成功後給 Producer 返回 PutMessageStatus.PUT_OK。代碼如下:
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
2 執行本地事務
上一節講到,Producer 發送事務消息時,會給一個 transactionListener,發送 half 消息成功後,會通過 transactionListener 的 executeLocalTransactionBranch 提交本地事務,代碼如下:
//類 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//省略部分代碼
SendResult sendResult = null;
//省略部分代碼
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//省略部分代碼
if (null != localTransactionExecuter) {
//這個分支已經廢棄
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
//執行本地事務
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
//省略部分代碼
} catch (Throwable e) {
}
}
break;
//省略其他 case
}
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//省略部分代碼
return transactionSendResult;
}
從上面代碼中可以看到,本地事務執行結束後,會調用一個 endTransaction 方法,這個就是向 Broker 發送 commit/rollback,也可能發送 UNKNOW,封裝到 requestHeader 的 commitOrRollback 的屬性中。這個請求的請求碼是 END_TRANSACTION。
3 commit/rollback 處理
根據請求碼 END_TRANSACTION 可以找到 Broker 端對事務消息的處理。代碼如下:
//EndTransactionProcessor 類
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
//省略部分邏輯
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//查找出 half 消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//檢查 groupId 和消息偏移量是否合法
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
//刪除PROPERTY_TRANSACTION_PREPARED屬性
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//刪除 half 消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
這段代碼邏輯很清晰,首先查找出 half 消息,然後對查找出的消息進行檢查(groupId 和消息偏移量是否合法),如果是 commit,則去除事務消息準備階段屬性,重新把消息投遞到原始隊列,然後刪除 half 消息。如果是 rollback,則直接刪除 half 消息。
注意:對於 UNKNOW 的類型,這裏直接返回 null,上面代碼沒有貼出來。
4 check 事務狀態
Broker 初始化的時候,會初始化一個 TransactionalMessageServiceImpl 線程,這個線程會定時檢查過期的消息,通過向 Producer 發送 check 消息來獲取事務狀態。代碼如下:
//TransactionalMessageCheckService
protected void onWaitEnd() {
//超時時間,默認 6s
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
//最大檢查次數,默認 15
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
這裏有兩個參數需要注意:
-
事務消息超時時間,超時後會向 Producer 發送 check 消息檢查本地事務狀態,默認 6s;
-
最大檢查次數,Broker 每次向 Producer 發送 check 消息後檢查次數加 1,超過最大檢查次數後 half 消息被丟棄,默認最大檢查次數是 15;
注意:這裏的丟棄是把消息寫入了一個新的隊列,Topic 爲 TRANS_CHECK_MAX_TIME_TOPIC,queueId 爲 0。
-
文件保存時間,默認 72 小時。
檢查事務消息的流程如下:
Producer 收到 check 消息後,最終調用 TransactionListener 中定義的 checkLocalTransaction 方法,查詢本地事務執行狀態,然後發送給 Broker。
需要注意的是,check 消息發送給 Broker 時,會在請求 Header 中給 fromTransactionCheck 屬性賦值爲 true,以標記是 check 消息。
Broker 收到 check 響應消息後,處理邏輯跟第 3 節的處理邏輯一樣,唯一不同的是,這裏針對 check 消息和非 check 消息打印了不同的日誌。
5 總結
從上面代碼的分析可以看到,RocketMQ 的事務消息實現機制非常簡潔。使用事務消息時自己定義 TransactionListener,實現執行本地事務 executeLocalTransaction 和檢查本地事務狀態 checkLocalTransaction 這兩個方法,然後使用 TransactionMQProducer 進行發送。
最後,附一張 Producer 端的 UML 類圖:
君哥聊技術 後端架構師,定期分享技術乾貨,包括後端開發、分佈式、中間件、雲原生等。同時也會分享職場心得、程序人生。關注我,一起進階。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/QyiJ4tdjcv2-doHnAtFOLQ