RocketMQ 源碼詳解:事務消息、批量消息、延遲消息
概述
在上文中,我們討論了消費者對於消息拉取的實現,對於
這個黑盒的心臟部分,我們順着消息的發送流程已經將其剖析了大半部分。本章我們不妨乘勝追擊,接着討論各種不同的消息的原理與實現。
事務消息
概念
RocketMQ 中的事務消息功能,實際上是 分佈式事務中的本地事務表 的實現,只不過,在這裏用消息中間件來代替了數據庫,同時也幫我們做好了回查的操作。
在這點上,RocketMQ 和 Kafka 是截然不同的,kafka 的事務是用來實現 Exacltly Once 語義,且該語義主要用來流計算中,即在 "從 Topic 中讀 -> 計算 -> 存到 Topic" 保證不被重複計算。
事務流程
- 客戶端發送 half 消息
吐槽一下爲什麼要叫半消息 (half message),叫 prepare 消息不是更直觀嗎
-
Broker 將 half 消息持久化
-
客戶端根據事務執行結果,發送 Commit / Rollback 消息
-
Broker 收到 Commit 時,將事務消息對消費者可見。收到 Rollback 時,將消息丟棄
補償
-
Broker 過久未收到事務執行結果,詢問客戶端執行結果
-
客戶端收到結果查詢請求,執行回查方法,發送 Commit / Rollback 方法
-
Broker 根據事務執行結果做出對應處理
源碼流程
第一步
在設置好了事務監聽器後(執行事務 與 事務回查),就可以發送事務消息
在將事務消息交給發送方法後,客戶端首先會爲消息添加事務消息的標識
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
然後將該事務消息會像普通的同步消息一樣發送(且是同步發送)
sendResult = this.send(msg);
具體發送流程見:RocketMQ 源碼詳解 | Producer 篇 · 其一:Start,然後 Send 一條消息
第二步
在 Broker 端接收到消息以後,會走與普通消息相同的底層通道(因爲這個消息本身就只是個加上了 事務 flag 的普通消息),然後由
TransactionalMessageService 來對這個消息進行額外處理。
首先會對該消息放入 real topic 屬性和 real queue 屬性,然後將消息 Topic 替換爲用於處理所有事務消息的特殊的 Topic,當然該 Topic 對消費者是不可見的。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); // 設置標記爲未收到結果 msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); // 替換到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC) msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner;}
完成後,會送到 MessageStore 像普通消息一樣處理
普通消息的具體流程見 RocketMQ 源碼詳解 | Broker 篇 · 其二:文件系統
第三步
回到 Producer 端,在事務消息發送完成後,該方法會使用專門的線程池執行事務
// 2.執行本地事務,更新事務獲取狀態localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
然後對本地的事務執行狀態進行處理,也就是將該執行狀態上報
this.endTransaction(msg, sendResult, localTransactionState, localException);
這裏會發送一條 oneway 命令給 Broker 端,且使用的是
RequestCode.END_TRANSACTION 請求碼
// 事務結果報告(可能是 commit 或 rollback)public static final int END_TRANSACTION = 37;
完成處理後,該方法會將事務的發送結果和本地事務的執行結構都返回給上層 API
第四步
在 Broker 端,這裏會由 EndTransactionProcessor 處理器來處理該請求碼
然後,根據事務的執行結果來做不同的處理
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { // 事務執行成功,嘗試完成事務 // 獲取 half 消息 result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { if (res.getCode() == ResponseCode.SUCCESS) { // 將 half 消息取出,構造真實消息,然後投入實際上的 Topic /* pass */ RemotingCommand sendResult = sendFinalMessage(msgInner); if (sendResult.getCode() == ResponseCode.SUCCESS) { /* * 找到半消息,進行刪除 * 刪除並不是物理上的刪除,因爲物理上的刪除的代價十分的高昂,而是寫入一條具有相同事務id的消息到 op Topic */ this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return sendResult; } return res; }}
如果需要回滾,則對相應的半消息進行刪除,且和上面一樣,並不是物理上的刪除,而是發送具有相同事務 id 的消息到 OP Topic,來標記這個事務已經完成了 (Commit/Rollback), OP Topic 也是一個特殊的 Topic,同樣對消費者不可見。
if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 事務執行失敗,進行 half 消息的回滾 // 首先找到 half 消息 result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader); if (result.getResponseCode() == ResponseCode.SUCCESS) { RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); if (res.getCode() == ResponseCode.SUCCESS) { // 進行刪除 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage()); } return res; }}
當這些都做完後,一次事務就完成了。
補償
當然啦,以上是順利的情況,我們當然不能指望事務每一次都能執行成功、網絡分區和宕機事件永遠不會發生。
在一段時間後,如果客戶端沒有對事務的狀態進行上報(或者上報的狀態不是 Commit 或 Rollback,而是 Unknown), Broker 端當然就要進行事務狀態的回查。
在 BrokerController 啓動的時候,會開啓事務狀態檢測服務,該服務會通過循環調用
TransactionalMessageServiceImpl.check() 方法,不斷的掃描未結束的事務,同時對超過指定時間還不知道狀態的事務進行回查操作。
check() 方法是事務回查的核心,由於很長,我們先來看第一部分 (刪減了沒人在意的 Log)
// 首先找到存儲所有 half 消息的 TopicString topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);// 對其中每一個 queue 進行檢查for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); // 獲得對應的 op 消息所在的 queue MessageQueue opQueue = getOpQueue(messageQueue); // 獲取未處理的 half 消息的起始偏移量 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // 獲取 op 消息的 queue 的起始偏移量 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); // 用來記錄已經被處理了的 op 消息的偏移量 List<Long> doneOpOffset = new ArrayList<>(); // 用來記錄已經完成了的 half 消息的偏移量 // key: halfOffset, value: opOffset HashMap<Long, Long> removeMap = new HashMap<>(); PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
在 fillOpRemoveMap 方法中,主要是將 op 消息取出,來標記可以被移除的 half 消息(op 消息的存在代表對應事務的結束)
/** * 讀取op消息,解析op消息,填充removeMap * * @param removeMap 要刪除的半消息,key: halfOffset,value: opOffset * @param opQueue Op message queue. * @param pullOffsetOfOp op message queue 的起始偏移量 * @param miniOffset half message queue 的當前最小偏移量 * @param doneOpOffset 存儲已處理的 op 消息 * @return 獲取到的 Op 消息 */private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) { // 首先通過 queue 獲取 op 消息,最大數量爲 32 條 PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32); /* pass: pullResult 消息的意外狀態的處理 */ List<MessageExt> opMsg = pullResult.getMsgFoundList(); for (MessageExt opMessageExt : opMsg) { // op 消息的 body 存儲的是對應的 half 消息的偏移量, 現在將其取出 Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); // 感覺這裏的 Tag 並沒有什麼意義,無論是 Commit 還是 Rollback 都會加入這個 Tag if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) { // 在 已處理偏移量 之前的話則可直接放入 已處理偏移量集合 if (queueOffset < miniOffset) { doneOpOffset.add(opMessageExt.getQueueOffset()); } else { // 否則放入需要移除的 half 的消息的集合 removeMap.put(queueOffset, opMessageExt.getQueueOffset()); } } } return pullResult;}
然後進入到 check 方法的第二部分
while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break; // 推進最小已處理偏移量 if (removeMap.containsKey(i)) /* 如果該 half 消息存在對應的 op 消息,說明已經被處理了(commit/rollback) */ { // 取出放入到已處理偏移量隊列 Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else /* 否則說明當前 half 消息懸而未決 */ { // 取出對應的半消息 GetResult getResult = getHalfMsg(messageQueue, i); /* pass: 半消息不存在時的意外處理 */ /* * 檢測是否要丟棄或跳過 * 丟棄條件: 當前事務已經超過了最大回查次數(15次) * 跳過條件: 已經超過了過期文件最大保留時間(72小時) */ if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // 處理並推進偏移量 // 具體的處理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 這個 Topic,等待手動處理 listener.resolveDiscardMsg(msgExt); // 進入到下一個 half 消息 newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { break; }
上面的方法很好理解,只是對於已經被標記結束的事務的處理、和未結束事務的補足
接下來是第三部分,這裏將繼續對未結束事務的補足,與進行可能的回查操作
// half 消息具有最小的檢查時間(免疫時間), 檢測時間以內可以跳過回查, 重新投入 half 消息的 Topic long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; } } } else { if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { break; } } /* * 對於當前事務的回查操作,需要滿足三個條件之一 * 1.當前 op 消息的集合爲空,且已經超過了最小檢查時間(免疫時間) * 2.最大偏移量的 op 消息的生成時間 已經超過了 最小檢查時間 * 3.關閉最小檢查時間 */ List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime) || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)) || (valueOfCurrentMinusBorn <= -1); if (isNeedCheck) { // 先將當前 half 消息放回 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 然後向 Product 發送檢測消息 listener.resolveHalfMsg(msgExt); } else { // 否則更新 op 消息集合,以確保能夠斷言該 half 消息的狀態 pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); continue; }} newOffset = i + 1; i++;}
上面這段代碼主要圍繞 "是否進行回查" 展開,且涉及到 "免疫時間"。
在一個事務消息被髮送後,對應事務的執行當然需要一定的執行時間,如果我們不設置這個時間立刻進行回查,那麼很有可能時候事務還沒執行完,對於大多數情況下還沒執行完的事務進行回查,毫無疑問帶來的收益很低。所以我們需要設定一個時間,在這個時間內的事務先暫時不回查,這個時間就叫做 "免疫時間"。
然後再來看下需要進行回查的三種情況:
-
當 op 消息的集合爲空,說明當前還沒有收到讓當前事務結束的通知,且超過了 "免疫時間",故回查
-
當前 op 消息最大偏移量的生成時間超過了 "免疫時間",說明該事務的提交消息可能丟失了,故回查
-
不啓用 "免疫時間"
其中發送的回查消息的請求碼爲
RequestCode.CHECK_TRANSACTION_STATE ,發送的也是 oneway 消息
最後的第四部分,同時更新 half 和 op 消息在 Queue 中的偏移量
// 對所有的 half 消息計算完成後,更新偏移量if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);}// 根據已經被標記爲完成的 op 消息更新偏移量long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) { // 如果不等,說明並不是所有的 op 消息都被標記爲完成了 // 所以我們只將偏移量更新到第一個未完成的 op 消息的位置,其後面的 op 消息會在下次重複處理 transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);}
然後在 Producer 這邊,將由
ClientRemotingProcessor.checkTransactionState() 來處理回查操作
// 獲取事務 IDString transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) { messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) { // 從 MQClientFactory 找到註冊的對應 Producer MQProducerInner producer = this.mqClientFactory.selectProducer(group); if (producer != null) { final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); // 讓 Producer 檢查在對應 IP 上的事務狀態 producer.checkTransactionState(addr, messageExt, requestHeader); } else { log.debug("checkTransactionState, pick producer by group[{}] failed", group); }} else { log.warn("checkTransactionState, pick producer group failed");}
再進入
producer.checkTransactionState() 看看 Producer 是怎樣檢查事務狀態的
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();// 取出當前 Producer 的事務監聽器TransactionListener transactionListener = getCheckListener();if (transactionCheckListener != null || transactionListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { if (transactionCheckListener != null) { // 調用其的事務回查方法 localTransactionState = transactionCheckListener.checkLocalTransactionState(message); } else if (transactionListener != null) { log.debug("Used new check API in transaction message"); localTransactionState = transactionListener.checkLocalTransaction(message); } } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; } // 再將事務執行結果其發回給 Broker this.processTransactionState( localTransactionState, group, exception);} else { log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}
最後發回的方法做的事情和在一開始發送事務狀態的方法,所做的事情是一樣的。Broker 做的處理也是一樣的。
這樣,補償流程就執行完了。
批量消息
概念
在消息隊列中,批量消息也是一個重要的部分,將消息壓縮在一起發送不僅可以減少帶寬的消耗,還能節省頭部佔用的空間。
有點失望的是,RocketMQ 對於批量消息的實現有點 "粗糙" 了
源碼流程
首先,在調用 send() 的 batch 版本後,會先對批量消息進行校驗
批量消息不允許延時、不允許發送到重試 Topic,且要求發送到的 Topic 必須是同一個 Topic
List<Message> messageList = new ArrayList<Message>(messages.size());Message first = null;for (Message message : messages) { if (message.getDelayTimeLevel() > 0) { throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching"); } if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { throw new UnsupportedOperationException("Retry Group is not supported for batching"); } if (first == null) { first = message; } else { if (!first.getTopic().equals(message.getTopic())) { throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); } if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) { throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same"); } } messageList.add(message);}MessageBatch messageBatch = new MessageBatch(messageList);
在校驗完成,且都放到一個 List 之後,接下來的步驟和普通的消息發送都差不多,只是在編碼上理所當然的存在着不同
public static byte[] encodeMessages(List<Message> messages) { //TO DO refactor, accumulate in one buffer, avoid copies List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size()); int allSize = 0; for (Message message : messages) { // 編碼每一個消息 byte[] tmp = encodeMessage(message); encodedMessages.add(tmp); allSize += tmp.length; } // 放到最後的大集合中 byte[] allBytes = new byte[allSize]; int pos = 0; for (byte[] bytes : encodedMessages) { System.arraycopy(bytes, 0, allBytes, pos, bytes.length); pos += bytes.length; } return allBytes;}
然後使用
RequestCode.SEND_BATCH_MESSAGE 這個狀態碼發送出去。
在 Broker 端,其投入的過程大體上和普通消息類似,但是其最後的持久化到硬盤時,這塊批量消息被拆分爲了普通的單條消息。
即 RocketMQ 使用批量消息只減少了發送時的寬帶傳輸,對於存儲與交給消費者的部分並沒有獲得優化
// 拆分批量消息爲每一個普通消息while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE final int msgPos = messagesByteBuff.position(); final int msgLen = messagesByteBuff.getInt(); final int bodyLen = msgLen - 40; //only for log, just estimate it /* pass: 當作普通消息存儲 */ queueOffset++; msgNum++; messagesByteBuff.position(msgPos + msgLen);}
延時消息
概念
在業務中,有時候有一些延時提交任務的需求,這時候就可以使用延時消息,即在投遞一部分時間後纔對消費者可見。
不過,在 RocketMQ 中,延遲級別並不支持自定義,而是具有固定的延遲級別。
不過商業版的 阿里雲 MQ 可以支持秒精度的自定義延遲時間,果然是爲了閹割社區版來賺錢嗎
源碼流程
RocketMQ 對於延時消息的處理主要在於 Broker 端,所以我們只需要看在 Broker 對延時級別的處理。
首先,在 CommitLog 的 put 中,會對延遲級別進行判斷,如果存在,會在這進行進行 Topic 的替換,將其存儲到對應的延遲級別的 Queue
if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId);}
然後會被在 DefaultMessageStore 中初始化的 ScheduleMessageService 處理
首先,該服務在啓動時會進行初始化
public void start() { // 保證只被執行一次 if (started.compareAndSet(false, true)) { // 加載本地快照 super.load(); this.timer = new Timer("ScheduleMessageTimerThread", true); for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { // 取出每一個級別 Integer level = entry.getKey(); // 當前延遲級別對應的延遲時間 Long timeDelay = entry.getValue(); // 該延遲級別之前消費到的自己的隊列的偏移量 Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } // 每一個延遲級別設置一個定時任務 if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 定時持久化各個延遲級別的偏移量 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }}
每一個延遲級別的 Queue 都有對應的定時任務,且都會執行以下方法
public void executeOnTimeup() { // 找到自己延遲級別的消費隊列 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; if (cq != null) { // 根據消費偏移量將指定的 MappedFile 文件加載進來 SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; // 遍歷每一個消息的索引 ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); /* pass */ long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= 0) /* 目標時間小於當起時間,可以執行 */ { // 根據偏移量取出消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { // 將延遲消息恢復成原本消息的樣子 MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); /* pass */ // 投入真實的 Topic PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); /* pass: 更新度量信息 */ } catch (Exception e) { /* pass */ } } } else /* 否則,這個消息需要被消費的時間到了再通知我 */ { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); // 更新消費偏移量 ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for // 走到這裏,說明暫時沒有需要消費的延時消息 nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 小睡一會 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) /* pass */ } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);}
可以看出,延遲消息的實現還是十分簡單的,由於先投入的延時消息必先快於後投入的消息的到期,所以只需要不斷的拉取各個延遲級別對應的隊列 的頭部的延遲消息即可。這也是隻支持固定級別的延遲消息帶來的好處。
來源:
https://www.cnblogs.com/enoc/p/rocketmq-so-no-nana.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/-LIH1KJo0Th58ftTZ5g-eQ