5 張圖帶你徹底理解 RocketMQ 軌跡消息

大家好,我是君哥。

爲了方便跟蹤消息發送和消費的軌跡,RocketMQ 引入了軌跡消息,今天一起來學習一下。

1 開啓軌跡消息

默認情況下,RocketMQ 是不開啓軌跡消息的,需要我們手工開啓。

1.1 Broker

Broker 端開啓軌跡消息,需要增加下面的配置:

traceTopicEnable=true

1.2 生產者

對於生產者端,要開啓軌跡消息,需要在定義生產者時增加參數。定義消費者使用類 DefaultMQProducer,這個類支持開啓軌跡消息的構造函數如下:

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic)

從上面的構造函數可以看出,自定義消費者時,不僅可以定義開啓軌跡消息,還可以指定軌跡消息發送的 Topic。如果不指定軌跡消息的 Topic,默認發送的 Topic 是 RMQ_SYS_TRACE_TOPIC。

1.3 消費者

對於消費者,要開啓軌跡消息,需要在定義消費者時增加參數。定義消費者使用類 DefaultMQPushConsumer,這個類支持開啓軌跡消息的構造函數如下:

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace)

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic)

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic)

2 生產者處理

首先看一個支持軌跡消息的生產者示例:

DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true, "");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

下面是一張生產者端的 UML 類圖:

在 DefaultMQProducer 創建時,會初始化 defaultMQProducerImpl、traceDispatcher 和鉤子函數 SendMessageHook。

2.1 生產者初始化

生產者初始化代碼如下:

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
 boolean enableMsgTrace, final String customizedTraceTopic) {
 this.namespace = namespace;
 this.producerGroup = producerGroup;
 defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
 if (enableMsgTrace) {
  try {
   AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
   dispatcher.setHostProducer(this.defaultMQProducerImpl);
   traceDispatcher = dispatcher;
   //註冊軌跡消息鉤子函數
   this.defaultMQProducerImpl.registerSendMessageHook(
    new SendMessageTraceHookImpl(traceDispatcher));
   //省略事務消息的鉤子註冊
  } catch (Throwable e) {
  }
 }
}

初始化的代碼中,傳入了是否開啓軌跡消息(enableMsgTrace)和自定義軌跡消息的 Topic(customizedTraceTopic),同時初始化了 traceDispatcher 並註冊了鉤子函數 SendMessageTraceHook。

生產者啓動時 defaultMQProducerImpl 和 traceDispatcher 也會啓動,代碼如下:

public void start() throws MQClientException {
 this.setProducerGroup(withNamespace(this.producerGroup));
 this.defaultMQProducerImpl.start();
 if (null != traceDispatcher) {
  try {
   traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  } catch (MQClientException e) {
   log.warn("trace dispatcher start failed ", e);
  }
 }
}

2.2 traceDispatcher 啓動

生產者初始化的時候初始化了 traceDispatcher。traceDispatcher 是軌跡消息的處理器,AsyncTraceDispatcher 構造函數定義一個專門發送軌跡消息的生產者 traceProducer(DefaultMQProducer 類型)。

注意:traceProducer 發送消息的最大值 maxMessageSize 是 128k,雖然 maxMessageSize 初始值被定義爲 4M,但是創建 traceProducer 時賦值 128k。

上面提到,生產者啓動時 traceDispatcher 也會啓動,看一下它的啓動方法:

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
 if (isStarted.compareAndSet(false, true)) {
  traceProducer.setNamesrvAddr(nameSrvAddr);
  traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
  traceProducer.start();
 }
 this.accessChannel = accessChannel;
 this.worker = new Thread(new AsyncRunnable()"MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
 this.worker.setDaemon(true);
 this.worker.start();
 this.registerShutDownHook();
}

可以看到,traceDispatcher 的啓動首先啓動了 traceProducer,然後啓動了一個異步線程 AsyncRunnable,下面看一下 run 方法:

public void run() {
 while (!stopped) {
     //batchSize=100
  List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
  //traceContextQueue隊列長度等於1024
  synchronized (traceContextQueue) {
   for (int i = 0; i < batchSize; i++) {
    TraceContext context = null;
    try {
     //get trace data element from blocking Queue - traceContextQueue
     context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
    }
    if (context != null) {
     contexts.add(context);
    } else {
     break;
    }
   }
   if (contexts.size() > 0) {
    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
    traceExecutor.submit(request);
   } else if (AsyncTraceDispatcher.this.stopped) {
    this.stopped = true;
   }
  }
 }
}

從上面的代碼可以看到,每次從 traceContextQueue 中拉取 100 條 TraceContext,然後通過 AsyncAppenderRequest 異步發送出去。

注意:

  1. 發送軌跡消息時需要組裝消息進行批量發送,每次發送的消息大小不超過 128k;

  2. 如果保存軌跡消息的 Broker 有多個,則需要按照輪詢的方式依次發送到不同的 Broker 上,具體代碼見 AsyncTraceDispatcher 類中的 sendTraceDataByMQ 方法。

2.3 鉤子函數

看到這裏相信你一定會有一個疑問,traceContextQueue 中的消息是從哪兒來的呢?答案是生產者初始化時定義的 SendMessageTraceHook。

