Apache BookKeeper 一致性協議解析
導語
Apache Pulsar 是一個多租戶、高性能的服務間消息傳輸解決方案,支持多租戶、低延時、讀寫分離、跨地域複製(GEO replication)、快速擴容、靈活容錯等特性。Pulsar 存儲層依託於 BookKeeper 組件,所以本文簡單探討一下 BookKeeper(下文簡稱 BK) 的一致性協議是如何實現的。
背景
Pulsar 相對於 Kafka 根本的區別在於數據一致性協議,這也是爲什麼 Pulsar 可以做到兩副本就能保障高可用、高可靠,在磁盤使用方面更均衡,也不會存在單分區容量上限,同時在擴縮容、故障屏蔽等日常運維方面更加靈活和方便。
一致性協議簡介
我們常見的一致性協議,比如 Raft、Kafka、ZAB 等,都是服務端集成協議 (協議控制和數據存儲綁定),簡單來說一致性協議由服務端存儲節點來執行。數據流向通常是客戶端寫數據到 Leader 節點,其他節點再通過推或拉的方式從 Leader 獲取數據。
而 BK 的一致性協議控制和存儲是分開的,協議控制是在客戶端執行,可以稱之爲外部一致性協議,或者客戶端一致性協議。數據流向爲客戶端向多臺存儲節點同時寫入數據,存儲節點之間基本不通信。
由於一致性協議主要是在客戶端執行,本文聚焦於 BK 客戶端的實現。所以,外部一致性協議需要解決的問題就簡化成,給定 N 個 KV 存儲 (Bookie 高性能、低可靠) 和一個分佈式協調系統(下文以 ZK 爲例, 低性能,高可靠),如何實現高可用、高可靠、低時延的讀寫服務。這裏引入 ZK 集羣的原因是 Bookie 節點會動態增減,需要有註冊中心能讓客戶端拉取存活 Bookie 的 IPPort,同時客戶端不具備存儲能力,協議控制必要的元信息信息也需要存儲到外部。在詳細瞭解 BK 一致性協議之前, 我們先簡單介紹一下 BK 提供的基礎能力。
BookKeeper 基本能力
協議作者設計 BK 初衷是提供分佈式日誌段(Ledger)而不是無界日誌(如 Raft、Kafka)。所以 BK 提供是主要能力就是創建刪除 Ledger 以及 Ledger 的讀寫操作。代碼片段如下:
public interface BookKeeper extends AutoCloseable {
CreateBuilder newCreateLedgerOp(); // 創建新 Ledger
OpenBuilder newOpenLedgerOp(); // 打開已存在Ledger
DeleteBuilder newDeleteLedgerOp(); // 刪除 Ledger
}
public interface ReadHandle extends Handle {
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry); // 從 ledger 裏讀數據
}
public interface WriteHandle extends ReadHandle, ForceableHandle {
CompletableFuture<Long> appendAsync(ByteBuf data); // 往ledger 寫數據
}
而 MQ 場景需要提供無界數據流,所以 Pulsar 爲每一個分區維護一組 Ledger,每個 Ledger 只會寫一定量數據(條數 | 大小 | 時間限制),寫滿就會創建新的 Ledger 來繼續寫入,只有最新的 Ledger 可以寫入,歷史的 Ledger 只能讀取。再根據用戶配置的數據保存策略逐步刪除歷史 Ledger。Pulsar 內分區 Legder 組成如下:
Pulsar 內分區的元數據如下:
{
"lastLedgerCreatedTimestamp": "2024-06-06T17:03:54.666+08:00",
"waitingCursorsCount": 0,
"pendingAddEntriesCount": 2,
"lastConfirmedEntry": "11946613:185308",
"state": "LedgerOpened",
"ledgers": [{
"ledgerId": 11945498,
"entries": 194480,
"size": 420564873,
"offloaded": false,
"underReplicated": false
}, {
"ledgerId": 11946057,
"entries": 194817,
"size": 420583702,
"offloaded": false,
"underReplicated": false
}, {
"ledgerId": 11946613,
"entries": 0, // 未關閉的ledger,還沒有條數
"size": 0, // 未關閉的ledger,還沒有大小
"offloaded": false,
"underReplicated": false
}]
}
創建 Ledger
BK 以 Ledger 維度對外提供服務,第一步就是創建 Ledger。一致性協議高可用是由數據的多副本存儲來實現的,所以創建 Ledger 時需要明確指定該 Ledger 的數據存儲在幾臺 Bookie(E),每條數據寫幾份(Qw),需要等到多少臺 Bookie 確認收到數據後才認爲數據寫入成功(Qa)。代碼片段如下:
public interface CreateBuilder extends OpBuilder<WriteHandle> {
// E: 接收該 Ledger 數據的節點數 默認值3
CreateBuilder withEnsembleSize(int ensembleSize);
/**
* Qw: 每條數據需要寫幾個節點, 默認值2
* 如果 E > Qw, 數據會條帶化均勻寫入到 Bookie 節點,按照默認配置,
* 寫入順序爲: 消息0=0,1; 消息1=1,2; 消息3=2,0
*/
CreateBuilder withWriteQuorumSize(int writeQuorumSize);
// Qa: 需要等待幾臺節點確認寫入成功,默認值2
CreateBuilder withAckQuorumSize(int ackQuorumSize);
ps: 配置條帶化寫入(E > Qw)可以讓多個節點分攤該 Ledger 的讀寫壓力,但是由於數據會更分散且不連續,會影響服務端讀寫優化(順序寫、預讀、批讀等), 所以生產環境建議配置 E=Qw。而 Qa < Qw 可以讓客戶端在避免等待慢節點返回從而降低寫入時延,但是優化效果有限,同時增加了存儲成本,可以配合客戶端熔斷以及服務端指標來剔除慢節點。簡單來說,建議生產環境配置 E=Qw=Qa。
在創建 Ledger 時,客戶端會從 ZK 中拉取全部 Bookie,根據 EnsemblePlacementPolicy 放置策略挑選節點,默認策略會隨機挑選出 E 個數的節點作爲初始存儲節點以保證存儲節點使用均衡。同時 BK 提供豐富的放置策略滿足用戶實現跨機房、地區容災的需求。
現在我們已經有了 Ledger 基本元數據,包含 Ledger 配置信息以及初始的 Bookie 列表,由於 Ledger 需要全局唯一,所以還需要使用 ZK 獲取一個全局唯一的 ID,最後把全部元數據寫入到 ZK 集羣即可 (創建過程無需與 Bookie 節點交互)。代碼如下:
BK 中 Ledger 元數據信息如下:
LedgerMetadata {
formatVersion = 3, ensembleSize = 3, writeQuorumSize = 3, ackQuorumSize = 3, state = CLOSED, length = 461178392, lastEntryId = 203045, digestType = CRC32C, password = base64: , ensembles = {
0 = [127.0 .0 .1: 3181, 127.0 .0 .2: 3181, 127.0 .0 .3: 3181]
}, customMetadata = {
component = base64: bWFYWdlZC1sZWRnZXI = ,
pulsar / managed - ledger = base64: cHVibGjL2RlZmF1bHQvcGVyc2lzdGVudC9kd2RfMjEwODQtcGFydGl0aW9uLTA = ,
application = base64: cHVsc2Fy
}
}
數據寫入
Ledger 已經創建好了,現在可以往裏面寫數據了。由於 Bookie 是個 KV 存儲,數據需要全局唯一的 Key, Ledger Id 已經是全局唯一了,並且只有創建這個 Ledger 的客戶端才能往裏面寫數據,所以數據的 Key 爲 LedgerId + EntryId(客戶端單調自增即可)。
每次寫入都從 E 個 Bookie 中順序挑選出 Qw 個節點(writeSet)並行發送數據。
可以看到客戶端並行寫入 Bookie 節點,Bookie 間也不需要做數據同步,所以 Pulsar 即使加了一層 Broker 層,但是寫入時延還是能做到和 Kafka 基本一致(2RTT)。
正常返回
發送數據後,如果收到 Qa 個數成功響應, 就可以認爲這條數據寫入成功了。同時這條數據也可以被讀取。
LAC(LastAddConfirmed)
由於 EntryId 單調遞增,數據按順序寫入,當一個位置的數據成功寫入到 Qa 個存儲節點後,就意味着該條數據以及之前的所有數據都可以被消費走,這個位置就是可讀位置(不考慮事務),由於已寫入多副本,也意味着這些數據可以在部分存儲節點故障下存活,從而保證讀一致性。不同協議中有不同的叫法。在 BK 中,這個位置稱爲 LAC(LastAddConfirmed),Kafka 中稱爲 HW(High Watermark),在 Raft 中稱爲 Commitindex。
下面代碼可以看到,成功寫入一條數據後會立即更新可讀位置。
LAC 是一致性協議至關重要的信息,正常情況下,LAC 只需要維護在內存中,寫入成功後更新。讀取時使用 LAC 來限制讀取位置即可。如果當前 Ledger 寫滿了,正常關閉,需要把 LAC 也保存在 ZK 的元數據中。這樣重新打開 Ledger 用於讀取時,才能加載 LAC 回內存中,用來正確限制讀取位置,同時 LAC 值也能標識 Ledger 中存儲的數據條數。
而異常情況下,故障會直接導致內存中的 LAC 丟失,這是不可接受的。最簡單粗暴的方式是每次更新 LAC 都直接同步到 ZK 中,但顯然 ZK 性能無法滿足高頻率寫入,所以 BK 的做法是在每條數據都攜帶 LAC 信息一起寫入到 Bookie 中。
如下圖,由於最新的 LAC 需要等下一輪的消息寫入纔會一起存儲到 Bookie,所以 Bookie 中存儲的 LAC 與實際的 LAC 相比是存在一定滯後。
上圖可看到初始時客戶端和 Bookie 中的 LAC 都爲 - 1,在一輪寫入後,Bookie 中的 LAC 就會滯後於客戶端了,好在這個問題並不會影響到一致性協議的正確性,下文中會提到。
故障處理
因爲需要容忍一定數據的節點故障,所以一致性協議複雜的部分都在故障處理邏輯。接下來我們先看寫入失敗場景。
寫入失敗
Kafka、Raft 等集成一致性協議,部分存儲節點異常處理相對簡單,如果是 Leader 節點故障,切換到其他節點進行讀寫即可。如果是 Follower 節點異常,通常不需要做任何操作,只需要等節點恢復後,從 Leader 節點同步數據補齊差異。由於節點是帶存儲的,所以可以容忍較長時間的節點故障。而 BK 是在客戶端實現的一致性協議,客戶端不帶存儲,沒寫成功的數據需要緩存在內存裏,顯然可緩存的數據是非常有限的,同時沒有寫成功 LAC 是不能推進的,整個寫入也就停止了。所以 BK 在寫入失敗時,最好是方式就是挑選新的 Bookie 節點重新寫入,創建 Ledger 時已經指定了初始的 Bookie 列表,後續的 Bookie 列表變更都稱爲 EnsembleChange。
EnsembleChange
如下圖所示,寫入失敗時會替換新節點重新寫入,同時也會把新元數據更新到 ZK。每次 EnsembleChange,Ensembles 中就會新增一個 Fragment,起始位置爲當前 LAC+1。
寫入存儲節點失敗簡單可以分兩種情況,如果還沒有收到 Qa 個數成功響應前,收到了錯誤響應(比如超時等),會立即執行 EnsembleChange。如果已經收到了 Qa 個數成功響應(更新 LAC),後續的錯誤響應只會記錄下失敗 Bookie 節點,在下一次寫入時再觸發 EnsembleChange。如果是不可恢復的異常,會直接返回寫入失敗,不會做 EnsembleChange。
EnsembleChange 和初始創建 Ledger 一樣,需要先選擇新的 Bookie 節點列表,替換掉失敗的 Bookie 節點列表,然後把替換後的元數據更新回 ZK,代碼如下:
更新完元數據後,重寫數據到新節點,或者根據條件重新觸發 EnsembleChange,代碼如下:
簡而言之,在寫入 Bookie 異常時,BK 客戶端都會嘗試切換 Bookie 節點重寫數據。如果遇到無法恢復的狀態碼,就會直接往外層拋出異常。上層使用方比如 Pulsar 側接收到這個異常後,會正常關閉當前 Ledger,然後創建新的 Ledger 繼續寫入。
客戶端故障
如果存儲 Bookie 節點故障,客戶端可以切換其他 Bookie 繼續完成寫入。接下來我們看看如果正在寫入的客戶端故障了,應該怎麼處理。在 Pulsar 架構裏,BK 客戶端是 Pulsar Broker 內的一個組件,當 Pulsar Broker 故障後,分區的 Leader 會切換到新 Broker 繼續服務。由於 BK 客戶端故障時沒有來得及正確關閉 Ledger。當前 Ledger 數據是不可讀的,因爲 LAC 信息已經丟失。所以新 Broker 主要任務就是要恢復 LAC 信息,然後正確關閉 Ledger,這樣 Ledger 內數據就可以正確讀取。最後再創建新的 Ledger 來恢復寫入。由於恢復過程對應分區無法寫入,所以要求恢復過程越快越好,不可存在長時間阻塞。
Fence Bookie
在恢復之前,由於老 Broker 有可能是短暫假死 (比如長時間 gc、網絡隔離等),後續可能還會持續向當前 Ledger 寫入數據導致腦裂。所以恢復的第一步就是要通知對應 Bookie 節點,後續禁止往這個 ledger 裏面寫任何數據,這個過程稱爲 Fence。過程相對簡單,就是向所有 Essemble 中的 Bookie 節點發送請求讓其禁止該 Ledger 的後續寫入。如下圖所示:
由於當前 Bookie 節點不能保證全部存活,同時需要滿足快速恢復,所以需要考慮客戶端至少收到多少個成功響應,才能認爲 Fence 操作執行成功。這裏可以反過來想一下,客戶端每次需要寫入 Qw 個節點,然後收到 Qa 個成功響應就能認爲寫入成功,我們只需要讓其湊不足 Qa 個成功響應就好了,也就是我們 Fence 掉的節點數只要大於 Qw - Qa,那寫入客戶端就一定湊不足 Qa 個成功響應,我們暫且稱這個數量爲 Qf。例子如下:
Qw = 5, Qa = 3 ==> Qf = 5 - 3 + 1 = 3
Qw = 3, Qa = 2 ==> Qf = 3 - 2 + 1 = 2
Qw = 3, Qa = 3 ==> Qf = 3 - 3 + 1 = 1 // Qw=Qa 場景需要fence的節點數最少,恢復最快
由於每次寫入都要從 E 個 Bookie 中挑選出 Qw 個節點來條帶化寫入,所以還需要保證任意一組 Qw,我們都 Fence 掉了對應的 Qf 個節點。例子如下:
假設 E=3,Qw=Qa=2, Bookies列表爲0,1,2, Qf=2-2+1=1
由於會條帶化寫入,每次寫入挑選兩臺節點,全部組合爲:
0,1 // 0,1節點至少要成功fence 掉一臺
1,2 // 1,2節點至少要成功fence 掉一臺
2,0 // 2,0節點至少要成功fence 掉一臺
易得 0,1,2 三臺節點節點至少要成功fence 掉兩臺,
同理可證如果按上文中推薦配置 E=Qw=Qa, 任何情況下都只需要fence掉任意1臺節點即可,fence 節點數量最少,恢復最快。
實際 BK 中實現的代碼邏輯是統計任意 writeSet 未成功響應數大於等於 Qa 都認爲未成功(功能一致),代碼如下:
恢復 LAC
恢復主要的任務就是恢復 LAC 信息,上文中已經介紹 LAC 信息位於每條數據中存儲在 Bookie 裏面。接下來看看恢復整個過程。
修改元數據狀態
第一步: 把 Ledger 狀態改成 Recovery 狀態。
讀取初始 LAC
第二步: 讀取 Bookie 中的 LAC,下圖代碼中可以看到 Fence Bookie 操作其實與查詢最後 LAC 爲同一個請求,只是攜帶了 Fence 標識。
取最大的 Bookie LAC 值返回。
初始 LAC = max(zkLac, bookieLac)作爲初始 LAC。
恢復真實 LAC
由於 Bookie 中存儲的 LAC 有滯後性,也就是真實 LAC(故障前內存中的 LAC)往往大於目前從 Bookie 查詢到的 LAC 最大值。所以我們還需要恢復真實的 LAC,在找到真實的 LAC 之前,我們先確定一個基本的原則。最終恢復的 LAC 可以大於真實 LAC,但是絕對不能小於真實 LAC(會導致數據丟失)。比如一條數據已經成功寫入到 Qa 個節點中,但是客戶端還沒來得及接受到 Qa 個成功響應(不更新 LAC)就故障了,那麼恢復時把這條數據 EntryID 更新到 LAC 也是合理的。換句話說,當數據成功寫入到了 Qa 節點中的那一刻,真實 LAC 就應該立即更新,只不過客戶端內存中的 LAC 也存在一定的滯後(需等收到響應)。所以恢復過程可以以當前讀取到的 LAC 作爲起始 LAC, 依次往後面查詢下一條數據(LAC + 1),如果這條數據已經存在到 Qa 個節點中,那這條數據就是可恢復的,向前推進 LAC 並查詢下一條數據。直到某條數據存在的節點數少於 Qa , 就可以認定這條數據是不可恢復的。那當前的 LAC 就是真實的 LAC, 恢復過程結束。以上方案判斷某條數據是否可恢復是根據收到的存在響應大於等於 Qa,所以對於每條數據都需要查詢全部節點。BK 客戶端實際上使用另一種更加快速的方式來判斷數據是否可恢復。數據是否可恢復判斷方式如下:
-
串行發送讀請求
-
如果收到存在響應,認定數據可恢復,推進 LAC, 繼續恢復下一條數據。
-
其餘響應依次讀取下一個 Bookie 存儲節點
-
如果已經有 Qf (Fence 章節中定義)個節點明確返回不存在該數據,表明該數據不可能存在於 Qa 個節點上,後續的數據也不用查詢了,恢復結束,當前 LAC 爲最終需要恢復的 LAC。
下面使用僞代碼描述如果判斷單條數據是否可恢復。
int notFoundCount = 0;
for(Bookie bookie : writeSet) {
result = bookie.query(lac + 1);
switch( result) {
case : "found"
return "數據可恢復"; // 可恢復,推進LAC 讀取下一條數據
case : "not found"
if( ++ notFoundCount >= Qf) {
// 終止恢復,當前 LAC 作爲最終 LAC
return "數據無法恢復";
};
break;
case : "unkonwn" // 連接不上、超時等
}
}
throw Exception("多節點未知,終止恢復") // 無法恢復,等待重試
舉例如下:
Qw=3, Qa = 2, Qf = 3 - 2 + 1 = 2
向bookie0, bookie1, bookie2 同時發出讀取請求
1. 收到 bookie0 或 bookie1 或 bookie2 存在響應,可恢復
2. bookie0 否定回答,bookie1 否定回答, bookie2 未知回答,不可恢復
3. bookie0 否定回答,bookie1 未知回答, bookie2 未知回答,恢復失敗,需要重試
需要特別注意,由於 LAC 非常重要,如果最終恢復的 LAC 小於實際 LAC,就會發生日誌截斷,相當於這段數據就丟失了。所以我們恢復過程中,需要明確收到 Qf 個 Bookie 節點返回數據不存在,而未知返回(節點暫時不可用,超時等)不能等同於數據不存在。
以上方案爲了快速恢復,只需要任意一臺節點返回數據存在,就認爲可恢復,從而推進 LAC。假設這條數據實際上存在份數不足 Qa,就不能容忍後續對應的存儲節點故障了。所以會把這條數據覆蓋寫入全部的存儲節點(寫成功後自然 LAC 會更新),寫入失敗(未收到 Qa 個成功)會導致恢復失敗。Bookie 是 KV 存儲,支持冪等覆蓋寫。
代碼如下:
開始執行恢復 LAC。
發起讀請求。
返回失敗後,向下一個節點發送讀請求,如果收到 Qf 個不存在響應,恢復終止。
讀取成功(包含失敗重試其他節點): 回寫這條數據回 Bookie(LAC 自然推進), 觸發下一條數據恢復 讀取失敗(Qf 個節點明確返回數據不存在): 當前 LAC 就爲需要恢復的真正 LAC,恢復成功。
消費
消費邏輯相對簡單,當前 Ledger 在寫入時,只有當前寫入的客戶端可以讀取數據,使用內存中的 LAC 防止讀取越線。其他客戶端只能讀取已關閉的 Ledger,首先從 ZK 中獲取元數據(包含 LAC),然後正常向對應 Bookie 發起請求即可。失敗後嘗試下一個 Bookie 節點,直到成功或者所有節點都嘗試過。恢復 LAC 的讀取過程和正常讀取數據使用同一套邏輯,且相對簡單,這裏就不做代碼分析。
可以看到 BK 客戶端讀取消息是按單條消息來讀取的,會造成請求數較多。高版本 BK 已經做了一定優化,客戶端提供了批讀能力,可以和服務端一次交互就讀到多條消息。這裏有個前提條件,就是數據不能條帶化寫入,因爲條帶化寫入會讓數據分散到多臺節點,單臺節點內數據不連續,所以生產環境還是建議配置 E=Qw=Qa 。
結語
本文主要從客戶端視角分析 BK 一致性協議設計理念以及實現原理,包含 Leger 創建、數據的寫入、讀取以及客戶端服務端故障恢復等內容。可以看到 BK 的一致性協議還是有一些有趣的地方,並且實實在在的解決了一些問題,也能理解到 Pulsar 的存算分離並不是簡單的加一層無狀態代理層來實現的。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/zw7KxjZluMZm_bfnWPTSAA