RocketMQ 源碼分析之消息寫入

      RocketMQ 是阿里巴巴開源的分佈式消息中間件,它具有低延遲、高性能、高可靠性、萬億級容量和靈活的擴展性。本篇文章介紹了其存儲文件和存儲整體架構,並從源碼角度分析了消息寫入流程以及消息刷盤。

1.RocketMQ 存儲文件

Rocketmq 存儲路徑爲 ${ROCKET_HOME}/store,主要存儲以下文件:

2.RocketMQ 消息存儲整體架構

消息存儲架構圖中有三個與消息存儲相關的文件,分別是 commitlog、consumequeue 和 index。RocketMQ 通過使用內存映射文件來提高 IO 訪問性能,無論是 commitlog、consumequeue 還是 index,單個文件都被設計爲固定長度,如果一個文件寫滿以後再創建一個新文件。commitlog 和 consumequeue 的文件名稱是該文件第一條消息對應的全局物理偏移量,index 的文件名稱是以創建文件的時間戳命名。

 在 RocketMQ 中所有 topic 的消息都存儲在同一個文件中,這樣就確保了發送時順序寫文件及消息發送的高性能和高吞吐量。但是 RocketMQ 是基於 topic 的消息訂閱機制,這樣便給消息消費以及消息檢索帶來了極大的不便。爲了提高 consumer 消費消息的效率,RocketMQ 引入了 consumequeue,consumequeue 文件組織方式是 ${ROCKET_HOME}/store/consumequeue/topic 名稱 / queueid/,它記錄的是消息的 commitlog offset、消息大小和 tag hashcode。爲了提高消息檢索的功能,RocketMQ 中引入了 index 文件,其 hash 衝突設計理念借鑑了 Java 中 HashMap 的結構。index 文件包含三個部分:IndexHeader、Hash 槽和 Index 條目,其中 IndexHeader 記錄了 index 中包含消息的最大及最小存儲時間、最大及最小物理偏移量、hashSlot 個數、index 條目列表當前已使用的個數,Index 條目記錄的是消息 key 的 hashcode、消息的 commitlog offset、消息與第一條消息的時間戳差值及該條目的前一條目的 index 索引。(注意:根據 key hashcode 定位 hash 槽可能會引發 hash 衝突,index 文件爲了解決 hash 衝突其解決方法是每個 hash 槽存儲的是落在這個槽的 hashcode 最新的 index 的索引,新的 index 條目的最後四個字節存儲該槽上一個條目的 index 的下標。)

消息存儲架構圖可以簡化爲以下流程:

本篇文章我們一起先來看下消息的寫入流程。

3.MappedFile 與 MappedFileQueue

在 RocketMQ 中使用 MappedFile 和 MappedFileQueue 來封裝存儲文件,MappedFile 是 RocketMQ 內存映射文件的具體實現,MappedFileQueue 是 MappedFile 的管理容器,MappedFileQueue 是對存儲目錄的封裝。下圖可以表示出兩者的關係:

MappedFile 重要屬性如下所示:

MappedFileQueue 重要屬性如下所示:

  1. 消息寫入

4.1 消息寫入流程

消息寫入的整體流程如下圖所示:

Commitlog#putMessage 流程如下圖所示:

下面我們詳細分析寫入流程中幾個比較重要的方法:

4.2 獲取最新的 mappedFile

獲取最新的 mappedFile 的方法是 getLastMappedFile(final long startOffset, boolean needCreate),其實現邏輯如下:

方式一:

mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();

mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());

這種方式是在 broker 的配置文件中刷盤方式是異步刷盤並且 TransientStorePoolEnable 爲 true 的情況下生效,該方式下 MappedFile 會將向 TransientStorePool 申請的堆外內存(Direct ByteBuffer)空間作爲 writeBuffer,寫入消息時先將消息寫入 writeBuffer,然後將消息提交至 fileChannel 最後再 flush。

方式二:

mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());

這種方式是直接創建 MappedFile 內存映射文件字節緩衝區 mappedByteBuffer,將消息寫入 mappedByteBuffer 再 flush。

getLastMappedFile(final long startOffset, boolean needCreate) 的實現如下:

4.3 追加消息到 mappedFile

追加消息到 mappedFile 的實現方法是 appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb),其實現邏輯如下:

mPeKvj

appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) 的實現如下:

doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) 的實現如下:

  1. 消息刷盤

消息刷盤分爲同步刷盤和異步刷盤,同步刷盤只有在消息真正持久化至磁盤後 RocketMQ 的 Broker 端纔會真正返回給 Producer 端一個成功的 ACK 響應。同步刷盤對 MQ 消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般適用於金融業務應用該模式較多。異步刷盤能夠充分利用 OS 的 PageCache 的優勢,只要消息寫入 PageCache 即可將成功的 ACK 返回給 Producer 端。消息刷盤採用後臺異步線程提交的方式進行,降低了讀寫延遲,提高了 MQ 的性能和吞吐量。

handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) 的處理邏輯如下:首先會根據消息刷盤類型分爲兩類:同步和異步,然後不同的類型有不同的處理方式,這裏需要注意異步刷盤的分支中還會再分爲兩種:啓用 TransientStorePoolEnable 和不不啓用 TransientStorePoolEnable

實現消息同步刷盤的服務是 GroupCommitService,該服務是在 broker 啓動時啓動的,在 GroupCommitService 服務中有兩個存放 GroupCommitRequest 的 list,分別是 requestsWrite 和 requestsRead,在 handleDiskFlush 方法中 GroupCommitRequest 被 put 到 requestsWrite 中,GroupCommitService 服務會每 10 毫秒執行一次 swapRequests() 方法,該方法會交換 requestsWrite 和 requestsRead 中的請求,GroupCommitService 服務在後臺會一直執行 doCommit() 方法,這個方法會不斷從 requestsRead 中獲取 GroupCommitRequest 並執行 flush 操作,最後清空 requestsRead。在 GroupCommitService 中使用 requestsWrite 和 requestsRead 可以避免提交刷盤請求與消費刷盤請求的鎖競爭。整個過程可以使用下圖來表示:

異步刷盤分爲兩種情況:

(1)TransientStorePoolEnable 爲 false

TransientStorePoolEnable 爲 false 時,是使用 FlushRealTimeService 服務來進行刷盤操作,該服務的核心邏輯如下:首先從配置文件中獲取 flushCommitLogTimed、flushIntervalCommitLog、flushPhysicQueueLeastPages 和 flushPhysicQueueThoroughInterval,計算距離上次刷盤的時間差,判斷是否超過 flushPhysicQueueThoroughInterval,如果超過了 flushPhysicQueueThoroughInterval 則本次刷盤將忽略 flushPhysicQueueLeastPages,會將所有內存緩存的全部數據刷盤到文件中,最後會調用 flush 將內存中的數據寫到磁盤並更新 checkpoint 文件中 commitlog 文件的更新時間戳。

(2)TransientStorePoolEnable 爲 true

TransientStorePoolEnable 爲 true 時會先使用 CommitRealTimeService 來將 writeBuffer 中的數據提交到 fileChannel 中之後會喚醒 FlushCommitLogService 服務來進行刷盤操作,CommitRealTimeService 服務的核心邏輯如下:

綜上,異步刷盤兩種情況可以用下圖來說明:

作者簡介

孫璽,中國民生銀行信息科技部開源軟件支持組工程師, 目前主要負責 RocketMQ 源碼研究和工具開發等相關工作。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Te0zvuh5g44RCgTUOdvhTA