一文讀懂 RocketMQ 的存儲機制

一、存儲方式

2、分佈式 KV 存儲

這類 MQ 一般會採用諸如 LevelDB、RocksDB 、Redis 來作爲消息持久化的方式,由於分佈式緩存的讀寫能力要優於 DB,所以在對消息的讀寫能力要求都不是比較高的情況下,這種方案也倒還不錯。消息存儲於分佈式 KV 需要解決的問題在於如何保證 MQ 整體的可靠性?

3、關係型數據庫

Apache 下開源的另外一款 MQ——ActiveMQ (默認採用的 KahaDB 做消息存儲)可選用 JDBC 的方式來做消息持久化,通過簡單的 xml 配置信息即可實現 JDBC 消息存儲。由於,普通關係型數據庫 (如 Mysql ) 在單表數據量達到千萬級別的情況下,其 IO 讀寫性能往往會出現瓶頸。在可靠性方面,該種方案非常依賴 DB,如果一旦 DB 出現故障,則 MQ 的消息就無法落盤存儲會導致線上故障。

三種方式對比:

從存儲效率來說:文件系統 > 分佈式 KV 存儲 > 關係型數據庫

從易用性來說:關係型數據庫 > 分佈式 KV 存儲 > 文件系統

二、消息的發送與存儲

可以看出消息存儲架構圖中主要有下面三個跟消息存儲相關的文件構成。

1、CommitLog 文件

CommitLog 存儲邏輯視圖如下圖所示,每條消息的前 4 個字節存儲該消息的總長度。

CommitLog 文件的存儲目錄默認爲 ${ROCKET_HOME}/store/commitlog。

看到上面的 00000000000000000000 文件了嗎?這就代表的 commitlog 目錄下的第一個文件,該文件主要存儲消息主體以及元數據的主體,存儲 Producer 端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認 1G ,文件名長度爲 20 位,左邊補零,剩餘爲起始偏移量。比如 00000000000000000000 代表了第一個文件,起始偏移量爲 0,文件大小爲 1G=1073741824;當第一個文件寫滿了,第二個文件爲 00000000001073741824,起始偏移量爲 1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件。

不知道你有沒有疑問?commitlog 文件保存着所有的主題消息,那像消費者訂閱了某個主題的話,是如何保證高效的檢索出你所訂閱的主題呢?而且 RocketMQ 基於磁盤存儲的,爲啥這麼高效呢?

盲猜消息基於磁盤的順序存儲的,爲啥呢?

RocketMQ 的消息用順序寫,保證了消息存儲的速度。

除了消息的順序寫能保證如此高效以外,老周帶你瞭解 RocketMQ 引入的 ConsumeQueue 消費隊列文件,這個文件就是爲了解決上述高效的檢索出你所訂閱的主題的問題。

2、ConsumeQueue 文件

在說 ConsumeQueue 之前,老周覺得很有必要先說下 MessageQueue。

2.1 MessageQueue

我們知道,在發送消息的時候,要指定一個 Topic。那麼,在創建 Topic 的時候,有一個很重要的參數 MessageQueue 。簡單來說,就是你這個 Topic 對應了多少個隊列,也就是幾個 MessageQueue,默認是 4 個。那它的作用是什麼呢?

它是一個數據分片的機制。比如我們的 Topic 裏面有 100 條數據,該 Topic 默認是 4 個隊列,那麼每個隊列中大約 25 條數據。然後,這些 MessageQueue 是和 Broker 綁定在一起的,就是說每個 MessageQueue 都可能處於不同的 Broker 機器上,這取決於你的隊列數量和 Broker 集羣。有點像 Kafka 的分片機制哈,因爲 RocketMQ 正是參照 Kafka 的設計原理來搞的,說到這裏,老周又不得不感嘆下老外的創新以及設計能力了。國內雖說開源慢慢在進步了,但創新、顛覆式的產品還是很少呀。

我們來看下面的圖,0、1、2、3 就是 MessageQueue,符合上面提到的默認 4 個 MessageQueue。老周這裏是單機環境哈,所以它們的 BrokerName 都是指向同一臺機器。

2.2 ConsumeQueue

單個文件由 30W 個條目組成,可以像數組一樣隨機訪問每一個條目,每個條目共 20 個字節,所以單個 ConsumeQueue 文件大小 30W✖️20 字節約 5.72M。

