3 張圖帶你徹底理解 RocketMQ 事務消息

大家好,我是君哥。

事務消息是分佈式事務的一種解決方案,RocketMQ 有成熟的事務消息模型,今天就來聊一聊 RocketMQ 事務消息實現機制。

假如有一個電商場景,用戶下單後,賬戶服務從用戶賬戶上扣減金額,然後通知庫存服務給用戶發貨,這兩個服務需要在一個分佈式事務內完成。

這時,賬戶服務作爲 Producer,庫存服務作爲 Consumer,見下面消息流程:

  1. 賬戶服務作爲 Producer 向 Broker 發送一條 half 消息;

  2. half 消息發送成功後,執行本地事務,執行成功則向 Broker 發送 commit 請求,否則發送 rollback 請求;

  3. 如果 Broker 收到的是 rollback 請求,則刪除保存的 half 消息;

  4. 如果 Broker 收到的是 commit 請求,則保存扣減庫存消息(這裏的處理是把消息從 half 隊列投遞到真實的隊列),然後刪除保存的 half 消息;

  5. 如果 Broker 沒有收到請求,則會發送請求到 Producer 查詢本地事務狀態,然後根據 Producer 返回的本地狀態做 commit/rollback 相關處理。

1 half 消息

上面電商的案例中,RocketMQ 解決分佈式事務的第一步是賬戶服務發送 half 消息。

首先看官網一個發送事務消息的示例:

public static void main(String[] args) throws MQClientException, InterruptedException {
 TransactionListener transactionListener = new TransactionListenerImpl();
 TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
 ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
   Thread thread = new Thread(r);
   thread.setName("client-transaction-msg-check-thread");
   return thread;
  }
 });

 producer.setExecutorService(executorService);
 producer.setTransactionListener(transactionListener);
 producer.start();

 String[] tags = new String[] {"TagA""TagB""TagC""TagD""TagE"};
 for (int i = 0; i < 10; i++) {
  try {
   Message msg =
    new Message("TopicTest1234", tags[i % tags.length]"KEY" + i,
     ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   SendResult sendResult = producer.sendMessageInTransaction(msg, null);
   System.out.printf("%s%n", sendResult);

   Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
   e.printStackTrace();
  }
 }

 for (int i = 0; i < 100000; i++) {
  Thread.sleep(1000);
 }
 producer.shutdown();
}

上面的代碼中 Producer 有一個 TransactionListener 屬性,這個由開發者通過實現這個接口來自己定義。這個接口有兩個方法:

下面代碼是發送事務消息的方法:

//類 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
 final LocalTransactionExecuter localTransactionExecuter, final Object arg)
 throws MQClientException {
 TransactionListener transactionListener = getCheckListener();
 //省略驗證邏輯

 SendResult sendResult = null;
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
 try {
  sendResult = this.send(msg);
 } catch (Exception e) {
  throw new MQClientException("send message Exception", e);
 }

 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 //省略發送結果處理
 try {
  this.endTransaction(msg, sendResult, localTransactionState, localException);
 } catch (Exception e) {
 }

 TransactionSendResult transactionSendResult = new TransactionSendResult();
 //省略封裝屬性
 return transactionSendResult;
}

從這段代碼中看到,在發送消息前,給消息封裝了一個屬性 PROPERTY_TRANSACTION_PREPARED,通過這個屬性可以找到 Broker 端的處理。

Broker 保存 half 消息時,把消息 topic 改爲 RMQ_SYS_TRANS_HALF_TOPIC,然後把消息投遞到 queueId 等於 0 的隊列。投遞成功後給 Producer 返回 PutMessageStatus.PUT_OK。代碼如下:

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
 return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
 MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
  String.valueOf(msgInner.getQueueId()));
 msgInner.setSysFlag(
  MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
 msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
 msgInner.setQueueId(0);
 msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
 return msgInner;
}

2 執行本地事務

上一節講到,Producer 發送事務消息時,會給一個 transactionListener,發送 half 消息成功後,會通過 transactionListener 的 executeLocalTransactionBranch 提交本地事務,代碼如下:

