深度解析:Pulsar 的消息存儲機制和 Bookie 的 GC 機制原理

導讀

Apache Pulsar 是一個多租戶、高性能的服務間消息傳輸解決方案,支持多租戶、低延時、讀寫分離、跨地域複製、快速擴容、靈活容錯等特性。騰訊數據平臺部 MQ 團隊對 Pulsar 做了深入調研以及大量的性能和穩定性方面優化,目前已經在 TDbank 落地上線。本文是 Pulsar 技術系列中的一篇,主要簡單梳理了 Pulsar 消息存儲與 BookKeeper 存儲文件的清理機制。其中,BookKeeper 可以理解爲一個 NoSQL 的存儲系統,默認使用 RockDB 存儲索引數據。

圖片

作者介紹

鮑明宇

騰訊 TEG 數據平臺部高級工程師
Apache Pulsar Contributor

熱衷於開源技術,在消息隊列領域有豐富經驗,目前致力於 Pulsar 的落地和推廣

Pulsar 消息存儲

Pulsar 的消息存儲在 BookKeeper 中,BookKeeper 是一個胖客戶的系統,客戶端部分稱爲 BookKeeper,服務器端集羣中的每個存儲節點稱爲 bookie。Pulsar 系統的 broker 作爲 BookKeeper 存儲系統的客戶端,通過 BookKeeper 提供的客戶端 SDK 將 Pulsar 的消息存儲到 bookies 集羣中。

Pulsar 中的每個 topic 的每個分區(非分區 topic,可以按照分區 0 理解,分區 topic 的編號是從 0 開始的),會對應一系列的 ledger,而每個 ledger 只會存儲對應分區下的消息。對於每個分區同時只會有一個 ledger 處於 open 即可寫狀態。

Pulsar 在生產消息,存儲消息時,會先找到當前分區使用的 ledger ,然後生成當前消息對應的 entry ID,entry ID 在同一個 ledger 內是遞增的。非批量生產的情況(producer 端可以配置這個參數,默認是批量的),一個 entry 中包含一條消息。批量方式下,一個 entry 可能包含多條消息。而 bookie 中只會按照 entry 維度進行寫入、查找、獲取。

因此,每個 Pulsar 下的消息的 msgID 需要有四部分組成(老版本由三部分組成),分別爲(ledgerID,entryID,partition-index,batch-index),其中,partition-index 在非分區 topic 的時候爲 - 1,batch-index 在非批量消息的時候爲 - 1。

每個 ledger,當存在的時長或保存的 entry 個數超過閾值後會進行切換,同一個 partition 下的,新的消息會存儲到下一個 ledger 中。Ledger 只是一個邏輯概念,是數據的一種邏輯組裝維度,並沒有對應的實體。

圖片

BookKeeper 集羣中的每個 bookie 節點收到消息後,數據會分三部分進行存儲處理,分別爲:journal 文件、entryLog 文件、索引文件。

其中 journal 文件,entry 數據是按照 wal 方式寫入的到 journal 文件中,每個 journal 文件有大小限制,當超過單個文件大小限制的時候會切換到下一個文件繼續寫,因爲 journal 文件是實時刷盤的,所以爲了提高性能, 避免相互之間的讀寫 IO 相互影響,建議存儲目錄與存儲 entrylog 的目錄區分開,並且給每個 journal 文件的存儲目錄單獨掛載一塊硬盤(建議使用 ssd 硬盤)。journal 文件只會保存保存幾個,超過配置個數的文件將會被刪除。entry 存儲到 journal 文件完全是隨機的,先到先寫入,journal 文件是爲了保證消息不丟失而設計的。

如下圖所示,每個 bookie 收到增加 entry 的請求後,會根據 ledger id 映射到存儲到那個 journal 目錄和 entry log 目錄,entry 數據會存儲在對應的目錄下。目前 bookie 不支持在運行過程中變更存儲目錄(使用過程中,增加或減少目錄會導致部分的數據查找不到)。

圖片

如下圖所示,bookie 收到 entry 寫入請求後,寫入 journal 文件的同時,也會保存到 write cache 中,write cache 分爲兩部分,一部分是正在寫入的 write cache, 一部分是正在正在刷盤的部分,兩部分交替使用。

write cache 中有索引數據結構,可以通過索引查找到對應的 entry,write cache 中的索引是內存級別的,基於 bookie 自己定義的 ConcurrentLongLongPairHashMap 結構實現。

另外,每個 entorylog 的存儲目錄,會對應一個 SingleDirectoryDbLedgerStorage 類實例對象,而每個 SingleDirectoryDbLedgerStorage 對象裏面會有一個基於 RockDB 實現的索引結構,通過這個索引可以快速的查到每個 entry 存儲在哪個 entrylog 文件中。每個 write cache 在增加 entry 的時候會進行排序處理,在同一個 write cache,同一個 ledger 下的數據是相鄰有序的,這樣在 write cache 中的數據 flush 到 entrylog 文件時,使得寫入到 entrylog 文件中的數據是局部有序的,這樣的設計能夠極大的提高後續的讀取效率。

