RocketMQ 源碼分析之消息寫入
RocketMQ 是阿里巴巴開源的分佈式消息中間件,它具有低延遲、高性能、高可靠性、萬億級容量和靈活的擴展性。本篇文章介紹了其存儲文件和存儲整體架構,並從源碼角度分析了消息寫入流程以及消息刷盤。
1.RocketMQ 存儲文件
Rocketmq 存儲路徑爲 ${ROCKET_HOME}/store,主要存儲以下文件:
-
commitlog
消息存儲目錄
-
consumequeue
消息消費隊列存儲目錄
-
index
消息索引文件存儲目錄
-
checkpoint
文件檢查點,存儲 commitlog、consumequeue 和 index 文件最後一次刷盤時間戳
-
abort
如果 abort 文件存儲則表示 broker 非正常關閉,否則表示 broker 正常關閉。該文件是在 broker 啓動的過程中創建的。
-
config
broker 運行期間一些配置信息,主要包含以下信息:
-
consumerFilter.json
該文件保存的是每個 topic 中消息的過濾邏輯
-
consumerOffset.json
該文件保存的是每個 consumer group 的消費進度
-
delayOffset.json
該文件保存的是延遲消息隊列拉取進展
-
subscriptionGroup.json
該文件保存的是每個消費者的訂閱信息
-
topics.json
該文件保存的是 topic 的配置信息
2.RocketMQ 消息存儲整體架構
消息存儲架構圖中有三個與消息存儲相關的文件,分別是 commitlog、consumequeue 和 index。RocketMQ 通過使用內存映射文件來提高 IO 訪問性能,無論是 commitlog、consumequeue 還是 index,單個文件都被設計爲固定長度,如果一個文件寫滿以後再創建一個新文件。commitlog 和 consumequeue 的文件名稱是該文件第一條消息對應的全局物理偏移量,index 的文件名稱是以創建文件的時間戳命名。
在 RocketMQ 中所有 topic 的消息都存儲在同一個文件中,這樣就確保了發送時順序寫文件及消息發送的高性能和高吞吐量。但是 RocketMQ 是基於 topic 的消息訂閱機制,這樣便給消息消費以及消息檢索帶來了極大的不便。爲了提高 consumer 消費消息的效率,RocketMQ 引入了 consumequeue,consumequeue 文件組織方式是 ${ROCKET_HOME}/store/consumequeue/topic 名稱 / queueid/,它記錄的是消息的 commitlog offset、消息大小和 tag hashcode。爲了提高消息檢索的功能,RocketMQ 中引入了 index 文件,其 hash 衝突設計理念借鑑了 Java 中 HashMap 的結構。index 文件包含三個部分:IndexHeader、Hash 槽和 Index 條目,其中 IndexHeader 記錄了 index 中包含消息的最大及最小存儲時間、最大及最小物理偏移量、hashSlot 個數、index 條目列表當前已使用的個數,Index 條目記錄的是消息 key 的 hashcode、消息的 commitlog offset、消息與第一條消息的時間戳差值及該條目的前一條目的 index 索引。(注意:根據 key hashcode 定位 hash 槽可能會引發 hash 衝突,index 文件爲了解決 hash 衝突其解決方法是每個 hash 槽存儲的是落在這個槽的 hashcode 最新的 index 的索引,新的 index 條目的最後四個字節存儲該槽上一個條目的 index 的下標。)
消息存儲架構圖可以簡化爲以下流程:
-
producer 發送消息到 broker
-
broker 採用同步或者異步方式將消息刷盤持久化
-
broker 的 master 和 slave 之間數據同步
-
broker 後臺服務線程 ReputMessageService 分發請求構建 consumequeue 和 index 文件
本篇文章我們一起先來看下消息的寫入流程。
3.MappedFile 與 MappedFileQueue
在 RocketMQ 中使用 MappedFile 和 MappedFileQueue 來封裝存儲文件,MappedFile 是 RocketMQ 內存映射文件的具體實現,MappedFileQueue 是 MappedFile 的管理容器,MappedFileQueue 是對存儲目錄的封裝。下圖可以表示出兩者的關係:
MappedFile 重要屬性如下所示:
MappedFileQueue 重要屬性如下所示:
- 消息寫入
4.1 消息寫入流程
消息寫入的整體流程如下圖所示:
Commitlog#putMessage 流程如下圖所示:
下面我們詳細分析寫入流程中幾個比較重要的方法:
-
getLastMappedFile(final long startOffset, boolean needCreate)
-
appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) 及 doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner)
4.2 獲取最新的 mappedFile
獲取最新的 mappedFile 的方法是 getLastMappedFile(final long startOffset, boolean needCreate),其實現邏輯如下:
-
在 MappedFileQueue 中使用 CopyOnWriteArrayList mappedFiles 記錄了 mappedFile 的集合,在寫入數據時我們總是在最新的 mappedFile 中寫入數據,所以首先從 mappedFiles 中獲取最後一個 mappedFile
-
最新的 mappedFile 爲空,這種情況下計算待創建的 mappedFile 的起始 offset。mappedFile 爲空的場景是第一次使用 broker
-
最新的 mappedFile 不爲空並且已經寫滿了,這樣情況下也需要計算待創建的 mappedFile 的起始 offset,計算方法是最新 mappedFile 的初始偏移量與每個 mappedFile 大小的和
-
如果待創建的 mappedFile 的 offset 不爲 - 1 並且 needCreate 爲 true,構建出待創建的 mappedFile 的文件路徑 nextFilePath 以及再下一個 mappedFile 的文件路徑 nextNextFilePath,然後調用 allocateMappedFileService 服務的 putRequestAndReturnMappedFile 方法構建 AllocateRequest(該請求實現了 compareTo 方法,請求是按照文件名稱從小到大排序的,即創建 mappedFile 是有序的)請求並將請求放在其待處理的隊列中,後臺 allocateMappedFileService 服務會從請求隊列中獲取請求並創建 mappedFile。創建 mappedFile 的方法是 allocateMappedFileService 服務中的 mmapOperation(),這裏面需要注意:創建 mappedFile 有兩種不同的方式。
方式一:
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
這種方式是在 broker 的配置文件中刷盤方式是異步刷盤並且 TransientStorePoolEnable 爲 true 的情況下生效,該方式下 MappedFile 會將向 TransientStorePool 申請的堆外內存(Direct ByteBuffer)空間作爲 writeBuffer,寫入消息時先將消息寫入 writeBuffer,然後將消息提交至 fileChannel 最後再 flush。
方式二:
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
這種方式是直接創建 MappedFile 內存映射文件字節緩衝區 mappedByteBuffer,將消息寫入 mappedByteBuffer 再 flush。
- 如果最新的 mappedFile 不爲空則直接返回該 mappedFile 即可
getLastMappedFile(final long startOffset, boolean needCreate) 的實現如下:
4.3 追加消息到 mappedFile
追加消息到 mappedFile 的實現方法是 appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb),其實現邏輯如下:
-
獲取 mappedFile 寫指針位置
-
判斷寫指針的位置與文件大小的關係,如果寫指針的位置小於文件大小則按照消息的類型(普通消息及批量消息)調用 AppendMessageCallback 的回調函數 doAppend 追加消息,doAppend 方法是追加消息的核心實現,其實現邏輯是:
-
計算消息寫入的位置
-
爲消息創建 msgId,其創建規則是 4 個字節 IP+4 個字節的端口號 + 8 字節的消息偏移量
-
在 commitlog 的 topicQueueTable 記錄 consumequeue 的信息
-
序列化消息(注意:producer 發送的消息格式和 broker 最終存儲的消息格式是不一樣的),broker 端存儲的消息的格式如下:
-
將消息寫入消息隊列緩存中
-
構建追加消息的 AppendMessageResult 並返回結果
-
更新 mappedFile 寫指針位置及文件最後寫入的時間
appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) 的實現如下:
doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) 的實現如下:
- 消息刷盤
消息刷盤分爲同步刷盤和異步刷盤,同步刷盤只有在消息真正持久化至磁盤後 RocketMQ 的 Broker 端纔會真正返回給 Producer 端一個成功的 ACK 響應。同步刷盤對 MQ 消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般適用於金融業務應用該模式較多。異步刷盤能夠充分利用 OS 的 PageCache 的優勢,只要消息寫入 PageCache 即可將成功的 ACK 返回給 Producer 端。消息刷盤採用後臺異步線程提交的方式進行,降低了讀寫延遲,提高了 MQ 的性能和吞吐量。
handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) 的處理邏輯如下:首先會根據消息刷盤類型分爲兩類:同步和異步,然後不同的類型有不同的處理方式,這裏需要注意異步刷盤的分支中還會再分爲兩種:啓用 TransientStorePoolEnable 和不不啓用 TransientStorePoolEnable
實現消息同步刷盤的服務是 GroupCommitService,該服務是在 broker 啓動時啓動的,在 GroupCommitService 服務中有兩個存放 GroupCommitRequest 的 list,分別是 requestsWrite 和 requestsRead,在 handleDiskFlush 方法中 GroupCommitRequest 被 put 到 requestsWrite 中,GroupCommitService 服務會每 10 毫秒執行一次 swapRequests() 方法,該方法會交換 requestsWrite 和 requestsRead 中的請求,GroupCommitService 服務在後臺會一直執行 doCommit() 方法,這個方法會不斷從 requestsRead 中獲取 GroupCommitRequest 並執行 flush 操作,最後清空 requestsRead。在 GroupCommitService 中使用 requestsWrite 和 requestsRead 可以避免提交刷盤請求與消費刷盤請求的鎖競爭。整個過程可以使用下圖來表示:
異步刷盤分爲兩種情況:
(1)TransientStorePoolEnable 爲 false
TransientStorePoolEnable 爲 false 時,是使用 FlushRealTimeService 服務來進行刷盤操作,該服務的核心邏輯如下:首先從配置文件中獲取 flushCommitLogTimed、flushIntervalCommitLog、flushPhysicQueueLeastPages 和 flushPhysicQueueThoroughInterval,計算距離上次刷盤的時間差,判斷是否超過 flushPhysicQueueThoroughInterval,如果超過了 flushPhysicQueueThoroughInterval 則本次刷盤將忽略 flushPhysicQueueLeastPages,會將所有內存緩存的全部數據刷盤到文件中,最後會調用 flush 將內存中的數據寫到磁盤並更新 checkpoint 文件中 commitlog 文件的更新時間戳。
(2)TransientStorePoolEnable 爲 true
TransientStorePoolEnable 爲 true 時會先使用 CommitRealTimeService 來將 writeBuffer 中的數據提交到 fileChannel 中之後會喚醒 FlushCommitLogService 服務來進行刷盤操作,CommitRealTimeService 服務的核心邏輯如下:
綜上,異步刷盤兩種情況可以用下圖來說明:
作者簡介
孫璽,中國民生銀行信息科技部開源軟件支持組工程師, 目前主要負責 RocketMQ 源碼研究和工具開發等相關工作。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Te0zvuh5g44RCgTUOdvhTA