Bookie 存儲架構源碼剖析|得物技術
一
Pulsar 存儲架構簡析
Pulsar 作爲新一代 MQ 中間件,在底層架構設計上充分貫徹了存算分離的思想,broker 與 Bookeeper 兩個組件獨立部署,前者負責流量的調度、聚合、計算,後者負責數據的存儲,這也契合了雲原生下 k8s 大行其道的時代背景。Bookeeper 又名 Bookie ,是一個單獨的存儲引擎。在組件關係上,broker 深度依賴 Bookie,內部集成了 Bookie 的 client 端,broker 和 Bookie 之間基於 TCP 通信,使用 protobuf。
Pulsar 整體架構
消息流從 client 端發送到 broker,經過 broker 的計算、轉化、路由後再次被分發到具體的 Bookie 節點,一條消息被存儲幾份是可配置的。數據的高可用由 broker 來保障而非 Bookie,Bookie 只是一個簡單的單機存儲引擎。一般而言數據多副本有兩種主要的分發方式:一種是基於主從模式,主節點在收到數據寫入後,將數據二次分發到從節點,從節點的數據流源頭只有主節點,可以存在多個從節點,這種架構典型實現有 rocketMQ ,MySQL 等;另一種方式是並行多份寫入多份相同的數據,在接收到 SDK 側數據後進行多路分發。兩種方式各有優劣,前者實現簡單,但是延遲較高,在開啓同步複製(異步複製可能丟數據)的情況下延遲爲: master 寫入延遲 + slave 寫入延遲;後者實現複雜,需要處理單節點分發失敗補償的問題,但是延遲較低,實際的寫入延遲爲 Max(shard1 寫入延遲,shard2 寫入延遲,.....)。Pulsar 的數據分發模式爲後者。
Pulsar 數據流架構
一個 topic 在時間序列上被分爲多個 Ledger,使用 LedgerId 標識,在一個物理集羣中,LedgerId 不會重複,採用全局分配模式,對於單個 topic(分區 topic)而言同一時刻只會有一個 Ledger 在寫入,關閉的 Ledger 不可以寫入,以 topicA-partition1 的 Ledgers[ledger1, ledger3, ledger7, ...., ledgerN] 爲例,可寫入的 Ledger 只有 N,小於 N 的 Ledger 均不可寫入,單個 Ledger 默認可以存儲 5W 條消息,當 broker 以 (3,2,2)模式寫入數據時,具體架構如下圖所示。3,2,2 可以解釋爲當前 topic 可以寫入的節點有 3 個,每次數據寫入 2 份,並且收到 2 個數據寫入成功的 ACK 後纔會返回響應 client 端。
Ledger 分段機制
二
Bookie 的架構設計
對 Pulsar 的架構有了大致的瞭解後,我們重點剖析下 Bookie 這個核心的存儲引擎。消息系統爲了追求最大寫入吞吐,一般都採用順序寫的方式來壓榨磁盤的 IO 性能。Bookie 也是一樣,默認情況下 Bookie 的數據會寫入 journal 日誌文件,這個日誌類似於 MySQL 中的 binlog 文件或者 rocketMQ 中的 commitlog 文件,採用亂序追加寫的方式,存在多個 topic 的數據寫入同一個文件的情況。
爲了更好的 IO 隔離,官方建議 journal 單獨掛一塊盤。爲了充分發揮磁盤 IO 性能,journal 目錄可以有多個,即同時存在多個並行寫入的 journal 日誌,每個 journal 日誌會綁定一個寫入線程,寫入請求提交後會被歸一化到某個具體線程,實現無鎖化,單個消息寫入是按照 LedgerId 對目錄數量取模,決定當前數據落到哪個 journal 目錄。journal 日誌落盤策略是可配置的,當配置同步落盤時,數據實時落盤後纔會返回寫入成功。journal 日誌數據寫入後會確認返回寫入成功,而 entrylog 的數據是否落盤並不影響請求的立即返回。journal 和 entrylog 均可以配置爲異步刷盤,這種情況下落盤的時序上並沒有先後之分。
Bookie 數據存儲架構
Journal 日誌的主要作用是保證數據不丟失,同時提供足夠快的性能,因此採用了混合落盤的模式。實際業務消費時,針對單個 topic 的數據在時間序列上是順序消費,如果實際的數據從 journal 文件中讀取則會出現大量的隨機 IO,性能較差。Bookie 通過將數據進行二次轉寫的方式實現數據的局部有序從而提升讀取性能,默認情況下一份數據在磁盤上會存兩份:一份在 journal 日誌中,一份在 entry 日誌中。entry 日誌中的數據具備局部有序的特性,在一批數據刷盤時,會針對這批數據按照 LedgerId,entryId 進行排序後落盤。這樣消費側在消費數據時能夠實現一定程度上的順序 IO,以提升性能。
entryIndex 的作用是保存(LedgerId+entryId)到 offset 的映射關係,這裏的 offset 是指 entry data 文件中的 offset。
這樣的一組映射關係很容易想到其在內存中的組織形式,一個 map。實際的存儲 Pulsar 選擇 rocksDB 來存儲這樣的 KV 關係,但 Bookie 本身也有自己的 KV 存儲實現;
通過對 Bookie 架構的上分析,我們發現針對讀寫場景 Bookie 做了兩件事來支撐:
-
混合 Ledger 順序寫的 journal 日誌支撐高吞吐低延遲的寫入場景;
-
局部有序的 entry data 支撐消費場景下的 Ledger 級別的順序讀。
三
Bookie 的數據寫入流程
對於 Bookie 的寫入流程大致如下圖所示。Bookie 收到數據後會同時寫入 journal 日誌和 memtable,memtable 是一個內存 buffer。memtable 再次分發到 entry logger 以及 entry index,數據在 journal 中 append 完後會立即返回寫入成功。entry data 和 entry index 的構建可以理解都是異步操作。
Bookie 數據寫入流程
client 端源碼分析
Pulsar 中 broker 組件 使用 low level API 與 Bookie 進行通信。下文結合具體代碼進行分析。
ClientConfiguration conf = new ClientConfiguration();
conf.setThrottleValue(bkthrottle);
conf.setMetadataServiceUri("zk://" + zkservers + "/ledgers");
BookKeeper bkc = new BookKeeper(conf);
final LedgerHandle ledger = bkc.createLedger(3, 2, 2, DigestType.CRC32, new byte[]{'a', 'b'});
final long entryId = ledger.addEntry("ABC".getBytes(UTF_8));
使用 low level api 時,藉助於 LedgerHandle 添加 entry 對象。在 Pulsar 中 entryId 爲一個遞增的序列,在 broker 中 Bookie 的源碼調用順序如下所示,其中 LedgerHandle,OpAddEntry,LedgerHandle class 對象爲 Bookeeper 模塊提供。
-
ManagedLedgerImpl#asyncAddEntry() 方法(參數省略,下同)
-
ManagedLedgerImpl#internalAsyncAddEntry() 方法
-
LedgerHandle#asyncAddEntry() 方法
-
OpAddEntry#initiate() 方法
-
LedgerHandle#doAsyncAddEntry() 方法
-
BookieClient#addEntry() 方法
LedgerHandle#doAsyncAddEntry 方法
在 doAsyncAddEntry 中的 729 行,發現 entryId 其實是由 lastAddPushed 遞增得到,並且這段代碼也被加上了重量級鎖。PendingAddOp 對象構建完成後會進入一個 pendingAddOps 隊列,該隊列與當前 Ledger 綁定。
PendingAddOp#initiate 方法
這裏的 PendingAddOp 對象代表着一個寫數據的請求,在 initiate 進一步加鎖,結合寫入節點的數量分別向不同的 Bookie 存儲節點發送寫請求,sendWriteRequest 方法內容比較簡單,直接調用 addEntry 方法即可。
PendingAddOp#sendWriteRequest
BookieClient#addEntry
addEntry 方法的實現依然有很多方法包裝的細節,但最終通過網絡調用 server 端的相關接口,這裏篇幅有限,不過度展開。
server 端源碼分析
請求路由組件:BookieRequestProcessor
直接跳轉 bookeeper 的 server 端的核心處理方法上,BookieRequestHandler 爲 server 端的處理類,其繼承了 Netty 的 ChannelInboundHandlerAdapter,是最外層與 netty 組合工作的 handler。
BookieRequestHandler
在 channelRead 方法中觸發了 requestProcessor 的處理邏輯,這裏的 processor 實際爲 BookieRequestProcessor,具體的相關代碼在 BookieServer 類的構造函數中,BookieServer 是整個 bookeeper server 端的啓動類。
BookieRequestProcessor#processRequest 方法爲數據流的核心指令分發器。
BookieRequestProcessor#processRequest
這裏圍繞 processAddRequestV3 方法展開分析;Bookie 中有個很有意思的設定,將請求處理線程池分爲普通線程池和高優線程池;兩者執行邏輯相同。在下圖的 452 行將寫操作請求放入了線程池,需要說明的是這個線程池是經過改良的,多了一個 orderingKey 參數,在內部會將根據該參數進行 hash 運算,映射具體的線程上,其內部由多個單線程的線程池組成。這樣做的好處是可以大幅度減少投遞任務時的隊列頭部競爭,相比傳統線程池有一定的性能優勢。
processAddRequestV3
核心線程池任務:WriteEntryProcessorV3
顯然,核心的處理邏輯在 write.run 方法內,繼續開扒。run 方法中核心邏輯封裝在 getAddResponse()。
WriteEntryProcessorV3#run
getAddResponse 方法內會對當前請求的標記,判斷後分別調用 recoveryAddEntry 和 addEntry 這兩個方法。前者的使用場景顧名思義是在異常恢復流程中被觸發,一般是節點啓動,宕機後重啓等過程中恢復數據。addEntry 方法位於 Bookie 內,Bookie 是個接口,只有一個實現類 BookieImpl。
WriteEntryProcessorV3#getAddResponse
存儲引擎接口抽象:Bookie
繼續來看 BookieImpl#addEntry 方法,在 1067 這一行加上了 synchronized 鎖,鎖的對象爲 handle,具體爲 LedgerDescriptor 類型,這表示在單個 Ledger 內部的數據在寫入時通過加鎖的方式實現串行化寫入。
1073 行的 addEntryInternal 方法內部是核心的寫入邏輯。
BookieImpl#addEntry
Ledger 的管理者:LedgerDescriptor
getLedgerForEntry 方法基於傳入的參數 LedgerId 查找到對應的 LedgerDescriptor,該類是一個抽象類,有兩個實現類,分別是 LedgerDescriptorImpl 和 LedgerDescriptorReadOnlyImpl,顧名思義,二者分別提供讀寫功能。
BookieImpl#getLedgerForEntry
LedgerDescriptor 的兩個實現類
handles 是 HandleFactory 類型接口,從其定義的接口來看主要作用就是實現 LedgerDescriptor 的讀寫分離,且只有一個實現 HandleFactoryImpl,在 HandleFactoryImpl 中保存了 2 個 Map 類型的 MAP。分別服務於兩個接口的調用,getHandle 方法就是從 map 中獲取可以寫入的 LedgerDescriptor。
HandleFactory
事實上 LedgerDescriptorReadOnlyImpl 的實現很簡單,繼承了 LedgerDescriptorImpl 後將該類涉及到寫入的方法全部重寫爲拋出異常!
LedgerDescriptorReadOnlyImpl
獲取到對應的 LedgerDescriptor 後,就需要進行寫入操作,下面分析 BookieImpl#addEntryInternal 方法。
從邏輯上來講,entry 先是被寫入 Ledger storage(930 行),其次才被寫入 journal 日誌,同時 journal 日誌的寫入是可選的,默認情況下開啓;journal 關閉後將不存在數據落盤的邏輯,這意味着將無法依靠 journal 日誌進行數據恢復。但考慮到消息寫入時一般是多份,不考慮寫入的多個節點同時宕機的情況,數據某種程度上依然是可靠的。
BookieImpl#addEntryInternal
Ledger 級的接口抽象:LedgerStorage
LedgerDescriptorImpl 中持有一個 ledgerStorage 類型,該組件負責最終的 entry 對象寫入,存在多個實現類,分別是:DbLedgerStorage,SingleDirectoryDbLedgerStorage,InterleavedLedgerStorage,SortedLedgerStorage。
LedgerDescriptorImpl
LedgerStorage 實現類
Bookie 默認使用 SortedLedgerStorage,但 Pulsar 中使用 DbLedgerStorage 進行管理。
實際可配置的實現只有三個選項,下面依次對每個實現類進行分析。
ServerConfiguration
1
DbLedgerStorage->SingleDirectoryDbLedgerStorage
writeCache 寫入
DbLedgerStorage 主要特點是使用了 rocksDB 保存 [ledgerId+entryId --> location] 的映射關係;內部又存在了一層套娃。addEntry 方法中先獲取到 LedgerId, 再根據 LedgerId 獲取 ledgerStorage,也就是說 LedgerId 和實際的 LedgerStorage 存在映射關係;DbLedgerStorage 內部又繼續封裝了 SingleDirectoryDbLedgerStorage 類來支撐數據寫入,具體是一個 ListledgerStrageList;字段。經過 hash 後獲得真實的 SingleDirectoryDbLedgerStorage 對象進行實際的 addEntry 操作;下文首先對該實現進行分析。
DbLedgerStorage#addEntry
DbLedgerStorage#getLedgerStorage
DbLedgerStorage 的成員變量
在 SingleDirectoryDbLedgerStorage 的源碼中,**待寫入的 entry 僅僅是被放入 writeCache 中,put 成功後更新 LAC 並通知相關監聽者,同時觸發寫入成功事件,貌似沒有任何寫盤的操作出現!!!**進一步分析 497 行,如果 put 失敗會觸發 flush 操作並嘗試再次 addEntry,這裏的 flush 有點眼熟,有必要展開分析一波。
不難發現這裏的寫入操作和刷盤操作其實是線程隔離的,默認情況下,類比於 RMQ,大部分存儲組件的刷盤操作和實際寫入動作切分爲兩個線程在執行,刷盤線程會不斷地巡檢是否需要刷盤,主要基於當前未刷盤的數據量以及距離上次刷盤的時間間隔,如果開啓同步刷盤,一般寫入線程會被掛起在 req 請求上,當刷盤進度已經 cover 寫入請求的 offset 時,被掛起的請求上的線程會被喚醒繼續執行,這是一種非常典型的存儲引擎設計模式。這裏 writeCache 就是個 buffer,既可以充當寫入緩衝也可以充當讀取緩衝,在 tail read 場景下會有非常好的性能收益。
SingleDirectoryDbLedgerStorage#addEntry
writeCache 背後的 flush
triggerFlushAndAddEntry 的邏輯並不複雜,在超時時間到來之前會不斷的檢查當前的刷盤標記位,如果沒有正在刷盤以及刷盤邏輯沒有被觸發,會嘗試刷盤,同時嘗試繼續向 writeCache 中 put 數據,因爲刷盤成功後會在 cache 中清理出一部分空間,用於 put 新的的數據,一旦 put 成功立即返回,跟外層的 addEntry 方法類似,只是多了個刷盤邏輯的處理。
SingleDirectoryDbLedgerStorage#triggerFlushAndAddEntry 方法
flush 方法其實是個空殼,核心邏輯在 checkpoint() 方法內,該方法的主要邏輯爲:
-
交換 writeCache,避免刷盤過程中數據無法寫入,導致寫入抖動;
-
對 writeCache 內的數據進行排序,實現局部有序;
-
分別調用 entryLog 的 add 方法和 entryIndex 的 add 方法;
-
調用 entrylog 的 flush 和 entryIndex 的 flush 進行刷盤。
SingleDirectoryDbLedgerStorage#flush
SingleDirectoryDbLedgerStorage#checkpoint
源碼中的 writeCacheBeingFlushed 實際上和 writeCache 一體兩面,上一次刷盤結束後 writeCacheBeingFlushed 會被 clear,再次刷盤時會交換兩者;保證寫入的穩定性;如果實際查詢數據時要利用這部分 cache,需要查詢兩次,先查 writeCache 如果不存在 ,再查 writeCacheBeingFlushed。
writeCacheBeingFlushed 的註釋
entryLocationIndex.addLocation(batch, ledgerId, entryId, location); 方法底層依賴 rocksDB 建立了 [ledgerId, entryId]-->location 的映射關係,Batch 在這裏代表着 一個 RocksDBBatch,location 可以理解爲實際磁盤文件上的 offset。rocksDB 引擎超出本文範疇,這裏不做分析。
EntryLogger
entryLogger 代表着存儲實際數據的組件抽象,調用 addEntry(ledgerId, entry) 方法完成數據寫入。進一步對 addEntry 方法展開分析,發現 EntryLogger 是一個接口,有 2 個直接實現類,分別是 DefaultEntryLogger 和 DirectEntryLogger,默認使用 DefaultEntryLogger 參見源碼:
DbLedgerStorage#initialize -part1
DbLedgerStorage#initialize -part2
最終的調用來到了 EntryLogManagerBase#addEntry 方法,首先獲取到待寫入的數據,然後調用 BufferedLogChannel#write 將其寫入,可以看到實際的數據長度爲:entry.readableBytes() + 4,4 個字節用於記錄長度,先寫入長度值,再寫入 entry 的二進制數據;addEntry 方法返回值爲 location,方法的最後一行表明 location 由 2 部分組成,分別是 logId 和 pos,各暫用高位和低位的 4 個字節。很容易想到隨着時間的推移 EntryLogger 中的文件不止一個,因此需要一個 logId 來標識不同文件,具體到文件上又需要一個偏移量來定位具體的一條數據,4 個字節的 pos 也表明了單個 entryLog 文件理論大小值不能超過 4 個 G,實際默認值爲 1G。
EntryLogManagerBase#addEntry
進一步分析 BufferedLogChannel#write 方法,發現 BufferedLogChannel 繼承置之 BufferedChannel ,在 BufferedChannel 中有一個 writeBuffer ,write 方法只是將數據寫入到這個 writeBuffer 中,至於是否刷盤則不一定。
只在滿足下列兩種情況時數據纔會刷盤:
-
當前 writeBuffer 已經寫滿,writeBuffer 默認值 64KB;
-
ServerConfiguration 中配置了 FLUSH_ENTRYLOG_INTERVAL_BYTES 參數,且值大於 0,默認值爲 0。
BufferedLogChannel#write
flush 方法內容很簡單,調用底層的 fileChannel 將 writeBuffer 中的數據寫入底層的文件系統,但是 flush 並不保證一定落盤,而是最後一行代碼 forceWrite 方法保證。forceWrite 會調用 fileChannel.force(forceMetadata) 將數據同步到磁盤上。
BufferedChannel#flush
爲了保證數據的務必落盤,在 SingleDirectoryDbLedgerStorage#checkpoint 方法中,addEntry 方法之後,又單獨調用了 entryLogger.flush(); 和 ledgerIndex.flush(); 對 entryLogger.flush() 進一步拆分,發現底層調用了 EntryLogManagerBase#flush 方法,二者兩個方法在 base 類中是 abstract 類型,具體實現又落到了 EntryLogManagerForSingleEntryLog 中,最終任務還是落在了 BufferedChannel#flushAndForceWrite 上。
BufferedChannel#flushAndForceWrite
2
SortedLedgerStorage->InterleavedLedgerStorage
在 SortedLedgerStorage 類中,持有了 InterleavedLedgerStorage 類型,大部分的接口實現都委託給了 InterleavedLedgerStorage 的相關方法調用,SortedLedgerStorage 的最大特點是,每次數據寫入時都會進行排序,其內部使用了跳錶。
EntryMemTable 寫入
addEntry 方法的邏輯非常簡單,將數據 add 到 memtTable 後,更新下 LAC 即結束;
SortedLedgerStorage#addEntry
繼續研究 EntryMemTable 的 addEntry 方法之前先了解下 EntryMemTable 的結構,這個組件是一個純內存的數據保存結構,kvmap 和 snapshot 負責實際數據保存,二者類型皆是 EntrySkipList ,這個類簡單封裝了 ConcurrentSkipListMap,實際使用時 KV 值相同,因爲需要保證有序,所以重寫了排序規則,主要比較 LedgerId 和 entryId。
kvmap 和 snapshot 工作機制是,當寫滿 kvmap 時,會將數據交換給 snapshot,kvmap 重新構建一個新的指定大小的結構,後臺線程負責將 snpshot 中的數據刷盤保存,因此只要後臺刷盤的速度不是特別垮,可以提供持續不間斷的寫入。單個 kvmap 有大小限制,默認 64M 大小,結合前面的 swap 機制,最多可以兜住 128M 的寫入緩存。
EntryMemTable
addEntry 寫入之前先獲取讀鎖,(沒錯,寫入用的是讀鎖!!!)然後將數據 put 進入 kvmap 結構中,internalAdd 方法內容很簡單,就是一個對 kvmap 的 putIfAbsent 調用,看到這裏可以理解爲什麼用的是讀鎖了。因爲這裏 kvmap 的併發安全控制根本不依賴這個讀寫鎖。
EntryMemTable#addEntry
EntryMemTable 刷盤
讀寫鎖的主要作用是,在 swap kvmap 和 snapshot 瞬間加上寫鎖控制以及讀取數據時加上讀鎖控制。
ReentrantReadWriteLock 使用場景
每次刷盤之前會先創建個一個 snapshot 快照,用以保證此快照之前的數據在此次的刷盤範圍內;創建 snapshot 時,會交換 kvmap 與 snapshot 兩個字段,因爲快照的創建是刷盤行爲觸發的,而刷盤動作一般都是有個單獨的線程在執行,所以這裏需要控制併發邏輯,保證在 swap 的瞬間,不能有 addEntry 操作,同樣的在刷盤結束後需要清理 snapshot 的數據,也加上了寫鎖來控制。
EntryMemTable#snapshot
會有一個後臺的刷盤線程執行 flush 操作,首先會先將 snapshot 數據 flush,然後嘗試創建新的 snapshot,如果創建成功說明,仍然有可刷數據,再次執行 flushSnapshot 的動作。
EntryMemTable#flush
在 flushSnapshot 方法中,會調用 flusher 的 process 方法,這裏的 flusher 其實就是 SortedLedgerStorage,在 process 方法內的實際調用了 InterleavedLedgerStorage 的 processEntry 方法,這個方法並不能保證數據一定會落到磁盤文件上,因此 EntryMemTable 所謂的 flush 操作只是將其內存數據刷新到 InterleavedLedgerStorage 組件中。
EntryMemTable#flushSnapshot
SortedLedgerStorage#process
EntryLogger
繼續來看 InterleavedLedgerStorage 的處理邏輯,添加 Entry 後將對應的 KV 索引寫入 LedgerCache 緩存後返回。查看 InterleavedLedgerStorage 的 entryLogger 字段發現,與上文的 SingleDirectoryDbLedgerStorage 相同,寫入 entry 依然用的是 DefaultEntryLogger。
InterleavedLedgerStorage#processEntry
InterleavedLedgerStorage#entryLogger
EntryIndex
上文提到默認情況下 Pulsar 使用 DbLedgerStorage 來存儲數據和索引信息,而索引信息默認情況下使用 rocksDB 來存儲,rocksDB 作爲頂級 KV 引擎其性能和穩定性毋庸置疑。但是在實際的使用過程中,某些時候會選擇 LedgerStorage 的另一個實現類:SortedLedgerStorage。SortedLedgerStorage 的主要特點是是在每次寫入數據的時候都會進行內部排序,內部維護一個跳錶,同時其存儲 leggerId+entryId 到 location 的映射關係是使用 Java 的引擎實現。下面對這個 Java 實現的 KV 引擎做詳細分析。
ServerConfiguration 關於 ledgerStorageClass 的配置
仍然是先從 entryLog 的寫入作爲突破口,SortedLedgerStorage 內部套了一個 InterleavedLedgerStorage 對象,前者複用了後者的 addEntry 方法,核心方法在 InterleavedLedgerStorage#processEntry 中。
long pos = entryLogger.addEntry(ledgerId, entry, rollLog); 方法添加完 entry 對象後返回對象在文件的 offset,內部的 add 邏輯與上文分析的 SingleDirectoryDbLedgerStorage 一致。
InterleavedLedgerStorage#addEntry
InterleavedLedgerStorage#processEntry
LedgerCache 是一個接口,具體的實現只有一個 LedgerCacheImpl 類,後者內部有兩個支撐組件,IndexInMemPageMgr 和 IndexPersistenceMgr,從名稱可以看出前者負責數據在內存中的保持,後者負責實際的存儲。按照之前的分析的源碼,很容易聯想到數據大概率先落入 memoryPage 再落盤,pageSize 默認 8K,entriesPerPage 默認爲 pageSize/8= 1K。
LedgerCacheImpl
putEntryOffset 方法首先通過 entryId 模以單個 page 頁的 entry 數量得到當前 entryId 在具體的 page 頁中的偏移量,這裏的 page 不是 OS 中的 page 頁,是 Bookeeper 單獨抽象出來的 page 概念,需要區分開。在 getLedgerEntryPage 方法中,首先會嘗試從內存中獲取 LedgerEntryPage 對象,如果沒有則調用 grabLedgerEntryPage 方法從磁盤上加載,內存中緩存的對象結構爲 InMemPageCollection,內部是一個 LRU 緩存。
寫入算法分析:
-
LedgerEntryPage 是對單個頁的抽象;
-
int offsetInPage = entry % entriesPerPage; 計算出當前的 entryId 在單個 LedgerEntryPage 的邏輯偏移量;
-
long pageEntry = entry - offsetInPage; 計算出當前 LedgerEntryPage 中初始 entryId;
-
基於 LedgerId 和初始 entryId 查找定位到 LedgerEntryPage,如果緩存中不存在,則從文件中加載;
-
按照 offsetInPage 計算當前的 offset 需要寫入的真實位置,這裏的 offset 即是 entryLogger 中 entry location 值;
-
由於寫入的數據爲 offset 是個 long 類型,需要 8 個二進制爲,實際的寫入的位點爲邏輯上的 offsetInPage*8。
IndexInMemPageMgr#putEntryOffset
上述的算法自然也是可逆的,讀取的時候同樣基於 LedgerId 和 entryId 定位到具體的 LedgerEntryPage。然後在計算出實際的物理偏移量,在特定位置讀取到 location 參數。
順序寫入的 WAL 日誌:Journal
分析完 writeCache 的寫入及其背後的邏輯,我們繼續分析 journal 日誌的寫入流程;上文提到 journal 爲混合寫入模式,可能存在多個 LedgerId 的數據混編。在 addEntryInternal 方法的最後一行中通過 LedgerId 獲取到真實的 journal,獲取的邏輯依然是個 hash 算法,用來保證相同 LedgerId 始終落到一個 journal 上進行處理。
logAddEntry 幹了三件事:
-
entry.retain() 調整 entry 的引用計數值;
-
journalStats 給內部的 queueSize +1;
-
memoryLimitController 內存使用限速器,如果超限時,當前線程會被置爲等待狀態;
-
queue.put(.......), 將待寫入的數據放進隊列。
結合 logAddEntry 源碼發現又是熟悉的味道,寫入方法只是將請求放入隊列,那麼必然存在從隊列獲取數據並進行刷盤的邏輯。既然有 put 操作,必然有 take 操作,我們發現 takeAll 和 pollAll 方法 ,都位於 journal#run 方法中,run 方法這個名字如此敏感,以至於不跟 Thread 扯上點關係都說不過去。
Journal#logAddEntry
queue 的調用點
public class Journal extends BookieCriticalThread implements CheckpointSource
查看 journal 的 class 簽名 發現其不出所料的實現了 Thread 的 run 方法,journal 既是順序寫入日誌邏輯的抽象也是後臺的刷盤線程的抽象;run 方法的實現較爲複雜,其註釋表明這是一個專門負責持久化的線程方法,同時負責 journal 文件的滾動,當 journal 文件被寫滿時,會使用當前時間戳創建一個新的 journal 文件,老的文件會被定期回收。
在 queue 字段旁邊有一個 forceWriteRequests 字段,這個字段在實際的刷盤邏輯中起到了重要作用。
Journal 部分成員變量釋義:
-
maxGroupWaitInNanos 組提交間隔,一般超過這個時間需要刷盤;
-
flushWhenQueueEmpty 開關表示當 queue 爲空時是否刷盤;
-
bufferedEntriesThreshold 表示暫存在 toFlush 中的對象數量的閾值;
-
bufferedWritesThreshold 表示待刷盤的字節數閾值;
-
journalPageCacheFlushIntervalMSec 真實刷盤的時間間隔。
Journal#run 方法成員變量釋義:
-
localQueueEntries 是一個複用的定長數組;
-
localQueueEntriesIdx 是這個定長數組中當前處理的元素索引編號,從 0 開始;
-
localQueueEntriesLen 代表每次從 queue 隊列中獲取的對象數量;
-
toFlush 隊列是個可複用的 ArrayList,可以認爲是個對象池;
-
numEntriesToFlush 是個待刷盤對象數量的計數器,與 toFlush 配合使用;
-
lastFlushPosition 爲上次刷盤位點記錄值;
-
lastFlushTimeMs 爲上次刷盤時間點(毫秒單位);
-
JournalChannel 是單個 journal 文件的抽象,journal 代表單個目錄下的多文件抽象;
-
BufferedChannel 代表一個寫入緩衝區,來自於 JournalChannel;
-
qe 爲 QueueEntry 類型的臨時變量。
Journal#run 的主要邏輯如下:
-
啓動 forceWriteThread 線程,這是一個真正意義上的刷盤線程;
-
journal 線程只是將 queue 中的 QueueEntry 對象寫入相關的 FileChannel 的 buffer 中,並不保證一定落盤;
-
實際的刷盤行爲由 forceWriteThread 負責。
-
不斷的從 queue 中獲取一組 QueueEntry 對象,並逐一將其寫入 BufferedChannel 緩衝區;
-
從 queue 中獲取的 QE 對象放入 localQueueEntries 數組中;
-
entry 需要符合一定的條件纔會被寫入二進制數據流(主要 entryId 和版本的識別);
-
寫入調用的是 BufferedChannel#write 方法,只是將數據寫入內部的 writeBuffer 中;
-
寫入緩衝區後,將 QE 對象添加進入 toFlush 隊列,同時調整 numEntriesToFlush(+1);
-
繼續處理 localQueueEntries 中的下一個元素。當 localQueueEntriesIdx == localQueueEntriesLen 時,表示 localQueueEntries 元素全部處理完成,此時臨時變量 qe(QueueEntry) 置爲 null。
-
在處理 qe 對象的過程中,會綜合多方面條件判斷是否需要刷盤,使用臨時變量 shouldFlush 表示;
-
當 numEntriesToFlush>0 且符合以下條件時會觸發 “刷盤” 邏輯;
-
當臨時變量 qe 爲空 或者當前的 qe 處理的時間超過 maxGroupWaitInNanos;
-
當臨時變量 qe 爲空並且開啓 flushWhenQueueEmpty 配置時刷盤;
-
當臨時變量 qe 不爲空,符合下面兩個條件時刷盤;
-
且 toFlush 中暫存的對象數量超過 bufferedEntriesThreshold;
-
或距離上次刷盤的位點間隔超過 bufferedWritesThreshold。
-
如果滿足刷盤條件,調用 BufferedChannel#flush 操作;
-
flush 操作會將之前攢批的 writeBuffer 中的數據寫入 OS 的文件系統;
-
底層作爲 FileChannel#write 方法的入參;
-
將 toFlush 相關索引爲置空,同時調整 numEntriesToFlush;
-
觸發 entry 寫入的相關回調邏輯執行;
-
更新 lastFlushPosition。
-
flush 操作完成後將進一步判斷是否需要向 forceWriteThread 提交真實的刷盤請求;
-
提交時會將 toFlush 列表中全部對象連同其他參數封裝成一個請求對象;
-
一旦提交後將更新 lastFlushTimeMs;
-
符合提交條件的情況有:
-
開啓 syncData ,journal 級別的開啓同步刷盤的開關;
-
當前需要滾動創建新的 journal 文件;
-
距離上次真實刷盤時間超過閾值 journalPageCacheFlushIntervalMSec。
Journal#run 方法
當真實刷盤請求被提交到 forceWriteThread 線程後,有必要進一步分析該線程的執行邏輯,相比之下 ForceWriteThread#run 方法的邏輯簡單很多,解包收到的請求,然後調用 syncJournal 進行強制刷盤,同時做一些清理回收的動作,以及最後的一些回調方法的觸發和統計操作。這裏的 localRequests 也是一個可複用的臨時數組。
ForceWriteThread#run
Journal#syncJournal 方法調用了 request 對象的 flushFileToDisk 方法,該方法內部調用了 logFile.forceWrite(false); 。
logFile 就是之前提到的單個 journal 文件的抽象,即 JournalChannel,其內部封裝了 BufferedChannel,實際的類型爲 DefaultEntryLogger,與 EntryLogger 所使用的底層實現如出一轍。
Journal#syncJournal
JournalChannel 類
再論 Bookie
上文提到 BookieImpl 中的 addEntry 邏輯似乎很簡單,數據寫入交由 LedgerHander 和 journal 組件,自身則是簡單的封裝。實則不然,查看 BooikeImpl 的實現,發現其中存在一個 SyncThread 對象,該對象是一個同步線程,其邏輯爲轉寫 journal 日誌的數據到 entryLog 和 entryIndex。
BookieImpl
啓動 checkpoint 定期檢查
doCheckpoint 在底層最終調用了 LedgerStorage#checkpoint 方法,與上文提到的 writeCache 背後的 flush 殊途同歸。這裏存在另外一個問題:SyncThread 線程是否會與 triggerFlushAndAddEntry 中的 flush 線程併發執行,以及是否存在併發刷盤帶來的數據錯亂問題。答案是不會,具體來看 checkpoint 方法內部存在一個 flushMutex 鎖,同時在進入鎖之前,首先會對當前的 checkpoint 做判斷,如果傳入的 checkpoint 水位線低於當前 SingleDirectoryDbLedgerStorage 對象持有的 lastCheckpoint 水位線,則不執行實際的 checkpoint 動作。
SyncThread#doCheckpoint
server 端分析總結
Bookeeper 的 server 端的架構較爲複雜,分爲多級寫入的架構,收據流向爲:
-
數據首先進入 writeCache,有後臺線程定期將 cache 數據同步到 entryLog 和 entryIndex;
-
writeCache 底層採用 swap 機制,保證寫入延遲的穩定性。
-
調用 entryLog 和 entryIndex 分別寫入業務數據和索引數據;
-
entryIndex 使用 rocksDB 作爲 KV 索引保存 LedgerId+ entryId 到 offset 的映射關係。
-
SingleDirectoryDbLedgerStorage#flush 操作和 EntryLogger#flush 操作不同;
-
前者只是將數據同步到 entryLog 和 entryIndex 中;
-
後者真實調用底層的文件系統進行刷盤。
-
journal 日誌的寫入時可配置的,默認開啓,journal 日誌同樣存在後臺的刷盤線程;
-
journal 線程一直重複在幹兩件事;
-
將 QueueEntry 轉化爲二進制寫入 bufferChannel 的 writebuffer;
-
綜合判斷各種條件,定期向 forceWriteThread 線程 提交真實的刷盤任務。
四
Bookie 的數據讀取流程
server 端源碼分析
請求路由
回到 BookieRequestProcessor#processRequest 的源碼截圖,讀取流程圍繞 READ_ENTRY 這一 opCode 展開,同樣在最新版本的 Bookie 代碼中,read processor 升級到了 V3 版本。
BookieRequestProcessor#processRequest
和寫入一樣,在讀取的 processReadRequestV3 方法中,依然有高優先級線程池和普通線程池,不同的是還多了一個長輪詢線程池,在投遞任務時又出現了熟悉的操作,跟 LedgerId 選擇線程池中具體的線程執行操作。
BookieRequestProcessor#processReadRequestV3
直接跳轉 ReadEntryProcessorV3#run 方法。發現是個空殼,邏輯封裝 executeOp 方法中。
ReadEntryProcessorV3#run
BookieImpl#readEntry
最終的讀取邏輯在 BookieImpl#readEntry 中,該方法只是簡單的封裝,根據 LedgerId 獲取到 LedgerDescriptor 後,讀取邏輯順利委託給了 LedgerDescriptor,在 LedgerDescriptor#readEntry 方法內進一步套娃,又將請求轉移給了 LedgerStorage#getEntry,前文提到 LedgerStorage 是個接口,真正幹活的是 SingleDirectoryDbLedgerStorage 中的 doGetEntry 方法,這個類在寫入請求的分析過程中同樣出場過。
BookieImpl#readEntry
doGetEntry 方法的邏輯整體較爲簡單,主要分爲以下幾步:
-
如果傳入的 entryId 爲 - 1 ,表示讀取 LAC ,先從 Ledger 中獲取實際的 LAC 的 entryId,在進行讀取;
-
默認先從 writeCache 中讀取,如果讀取不到則去 writeCacheBeingFlushed 中讀取,命中則直接返回;
-
如果 2 級緩存中均不存在,則去 readCache 中據需讀取;
-
如果 readCache 也不存在,那麼就要觸發磁盤讀,先去 entryLocationIndex 獲取 entryLogger 中的物理偏移量;
-
隨後調用 entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); 獲取真實數據;
-
數據獲取到後會放入 readCache 中;
-
在方法結束時,會觸發一次預讀,讀取緊挨着當前 entry 的下一個 entry 並放入 readCache 中。
SingleDirectoryDbLedgerStorage#doGetEntry
DefaultEntryLogger 如何讀取 entry
entry 的讀取在上圖的 640 行,最終調用方法爲 DefaultEntryLogger#internalReadEntry 方法。邏輯如下:
-
將 location 參數轉化爲 buffer 中的 position 位點;
-
獲取到 FileChannel(856 行);
-
從 pos-4 位置開始讀取 20 個字節並解析,sizeBuf 值爲 entry 的整體長度(4+8+8);
-
然後分配一個 sizeBuf 大小的 buffer 用於裝載即將要讀取的 entry。
DefaultEntryLogger#internalReadEntry
DefaultEntryLogger#readEntrySize
在存量數據足夠的情況下 readFromLogChannel 方法會盡可能將入參中的 buffer 填滿,在 BufferedReadChannel 中存在一個 readBuffer,默認大小 512 字節,read 方法仍然有可能命中該緩存。
DefaultEntryLogger#readFromLogChannel
server 端分析總結
數據的查詢內容比較簡單,從大的架構上來看整個讀取過程存在三級緩存,都不命中的話纔會讀取磁盤。
實際上在上層的 broker 組件裏還有一層緩存存在。消息獲取流程如下圖所示:
五
讀寫調用鏈分析
組件模塊分析
BufferedChannel
其派生關係如下圖所示,還有一個 SlowBufferedChannel 繼承 BufferedChannel 類,但是該類爲測試使用。BufferedReadChannel 是讀場景下的主要支撐類,內部有 512 字節的讀緩衝。
EntryLogger
默認使用 DefaultEntryLogger,主要用於存儲實際的 entry 對象數據,DefaultEntryLogger 和 DirectEntryLogger 的區別在於一個使用 JDK 的 RandomAccessFile ,另一個直接使用 DIO(單獨依賴特定 C 庫)。
DefaultEntryLogger
DirectEntryLogger
LedgerStorage
基於 EntryLogger 的上層抽象,主要實現有 InterleavedLedgerStorage 和 SingleDirectoryDbLedgerStorage, 還有一個 SortedLedgerStorage,內部封裝了 InterleavedLedgerStorage,複用了大部分的 InterleavedLedgerStorage 的方法。SortedLedgerStorage 每次寫入時對內部的數據進行排序,使用自帶的 KV 引擎存儲 LedgerId+entryId-->location 映射關係。SingleDirectoryDbLedgerStorage 每次刷盤時纔會對緩存的數據進行排序,使用 rocksDB 存儲 KV 關係。
SingleDirectoryDbLedgerStorage
InterleavedLedgerStorage
LedgerDescriptor
包裝類,大部分邏輯委託給 ledgerStorage 實現。內部持有 ledgerId,每個 ledgerId 對應一個 LedgerDescriptor 對象。
Bookie
Bookie 節點級存儲抽象,內部封裝了多個 journal 抽象組成的 journalList,ledgerStorage,syncThread 線程。
syncThread 線程主要負責將 journal 中的 appendLog 轉寫爲 entryLog 和 enrtyIndex,checkpoint 之前的數據在執行 GC(數據清理工作,非 JVM 中的 GC)時可被回收刪除。
ReadEntryProcessorV3,WriteEntryProcessorV3
負責讀寫指令的路由和轉化。
寫入流程調用時序
WriteEntryProcessorV3
--> Bookie
-->LedgerDescriptor
-->LedgerStorage
-->EntryLogger
-->BufferedLogChannel
讀取流程調用時序
ReadEntryProcessorV3
--> Bookie
-->LedgerDescriptor
-->LedgerStorage
-->EntryLogger
-->BufferedReadChannel
六
架構總結
Bookie 的存儲架構主要分爲三大塊,首先是代表 WAL 日誌的 journal 文件寫入,以順序混寫的方式提升寫入性能,保證低延遲,通常以獨立盤隔離掛載,典型的消息場景下 journal 日誌寫完後即可返回。由於是不同 topic 的混合寫入,journal 日誌無法很好的支撐單個 topic 的消息的順序讀,回溯等場景,會存在讀放大問題。
由此就衍生出了 entryLog 的二次轉儲,爲了儘可能利用順序讀,單個 entryLog 內部的數據在寫入時會根據 ledgerId+entryId 排序,這樣同一個 ledgerId 的數據會緊密的收斂在局部,能夠一定程度上提升讀性能;entryLog 寫入後會獲取到消息實際存儲的位點信息 offset,由於該 offset 不可被自定義,很難表述出這條消息在 topic 寫入序列上爲第幾條信息,這一點很重要,因爲消費的時候是基於這樣的序列來消費的,同時在消費位點管理時也需要這樣的信息。
entryId 的作爲一個傳入參數,其作用恰恰如此,是一個面向用戶的更易於管理的唯一 Id。當用戶基於 ledgerId+entryId 來查找數據時,顯然並不知道這個這條數據實際存儲 offset 信息。這就誕生了一個額外的 KV 結構,用來保存 ledgerId+entryId 到 offset 的映射關係。Bookie 內嵌了 rocksDB 的 KV 引擎,同時也自行實現了一套,Pulsar 默認使用 rocksDB 方式保存 KV 關係。
bookie 在整個寫入和讀取過程中利用了大量的用戶態緩存機制,相比於 mmap 的 pageCache 機制更爲靈活可控,同時也很大程度上降低了讀寫的抖動,尤其是在容器環境下不同 POD 互相干擾的情況。
文 / 簌語
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rKKnUf37nuq7wg-VedwtVg