圖片

SingleDirectoryDbLedgerStorage 中的索引數據也會隨着 entry 的刷盤而刷盤到索引文件中。在 bookie 宕機重啓時,可以通過 journal 文件和 entry log 文件還原數據,保證數據不丟失。

Pulsar consumer 在消費數據的時候,做了多層的緩存加速處理,如下圖所示:

圖片

獲取數據的順序如下:

上面每一步,如果能獲取到數據,都會直接返回,跳過後面的步驟。如果是從磁盤文件中獲取的數據,會在返回的時候將數據存儲到 read cache 中,另外如果是讀取磁盤的操作,會多讀取一部分磁盤上的時候,因爲存儲的時候有局部有序的處理,獲取相鄰數據的概率非常大,這種處理的話會極大的提高後續獲取數據的效率。

我們在使用的過程中,應儘量避免或減少出現消費過老數據即觸發讀取磁盤文件中的消息的場景,以免對整體系統的性能造成影響。

BookKeeper 的 GC 機制

BookKeeper 中的每個 bookie 都會週期的進行數據清理操作,默認 15 分鐘檢查處理一次,清理的主要流程如下

圖片

  1. 清理 bookie 存儲的 ledger id(bookie 內存儲的 ledger id 與 zk 上面存儲的 ledger id 做比較,如果 zk 上面沒有則刪除 bookie 中存儲的 ledger id);

  2. 統計每個 entry log 中存活的 entry 佔比,當前 entry log 存活的 ledger 個數爲 0 時刪除這個 entry log;

  3. 根據 entry log 的元數據信息,清理 entry log 文件(當 entry log 包含的所有 ledger id 全部失效時刪除);

  4. 壓縮 entry log 文件 , 分別在當前 entry log 文件下存活的 entry 比例在 0.5 - 默認週期 1 天 (major gc) 或比例 0.2 - 默認週期 1 個小時 (minor gc) 的時候,Compaction entry log 文件,將老的文件中存活的 entry 轉移新的文件中,然後將老的 entry log 文件刪除,單次的 GC 如果處理的 entry log 文件比較大的時候可能耗時比較長。

通過上面的流程,我們可以瞭解 bookie 在清理 entrylog 文件時的大體流程。

需要特別說明的是,ledger 是否是可以刪除的,完全是客戶端的觸發的,在 Pulsar 中是 broker 觸發的。

broker 端有周期的處理線程(默認 2 分鐘),清理已經消費過的消息所在的 ledger 機制,獲取 topic 中包含的 cursor 最後確認的消息,將這個 topic 包含的 ledger 列表中,在這個 id 之前的(注意不包含當前的 ledger id)全部刪除(包括 zk 中的元數據,同時通知 bookie 刪除對應的 ledger)。

運營中遇到的問題分析

在運用的過程中我們多次遇到了 bookie 磁盤空間不足的場景,bookie 中存儲了大量的 entry log 文件。比較典型的原因主要有如下兩個。

原因一:

生產消息過於分散,例如,舉個極端的場景,1w 個 topic,每個 topic 生產一條,1w 個 topic 順序生產。這樣每個 topic 對應的 ledger 短時間內不會因爲時長或者存儲大小進行切換,active 狀態的 ledger id 分散在大量的 entry log 文件中。這些 entry log 文件是不能刪除或者及時壓縮的。

如果遇到這種場景,可以通過重啓,強制 ledger 進行切換進行處理。當然如果這個時候消費進行沒有跟上,消費的 last ack 位置所在的 ledger 也是處於 active 狀態的,不能進行刪除。

原因二:

GC 時間過程,如果現存的 enrylog 文件比較多,且大量符合 minor 或 major gc 閾值,這樣,單次的 minor gc 或者 major gc 時間過長,在這段時間內是不能清理過期的 entry log 文件。

這是由於單次清理流程的順序執行導致的,只有上次一輪執行完,纔會執行下一次。目前,這塊也在提優化流程,避免子流程執行實現過長,對整體產生影響。

小結

本文首先,介紹了 Pulsar 消息的存儲組織形式,存儲流程和消息的獲取過程。其次,對單個 bookie 的 GC 流程做了詳盡的說明。在 Pulsar 的使用過程中,應該儘量避免消費過舊的歷史數據即需要讀取磁盤獲取數據的場景。

在運維 bookie 的過程中,是不能在運行過程中調整存儲目錄的個數的,在部署時需要對容量進行充分的評估。如果需要在運營的過程中進行調整時,需要對單個的 bookie 節點進行擴縮容處理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/L0IPBZDEI31mOrvLIwaqAA