//類 DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
 final LocalTransactionExecuter localTransactionExecuter, final Object arg)
 throws MQClientException {
 //省略部分代碼
 SendResult sendResult = null;
 //省略部分代碼
 try {
  sendResult = this.send(msg);
 } catch (Exception e) {
  throw new MQClientException("send message Exception", e);
 }

 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
 Throwable localException = null;
 switch (sendResult.getSendStatus()) {
  case SEND_OK: {
   try {
    //省略部分代碼
    if (null != localTransactionExecuter) {
        //這個分支已經廢棄
     localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
    } else if (transactionListener != null) {
     //執行本地事務
     localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
    //省略部分代碼
   } catch (Throwable e) {
   }
  }
  break;
  //省略其他 case
 }

 try {
  this.endTransaction(msg, sendResult, localTransactionState, localException);
 } catch (Exception e) {
  log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
 }
    //省略部分代碼
 return transactionSendResult;
}

從上面代碼中可以看到,本地事務執行結束後,會調用一個 endTransaction 方法,這個就是向 Broker 發送 commit/rollback,也可能發送 UNKNOW,封裝到 requestHeader 的 commitOrRollback 的屬性中。這個請求的請求碼是 END_TRANSACTION。

3 commit/rollback 處理

根據請求碼 END_TRANSACTION 可以找到 Broker 端對事務消息的處理。代碼如下:

//EndTransactionProcessor 類
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
 RemotingCommandException {
 //省略部分邏輯
 OperationResult result = new OperationResult();
 if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
     //查找出 half 消息
  result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
      //檢查 groupId 和消息偏移量是否合法
   RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
   if (res.getCode() == ResponseCode.SUCCESS) {
    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
    //刪除PROPERTY_TRANSACTION_PREPARED屬性
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
    RemotingCommand sendResult = sendFinalMessage(msgInner);
    if (sendResult.getCode() == ResponseCode.SUCCESS) {
        //刪除 half 消息
     this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
    }
    return sendResult;
   }
   return res;
  }
 } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
  result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
  if (result.getResponseCode() == ResponseCode.SUCCESS) {
   RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
   if (res.getCode() == ResponseCode.SUCCESS) {
    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
   }
   return res;
  }
 }
 response.setCode(result.getResponseCode());
 response.setRemark(result.getResponseRemark());
 return response;
}

這段代碼邏輯很清晰,首先查找出 half 消息,然後對查找出的消息進行檢查(groupId 和消息偏移量是否合法),如果是 commit,則去除事務消息準備階段屬性,重新把消息投遞到原始隊列,然後刪除 half 消息。如果是 rollback,則直接刪除 half 消息。

注意:對於 UNKNOW 的類型,這裏直接返回 null,上面代碼沒有貼出來。

4 check 事務狀態

Broker 初始化的時候,會初始化一個 TransactionalMessageServiceImpl 線程,這個線程會定時檢查過期的消息,通過向 Producer 發送 check 消息來獲取事務狀態。代碼如下:

//TransactionalMessageCheckService
protected void onWaitEnd() {
    //超時時間,默認 6s
 long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
 //最大檢查次數,默認 15
 int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
 long begin = System.currentTimeMillis();
 log.info("Begin to check prepare message, begin time:{}", begin);
 this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
 log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

這裏有兩個參數需要注意:

檢查事務消息的流程如下:

Producer 收到 check 消息後,最終調用 TransactionListener 中定義的 checkLocalTransaction 方法,查詢本地事務執行狀態,然後發送給 Broker。

需要注意的是,check 消息發送給 Broker 時,會在請求 Header 中給 fromTransactionCheck 屬性賦值爲 true,以標記是 check 消息。

Broker 收到 check 響應消息後,處理邏輯跟第 3 節的處理邏輯一樣,唯一不同的是,這裏針對 check 消息和非 check 消息打印了不同的日誌。

5 總結

從上面代碼的分析可以看到,RocketMQ 的事務消息實現機制非常簡潔。使用事務消息時自己定義 TransactionListener,實現執行本地事務 executeLocalTransaction 和檢查本地事務狀態 checkLocalTransaction 這兩個方法,然後使用 TransactionMQProducer 進行發送。

最後,附一張 Producer 端的 UML 類圖:

君哥聊技術 後端架構師,定期分享技術乾貨,包括後端開發、分佈式、中間件、雲原生等。同時也會分享職場心得、程序人生。關注我,一起進階。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QyiJ4tdjcv2-doHnAtFOLQ