看一下發送消息的代碼:

//DefaultMQProducerImpl 類
private SendResult sendKernelImpl(final Message msg,
 final MessageQueue mq,
 final CommunicationMode communicationMode,
 final SendCallback sendCallback,
 final TopicPublishInfo topicPublishInfo,
 final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
 //省略部分代碼
 SendMessageContext context = null;
 if (brokerAddr != null) {
  try {
            //省略部分代碼
   if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    //1.發送消息前執行鉤子函數
    this.executeSendMessageHookBefore(context);
   }

   SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
   //省略requestHeader封裝代碼
   SendResult sendResult = null;
   //-------------2.這裏發送消息-------------
   if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    //3.發送消息後執行鉤子函數
    this.executeSendMessageHookAfter(context);
   }

   return sendResult;
  } 
  //catch和finally省略
 }
 throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

由於 sendKernelImpl 代碼比較多,我這裏只貼了骨架代碼。我在上面加了註釋,可以看到在發送消息前後都會執行鉤子函數。

在發送消息前,通過調用鉤子函數封裝一個軌跡消息。發送消息後,再通過鉤子函數對軌跡消息進行完善,主要加入消息發送結果、發送消息花費時間等屬性,然後把軌跡消息加到 traceContextQueue 上。軌跡消息包含的內容如下圖:

軌跡消息的內容比較多,包含了發送消息的詳細信息,比如:Topic、Message、MessageQueue、Group、生產者地址(clientHost)、消息發送結果等。

3 Broker 處理

軌跡消息發送到 Broker 後,會保存到 Broker 上,默認保存的 Topic 是 RMQ_SYS_TRACE_TOPIC。Broker 啓動時,會自動初始化默認 Topic 的路由配置,代碼如下:

//TopicConfigManager 類
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
 String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
 TopicConfig topicConfig = new TopicConfig(topic);
 TopicValidator.addSystemTopic(topic);
 topicConfig.setReadQueueNums(1);
 topicConfig.setWriteQueueNums(1);
 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}

前面提到過,生產者也可以自己定義軌跡消息 Topic,不過需要在 Broker 上提前創建好自定義的 Topic。

如果想要軌跡消息和業務消息隔離,可以專門用一個 Broker 來保存軌跡消息,這樣需要單獨在這個 Broker 上開啓軌跡消息。

4 消費端處理

消費端對軌跡消息的處理跟生產端非常類似。首先我們看一下消費端處理的 UML 類圖:

我們以推模式處理併發消息爲例,ConsumeMessageConcurrentlyService 在消費消息前,通過 DefaultMQPushConsumerImpl 調用了鉤子函數 executeHookBefore,消費消息後通過 DefaultMQPushConsumerImpl 調用了鉤子函數 executeHookAfter。代碼如下:

//ConsumeMessageConcurrentlyService 類
public void run() {
 //省略部分邏輯
 ConsumeMessageContext consumeMessageContext = null;
 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  consumeMessageContext = new ConsumeMessageContext();
  consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
  consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
  consumeMessageContext.setProps(new HashMap<String, String>());
  consumeMessageContext.setMq(messageQueue);
  consumeMessageContext.setMsgList(msgs);
  consumeMessageContext.setSuccess(false);
  //1.消費消息前執行鉤子函數
  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
 }
    //省略部分邏輯
 try {
    //2.消費消息
  status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {
 }
 //省略部分邏輯
 if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  consumeMessageContext.setStatus(status.toString());
  consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
  //3.消費消息前執行鉤子函數
  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
 }
    //省略部分邏輯
}

如果消費端開啓軌跡消息,就會初始化 traceDispatcher 並且註冊鉤子函數。

if (enableMsgTrace) {
 try {
  AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
  dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
  traceDispatcher = dispatcher;
  this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
   new ConsumeMessageTraceHookImpl(traceDispatcher));
 } catch (Throwable e) {
  log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
 }
}

可以看到,traceDispatcher 跟生產者使用的都是 AsyncTraceDispatcher,處理邏輯完全一樣。

同樣,鉤子函數的使用跟生產者也類似,在消費消息之前調用鉤子函數(executeHookBefore)封裝軌跡消息,在消費消息之後再次調用鉤子函數(executeHookAfter)完善軌跡消息。消費端軌跡消息的內容如下圖:

5 總結

本文主要講解了 RocketMQ 的軌跡消息實現機制。軌跡消息分爲生產端和消費端的軌跡消息,生產端和消費端 RocketMQ 都提供了構造函數來指定是否開啓軌跡消息。通過鉤子函數,把軌跡消息加入隊列,也就是變量 traceContextQueue,而 traceDispatcher 則以 100 條爲單位不停地從隊列中拉取消息進行組裝併發送到 Broker。如下圖:

理解了 traceDispatcher 和鉤子函數 ,就很容易理解 RocketMQ 軌跡消息的處理邏輯了。

在 Broker 端,則通過增加配置參數 traceTopicEnable 來指定是否存儲軌跡消息。

君哥聊技術 後端架構師,定期分享技術乾貨,包括後端開發、分佈式、中間件、雲原生等。同時也會分享職場心得、程序人生。關注我,一起進階。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/G0bfuLzonDPncoVC_dx1IA