RocketMQ 源碼詳解:事務消息、批量消息、延遲消息

概述

在上文中,我們討論了消費者對於消息拉取的實現,對於

這個黑盒的心臟部分,我們順着消息的發送流程已經將其剖析了大半部分。本章我們不妨乘勝追擊,接着討論各種不同的消息的原理與實現。

事務消息

概念

RocketMQ 中的事務消息功能,實際上是 分佈式事務中的本地事務表 的實現,只不過,在這裏用消息中間件來代替了數據庫,同時也幫我們做好了回查的操作。

在這點上,RocketMQ 和 Kafka 是截然不同的,kafka 的事務是用來實現 Exacltly Once 語義,且該語義主要用來流計算中,即在 "從 Topic 中讀 -> 計算 -> 存到 Topic" 保證不被重複計算。

事務流程

吐槽一下爲什麼要叫半消息 (half message),叫 prepare 消息不是更直觀嗎

  1. Broker 將 half 消息持久化

  2. 客戶端根據事務執行結果,發送 Commit / Rollback 消息

  3. Broker 收到 Commit 時,將事務消息對消費者可見。收到 Rollback 時,將消息丟棄

補償

源碼流程

第一步

在設置好了事務監聽器後(執行事務 與 事務回查),就可以發送事務消息

在將事務消息交給發送方法後,客戶端首先會爲消息添加事務消息的標識

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

然後將該事務消息會像普通的同步消息一樣發送(且是同步發送)

sendResult = this.send(msg);

具體發送流程見:RocketMQ 源碼詳解 | Producer 篇 · 其一:Start,然後 Send 一條消息

第二步

在 Broker 端接收到消息以後,會走與普通消息相同的底層通道(因爲這個消息本身就只是個加上了 事務 flag 的普通消息),然後由 
TransactionalMessageService 來對這個消息進行額外處理。

首先會對該消息放入 real topic 屬性和 real queue 屬性,然後將消息 Topic 替換爲用於處理所有事務消息的特殊的 Topic,當然該 Topic 對消費者是不可見的。

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {  MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());  MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,                              String.valueOf(msgInner.getQueueId()));  // 設置標記爲未收到結果  msgInner.setSysFlag(    MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));  // 替換到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC)  msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());  msgInner.setQueueId(0);  msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));  return msgInner;}

完成後,會送到 MessageStore 像普通消息一樣處理

普通消息的具體流程見 RocketMQ 源碼詳解 | Broker 篇 · 其二:文件系統

第三步

回到 Producer 端,在事務消息發送完成後,該方法會使用專門的線程池執行事務

// 2.執行本地事務,更新事務獲取狀態localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

然後對本地的事務執行狀態進行處理,也就是將該執行狀態上報

this.endTransaction(msg, sendResult, localTransactionState, localException);

這裏會發送一條 oneway 命令給 Broker 端,且使用的是 
RequestCode.END_TRANSACTION 請求碼

// 事務結果報告(可能是 commit 或 rollback)public static final int END_TRANSACTION = 37;

完成處理後,該方法會將事務的發送結果和本地事務的執行結構都返回給上層 API

第四步

在 Broker 端,這裏會由 EndTransactionProcessor 處理器來處理該請求碼