說到這裏,爲啥能高效檢索主題消息,心裏應該豁然開朗了吧。

單個 ConsumeQueue 文件可以看出是一個 ConsumeQueue 條目的數組,其下標爲 ConsumeQueue 的邏輯偏移量,消息消費進度存儲的偏移量即邏輯偏移量。ConsumeQueue 即爲 CommitLog 文件的索引文件,其構建機制是當消息到達 CommitLog 文件後,由專門的線程產生消息轉發任務,從而構建消息消費隊列文件與下文提到的索引文件。

3、Index 文件

 1/**
 2 * 創建消息ID
 3 * @param input     
 4 * @param addr      Broker服務器地址
 5 * @param offset    正在存儲的消息,在Commitlog中的偏移量
 6 * @return
 7 */
 8public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
 9    input.flip();
10    int msgIDLength = addr.limit() == 8 ? 16 : 28;
11    input.limit(msgIDLength);
12    input.put(addr);
13    input.putLong(offset);
14    return UtilAll.bytes2string(input.array());
15}
16

當我們根據 Message Id 向 Broker 查詢消息時,首先會通過一個 decodeMessageId 方法,將 Broker 地址和消息的偏移量解析出來。

 1public static MessageId decodeMessageId(final String msgId) throws Exception {
 2    SocketAddress address;
 3    long offset;
 4    int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
 5    byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 6    byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 7    ByteBuffer bb = ByteBuffer.wrap(port);
 8    int portInt = bb.getInt(0);
 9    // 解析出來Broker地址
10    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
11    // 偏移量
12    byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
13    bb = ByteBuffer.wrap(data);
14    offset = bb.getLong(0);
15    return new MessageId(address, offset);
16}
17

所以通過 Message Id 查詢消息的時候,實際上還是直接從特定 Broker 上的 CommitLog 指定位置進行查詢,屬於精確查詢。

這個也沒問題,但是如果通過 Message Key 和 Unique Key 查詢的時候, RocketMQ 又是怎麼做的呢?

3.2 Index 索引文件

ConsumerQueue 消息消費隊列是專門爲消息訂閱構建的索引文件,提高根據主題與消息隊列檢索消息的速度。

另外, RocketMQ 引入 Hash 索引機制,爲消息建立索引,它的鍵就是 Message Key 和 Unique Key 。

那我們來看下 RocketMQ 索引文件佈局圖:

我們發送的消息體中,包含 Message Key 或 Unique Key ,那麼就會給它們每一個都構建索引。

將當前 Index 條目的索引值,寫在 Hash 槽 absSlotPos 位置上;將 Index 條目的具體信息 (hashcode / 消息偏移量 / 時間差值 / hash 槽的值) ,從起始偏移量 absIndexPos 開始,順序按字節寫入。

RocketMQ 將消息索引鍵與消息偏移量映射關係寫入到 IndexFile 的實現方法爲:

 1/**
 2 * 將消息索引鍵與消息偏移量映射關係寫入到 IndexFile
 3 * @param key               消息索引
 4 * @param phyOffset         消息物理偏移量
 5 * @param storeTimestamp    消息存儲時間
 6 * @return
 7 */
 8public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
 9    if (this.indexHeader.getIndexCount() < this.indexNum) {
10        // 計算key的hash
11        int keyHash = indexKeyHashMethod(key);
12        // 計算hash槽的座標
13        int slotPos = keyHash % this.hashSlotNum;
14        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
15        // 計算時間差值
16        long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
17        timeDiff = timeDiff / 1000;
18        // 計算INDEX條目的起始偏移量
19        int absIndexPos =
20            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
21                + this.indexHeader.getIndexCount() * indexSize;
22        // 依次寫入hashcode、消息偏移量、時間戳、hash槽的值
23        this.mappedByteBuffer.putInt(absIndexPos, keyHash);
24        this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
25        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
26        this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
27        // 將當前INDEX中包含的條目數量寫入HASH槽
28        this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
29        return true;
30    }
31    return false;
32}
33

這樣構建完 Index 索引之後,根據 Message Key 或 Unique Key 查詢消息就簡單了。

比如我們通過 RocketMQ 客戶端工具,根據 Unique Key 來查詢消息。