然後,根據事務的執行結果來做不同的處理

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {  // 事務執行成功,嘗試完成事務   // 獲取 half 消息  result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);  if (result.getResponseCode() == ResponseCode.SUCCESS) {    if (res.getCode() == ResponseCode.SUCCESS) {      // 將 half 消息取出,構造真實消息,然後投入實際上的 Topic      /* pass */            RemotingCommand sendResult = sendFinalMessage(msgInner);            if (sendResult.getCode() == ResponseCode.SUCCESS) {        /*         * 找到半消息,進行刪除         * 刪除並不是物理上的刪除,因爲物理上的刪除的代價十分的高昂,而是寫入一條具有相同事務id的消息到 op Topic         */        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());      }      return sendResult;    }    return res;  }}

如果需要回滾,則對相應的半消息進行刪除,且和上面一樣,並不是物理上的刪除,而是發送具有相同事務 id 的消息到 OP Topic,來標記這個事務已經完成了 (Commit/Rollback), OP Topic 也是一個特殊的 Topic,同樣對消費者不可見。

if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {  // 事務執行失敗,進行 half 消息的回滾   // 首先找到 half 消息  result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);  if (result.getResponseCode() == ResponseCode.SUCCESS) {    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);    if (res.getCode() == ResponseCode.SUCCESS) {      // 進行刪除      this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());    }    return res;  }}

當這些都做完後,一次事務就完成了。

補償

當然啦,以上是順利的情況,我們當然不能指望事務每一次都能執行成功、網絡分區和宕機事件永遠不會發生。

在一段時間後,如果客戶端沒有對事務的狀態進行上報(或者上報的狀態不是 Commit 或 Rollback,而是 Unknown), Broker 端當然就要進行事務狀態的回查。

在 BrokerController 啓動的時候,會開啓事務狀態檢測服務,該服務會通過循環調用 
TransactionalMessageServiceImpl.check() 方法,不斷的掃描未結束的事務,同時對超過指定時間還不知道狀態的事務進行回查操作。

check() 方法是事務回查的核心,由於很長,我們先來看第一部分 (刪減了沒人在意的 Log)

// 首先找到存儲所有 half 消息的 TopicString topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);// 對其中每一個 queue 進行檢查for (MessageQueue messageQueue : msgQueues) {  long startTime = System.currentTimeMillis();   // 獲得對應的 op 消息所在的 queue  MessageQueue opQueue = getOpQueue(messageQueue);  // 獲取未處理的 half 消息的起始偏移量  long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);  // 獲取 op 消息的 queue 的起始偏移量  long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);   // 用來記錄已經被處理了的 op 消息的偏移量  List<Long> doneOpOffset = new ArrayList<>();  // 用來記錄已經完成了的 half 消息的偏移量  // key: halfOffset, value: opOffset  HashMap<Long, Long> removeMap = new HashMap<>();   PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);

在 fillOpRemoveMap 方法中,主要是將 op 消息取出,來標記可以被移除的 half 消息(op 消息的存在代表對應事務的結束)

/** * 讀取op消息,解析op消息,填充removeMap * * @param removeMap 要刪除的半消息,key: halfOffset,value: opOffset * @param opQueue Op message queue. * @param pullOffsetOfOp op message queue 的起始偏移量 * @param miniOffset half message queue 的當前最小偏移量 * @param doneOpOffset 存儲已處理的 op 消息 * @return 獲取到的 Op 消息 */private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,                                   MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {  // 首先通過 queue 獲取 op 消息,最大數量爲 32 條  PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);    /* pass: pullResult 消息的意外狀態的處理 */   List<MessageExt> opMsg = pullResult.getMsgFoundList();  for (MessageExt opMessageExt : opMsg) {    // op 消息的 body 存儲的是對應的 half 消息的偏移量, 現在將其取出    Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));    // 感覺這裏的 Tag 並沒有什麼意義,無論是 Commit 還是 Rollback 都會加入這個 Tag    if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {      // 在 已處理偏移量 之前的話則可直接放入 已處理偏移量集合      if (queueOffset < miniOffset) {        doneOpOffset.add(opMessageExt.getQueueOffset());      } else {        // 否則放入需要移除的 half 的消息的集合        removeMap.put(queueOffset, opMessageExt.getQueueOffset());      }    }  }  return pullResult;}

然後進入到 check 方法的第二部分

while (true) {  if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break;    // 推進最小已處理偏移量  if (removeMap.containsKey(i)) /* 如果該 half 消息存在對應的 op 消息,說明已經被處理了(commit/rollback) */ {    // 取出放入到已處理偏移量隊列    Long removedOpOffset = removeMap.remove(i);    doneOpOffset.add(removedOpOffset);   } else /* 否則說明當前 half 消息懸而未決  */ {    // 取出對應的半消息    GetResult getResult = getHalfMsg(messageQueue, i);		    /* pass: 半消息不存在時的意外處理 */     /*     * 檢測是否要丟棄或跳過     *   丟棄條件: 當前事務已經超過了最大回查次數(15次)     *   跳過條件: 已經超過了過期文件最大保留時間(72小時)     */    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {      // 處理並推進偏移量      // 具體的處理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 這個 Topic,等待手動處理      listener.resolveDiscardMsg(msgExt);            // 進入到下一個 half 消息      newOffset = i + 1;      i++;      continue;    }    if (msgExt.getStoreTimestamp() >= startTime) {      break;    }

上面的方法很好理解,只是對於已經被標記結束的事務的處理、和未結束事務的補足

接下來是第三部分,這裏將繼續對未結束事務的補足,與進行可能的回查操作

  // half 消息具有最小的檢查時間(免疫時間), 檢測時間以內可以跳過回查, 重新投入 half 消息的 Topic  long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();  long checkImmunityTime = transactionTimeout;  String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);  if (null != checkImmunityTimeStr) {    checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);    if (valueOfCurrentMinusBorn < checkImmunityTime) {      if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {        newOffset = i + 1;        i++;        continue;      }    }  } else {    if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {      break;    }  }    /*   * 對於當前事務的回查操作,需要滿足三個條件之一   *  1.當前 op 消息的集合爲空,且已經超過了最小檢查時間(免疫時間)   *  2.最大偏移量的 op 消息的生成時間 已經超過了 最小檢查時間   *  3.關閉最小檢查時間   */  List<MessageExt> opMsg = pullResult.getMsgFoundList();  boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))    || (valueOfCurrentMinusBorn <= -1);   if (isNeedCheck) {    // 先將當前 half 消息放回    if (!putBackHalfMsgQueue(msgExt, i)) {      continue;    }    // 然後向 Product 發送檢測消息    listener.resolveHalfMsg(msgExt);  } else {    // 否則更新 op 消息集合,以確保能夠斷言該 half 消息的狀態    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);    continue;  }} newOffset = i + 1; i++;}

上面這段代碼主要圍繞 "是否進行回查" 展開,且涉及到 "免疫時間"。

在一個事務消息被髮送後,對應事務的執行當然需要一定的執行時間,如果我們不設置這個時間立刻進行回查,那麼很有可能時候事務還沒執行完,對於大多數情況下還沒執行完的事務進行回查,毫無疑問帶來的收益很低。所以我們需要設定一個時間,在這個時間內的事務先暫時不回查,這個時間就叫做 "免疫時間"。

然後再來看下需要進行回查的三種情況:

  1. 當 op 消息的集合爲空,說明當前還沒有收到讓當前事務結束的通知,且超過了 "免疫時間",故回查

  2. 當前 op 消息最大偏移量的生成時間超過了 "免疫時間",說明該事務的提交消息可能丟失了,故回查

  3. 不啓用 "免疫時間"

其中發送的回查消息的請求碼爲 
RequestCode.CHECK_TRANSACTION_STATE ,發送的也是 oneway 消息

最後的第四部分,同時更新 half 和 op 消息在 Queue 中的偏移量

// 對所有的 half 消息計算完成後,更新偏移量if (newOffset != halfOffset) {  transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);}// 根據已經被標記爲完成的 op 消息更新偏移量long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) {  // 如果不等,說明並不是所有的 op 消息都被標記爲完成了  // 所以我們只將偏移量更新到第一個未完成的 op 消息的位置,其後面的 op 消息會在下次重複處理  transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);}

然後在 Producer 這邊,將由 
ClientRemotingProcessor.checkTransactionState() 來處理回查操作

// 獲取事務 IDString transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {    messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {    // 從 MQClientFactory 找到註冊的對應 Producer    MQProducerInner producer = this.mqClientFactory.selectProducer(group);    if (producer != null) {        final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());        // 讓 Producer 檢查在對應 IP 上的事務狀態        producer.checkTransactionState(addr, messageExt, requestHeader);    } else {        log.debug("checkTransactionState, pick producer by group[{}] failed", group);    }} else {    log.warn("checkTransactionState, pick producer group failed");}

再進入 
producer.checkTransactionState() 看看 Producer 是怎樣檢查事務狀態的

TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();// 取出當前 Producer 的事務監聽器TransactionListener transactionListener = getCheckListener();if (transactionCheckListener != null || transactionListener != null) {  LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;  Throwable exception = null;  try {    if (transactionCheckListener != null) {      // 調用其的事務回查方法      localTransactionState = transactionCheckListener.checkLocalTransactionState(message);    } else if (transactionListener != null) {      log.debug("Used new check API in transaction message");      localTransactionState = transactionListener.checkLocalTransaction(message);    }  } catch (Throwable e) {    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);    exception = e;  }   // 再將事務執行結果其發回給 Broker  this.processTransactionState(    localTransactionState,    group,    exception);} else {  log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}

最後發回的方法做的事情和在一開始發送事務狀態的方法,所做的事情是一樣的。Broker 做的處理也是一樣的。

這樣,補償流程就執行完了。

批量消息

概念

在消息隊列中,批量消息也是一個重要的部分,將消息壓縮在一起發送不僅可以減少帶寬的消耗,還能節省頭部佔用的空間。

有點失望的是,RocketMQ 對於批量消息的實現有點 "粗糙" 了

源碼流程

首先,在調用 send() 的 batch 版本後,會先對批量消息進行校驗

批量消息不允許延時、不允許發送到重試 Topic,且要求發送到的 Topic 必須是同一個 Topic

List<Message> messageList = new ArrayList<Message>(messages.size());Message first = null;for (Message message : messages) {  if (message.getDelayTimeLevel() > 0) {    throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");  }  if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {    throw new UnsupportedOperationException("Retry Group is not supported for batching");  }  if (first == null) {    first = message;  } else {    if (!first.getTopic().equals(message.getTopic())) {      throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");    }    if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {      throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");    }  }  messageList.add(message);}MessageBatch messageBatch = new MessageBatch(messageList);

在校驗完成,且都放到一個 List 之後,接下來的步驟和普通的消息發送都差不多,只是在編碼上理所當然的存在着不同

public static byte[] encodeMessages(List<Message> messages) {  //TO DO refactor, accumulate in one buffer, avoid copies  List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());  int allSize = 0;  for (Message message : messages) {    // 編碼每一個消息    byte[] tmp = encodeMessage(message);    encodedMessages.add(tmp);    allSize += tmp.length;  }   // 放到最後的大集合中  byte[] allBytes = new byte[allSize];  int pos = 0;  for (byte[] bytes : encodedMessages) {    System.arraycopy(bytes, 0, allBytes, pos, bytes.length);    pos += bytes.length;  }  return allBytes;}

然後使用 
RequestCode.SEND_BATCH_MESSAGE 這個狀態碼發送出去。

在 Broker 端,其投入的過程大體上和普通消息類似,但是其最後的持久化到硬盤時,這塊批量消息被拆分爲了普通的單條消息。

即 RocketMQ 使用批量消息只減少了發送時的寬帶傳輸,對於存儲與交給消費者的部分並沒有獲得優化

// 拆分批量消息爲每一個普通消息while (messagesByteBuff.hasRemaining()) {  // 1 TOTALSIZE  final int msgPos = messagesByteBuff.position();  final int msgLen = messagesByteBuff.getInt();  final int bodyLen = msgLen - 40; //only for log, just estimate it    /*  pass: 當作普通消息存儲   */    queueOffset++;  msgNum++;  messagesByteBuff.position(msgPos + msgLen);}

延時消息

概念

在業務中,有時候有一些延時提交任務的需求,這時候就可以使用延時消息,即在投遞一部分時間後纔對消費者可見。

不過,在 RocketMQ 中,延遲級別並不支持自定義,而是具有固定的延遲級別。

不過商業版的 阿里雲 MQ 可以支持秒精度的自定義延遲時間,果然是爲了閹割社區版來賺錢嗎

源碼流程

RocketMQ 對於延時消息的處理主要在於 Broker 端,所以我們只需要看在 Broker 對延時級別的處理。

首先,在 CommitLog 的 put 中,會對延遲級別進行判斷,如果存在,會在這進行進行 Topic 的替換,將其存儲到對應的延遲級別的 Queue

if (msg.getDelayTimeLevel() > 0) {  if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());  }   topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;  queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());   // Backup real topic, queueId  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));  msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));   msg.setTopic(topic);  msg.setQueueId(queueId);}

然後會被在 DefaultMessageStore 中初始化的 ScheduleMessageService 處理

首先,該服務在啓動時會進行初始化

public void start() {  // 保證只被執行一次  if (started.compareAndSet(false, true)) {    // 加載本地快照    super.load();    this.timer = new Timer("ScheduleMessageTimerThread", true);    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {      // 取出每一個級別      Integer level = entry.getKey();      // 當前延遲級別對應的延遲時間      Long timeDelay = entry.getValue();      // 該延遲級別之前消費到的自己的隊列的偏移量      Long offset = this.offsetTable.get(level);      if (null == offset) {        offset = 0L;      }       // 每一個延遲級別設置一個定時任務      if (timeDelay != null) {        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);      }    }     // 定時持久化各個延遲級別的偏移量    this.timer.scheduleAtFixedRate(new TimerTask() {       @Override      public void run() {        try {          if (started.get()) ScheduleMessageService.this.persist();        } catch (Throwable e) {          log.error("scheduleAtFixedRate flush exception", e);        }      }    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());  }}

每一個延遲級別的 Queue 都有對應的定時任務,且都會執行以下方法

public void executeOnTimeup() {  // 找到自己延遲級別的消費隊列  ConsumeQueue cq =    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,                                                                     delayLevel2QueueId(delayLevel));  long failScheduleOffset = offset;  if (cq != null) {    // 根據消費偏移量將指定的 MappedFile 文件加載進來    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);    if (bufferCQ != null) {      try {        long nextOffset = offset;        int i = 0;        // 遍歷每一個消息的索引        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {          long offsetPy = bufferCQ.getByteBuffer().getLong();          int sizePy = bufferCQ.getByteBuffer().getInt();          long tagsCode = bufferCQ.getByteBuffer().getLong();           /* pass  */           long now = System.currentTimeMillis();          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);           nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);           long countdown = deliverTimestamp - now;          if (countdown <= 0) /* 目標時間小於當起時間,可以執行 */ {            // 根據偏移量取出消息            MessageExt msgExt =              ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(              offsetPy, sizePy);             if (msgExt != null) {              try {                // 將延遲消息恢復成原本消息的樣子                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);                /* pass */                                // 投入真實的 Topic                PutMessageResult putMessageResult =                  ScheduleMessageService.this.writeMessageStore                  .putMessage(msgInner);                 /* pass: 更新度量信息  */              } catch (Exception e) {                /* pass */              }            }          } else /* 否則,這個消息需要被消費的時間到了再通知我 */ {            ScheduleMessageService.this.timer.schedule(              new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),              countdown);            // 更新消費偏移量            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);            return;          }        } // end of for         // 走到這裏,說明暫時沒有需要消費的延時消息        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);        // 小睡一會        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);        return;      } finally {        bufferCQ.release();      }    } // end of if (bufferCQ != null)    /* pass */  } // end of if (cq != null)  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,                                                                                failScheduleOffset), DELAY_FOR_A_WHILE);}

可以看出,延遲消息的實現還是十分簡單的,由於先投入的延時消息必先快於後投入的消息的到期,所以只需要不斷的拉取各個延遲級別對應的隊列 的頭部的延遲消息即可。這也是隻支持固定級別的延遲消息帶來的好處。

來源:

https://www.cnblogs.com/enoc/p/rocketmq-so-no-nana.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/-LIH1KJo0Th58ftTZ5g-eQ