1adminImpl.queryMessageByUniqKey("order","FD88E3AB24F6980059FDC9C3620464741BCC18B4AAC220FDFE890007");
2

在 Broker 端,通過 Unique Key 來計算 Hash 槽的位置,從而找到 Index 索引數據。從 Index 索引中拿到消息的物理偏移量,最後根據消息物理偏移量,直接到 CommitLog 文件中去找就可以了。

在 Broker 端,通過 Unique Key 來計算 Hash 槽的位置,從而找到 Index 索引數據。從 Index 索引中拿到消息的物理偏移量,最後根據消息物理偏移量,直接到 CommitLog 文件中去找就可以了。

三、文件存儲模型層次結構


RocketMQ 文件存儲模型層次結構如上圖所示,根據類別和作用從概念模型上大致可以劃分爲 5 層,下面將從各個層次分別進行分析和闡述:

1、RocketMQ 業務處理器層:Broker 端對消息進行讀取和寫入的業務邏輯入口,這一層主要包含了業務邏輯相關處理操作(根據解析 RemotingCommand 中的 RequestCode 來區分具體的業務操作類型,進而執行不同的業務處理流程),比如前置的檢查和校驗步驟、構造 MessageExtBrokerInner 對象、decode 反序列化、構造 Response 返回對象等。

2、RocketMQ 數據存儲組件層;該層主要是 RocketMQ 的存儲核心類——DefaultMessageStore,其爲 RocketMQ 消息數據文件的訪問入口,通過該類的 putMessage() 和 getMessage() 方法完成對 CommitLog 消息存儲的日誌數據文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中 CommitLog 對象模型提供的方法);另外,在該組件初始化時候,還會啓動很多存儲相關的後臺服務線程,包括 AllocateMappedFileService(MappedFile 預分配服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker 主從同步高可用服務線程)、StoreStatsService(消息存儲統計服務線程)、IndexService(索引文件服務線程)等。

3、RocketMQ 存儲邏輯對象層:該層主要包含了 RocketMQ 數據文件存儲直接相關的三個模型類 IndexFile、ConsumerQueue 和 CommitLog。IndexFile 爲索引數據文件提供訪問服務, ConsumerQueue 爲邏輯消息隊列提供訪問服務,CommitLog 則爲消息存儲的日誌數據文件提供訪問服務。這三個模型類也是構成了 RocketMQ 存儲層的整體結構。

4、封裝的文件內存映射層:RocketMQ 主要採用 JDK NIO 中的 MappedByteBuffer 和 FileChannel 兩種方式完成數據文件的讀寫。其中,採用 MappedByteBuffer 這種內存映射磁盤文件的方式完成對大文件的讀寫,在 RocketMQ 中將該類封裝成 MappedFile 類。這裏限制的問題在上面已經講過;對於每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個 IndexFile 文件大小約爲 400M、單個 ConsumerQueue 文件大小約 5.72M、單個 CommitLog 文件大小爲 1G),其中每個分隔文件的文件名爲前面所有文件的字節大小數 +1,即爲文件的起始偏移量,從而實現了整個大文件的串聯。這裏,每一種類的單個文件均由 MappedFile 類提供讀寫操作服務(其中,MappedFile 類提供了順序寫 / 隨機讀、內存數據刷盤、內存清理等和文件相關的服務)。

5、磁盤存儲層:主要指的是部署 RocketMQ 服務器所用的磁盤。這裏,需要考慮不同磁盤類型(如 SSD 或者普通的 HDD)特性以及磁盤的性能參數(如 IOPS、吞吐量和訪問時延等指標)對順序寫 / 隨機讀操作帶來的影響。

四、總結

RocketMQ 的存儲機制主要介紹了存儲方式,每種方式都有相應優劣吧,需要根據自己的業務場景來選擇。然後介紹了消息的發送與存儲,消息存儲主要由 CommitLog 文件、ConsumeQueue 文件以及 Index 文件構成。最後介紹了文件存儲模型層次結構,通過層次結構與上面的消息存儲結構圖,讓你更清晰的瞭解 RocketMQ 整個消息存儲與持久化的機制。

消息存儲這一塊的源碼還是比較複雜的,後續老周有時間再慢慢分析。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/QVHFZv8iqz8fmo-iN2tCPg