Kafka 高可靠高性能原理探究
作者:mo
引言
在探究 Kafka 核心知識之前,我們先思考一個問題:什麼場景會促使我們使用 Kafka? 說到這裏,我們頭腦中或多或少會蹦出異步解耦和削峯填谷等字樣,是的,這就是 Kafka 最重要的落地場景。
-
異步解耦:同步調用轉換成異步消息通知,實現生產者和消費者的解耦。想象一個場景,在商品交易時,在訂單創建完成之後,需要觸發一系列其他的操作,比如進行用戶訂單數據的統計、給用戶發送短信、給用戶發送郵件等等。如果所有操作都採用同步方式實現,將嚴重影響系統性能。針對此場景,我們可以利用消息中間件解耦訂單創建操作和其他後續行爲。
-
削峯填谷:利用 broker 緩衝上游生產者瞬時突發的流量,使消費者消費流量整體平滑。對於發送能力很強的上游系統,如果沒有消息中間件的保護,下游系統可能會直接被壓垮導致全鏈路服務雪崩。想象秒殺業務場景,上游業務發起下單請求,下游業務執行秒殺業務(庫存檢查,庫存凍結,餘額凍結,生成訂單等等),下游業務處理的邏輯是相當複雜的,併發能力有限,如果上游服務不做限流策略,瞬時可能把下游服務壓垮。針對此場景,我們可以利用 MQ 來做削峯填谷,讓高峯流量填充低谷空閒資源,達到系統資源的合理利用。
通過上述例子可以發現交易、支付等場景常需要異步解耦和削峯填谷功能解決問題,而交易、支付等場景對性能、可靠性要求特別高。那麼,我們本文的主角 Kafka 能否滿足相應要求呢?下面我們來探討下。
Kafka 宏觀認知
在探究 Kafka 的高性能、高可靠性之前,我們從宏觀上來看下 Kafka 的系統架構:
如上圖所示,Kafka 由 Producer、Broker、Consumer 以及負責集羣管理的 ZooKeeper 組成,各部分功能如下:
-
Producer:生產者,負責消息的創建並通過一定的路由策略發送消息到合適的 Broker;
-
Broker:服務實例,負責消息的持久化、中轉等功能;
-
Consumer :消費者,負責從 Broker 中拉取(Pull)訂閱的消息並進行消費,通常多個消費者構成一個分組,消息只能被同組中的一個消費者消費;
-
ZooKeeper:負責 broker、consumer 集羣元數據的管理等;(注意:Producer 端直接連接 broker,不在 zk 上存任何數據,只是通過 ZK 監聽 broker 和 topic 等信息)
上圖消息流轉過程中,還有幾個特別重要的概念—主題(Topic)、分區(Partition)、分段 (segment)、位移(offset)。
-
topic:消息主題。Kafka 按 topic 對消息進行分類,我們在收發消息時只需指定 topic。
-
partition:分區。爲了提升系統的吞吐,一個 topic 下通常有多個 partition,partition 分佈在不同的 Broker 上,用於存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了並行的消息處理能力和橫向擴容能力。另外,爲了提升系統的可靠性,partition 通常會分組,且每組有一個主 partition、多個副本 partition,且分佈在不同的 broker 上,從而起到容災的作用。
-
segment:分段。宏觀上看,一個 partition 對應一個日誌(Log)。由於生產者生產的消息會不斷追加到 log 文件末尾,爲防止 log 文件過大導致數據檢索效率低下,Kafka 採取了分段和索引機制,將每個 partition 分爲多個 segment,同時也便於消息的維護和清理。每個 segment 包含一個. log 日誌文件、兩個索引 (.index、timeindex) 文件以及其他可能的文件。每個 Segment 的數據文件以該段中最小的 offset 爲文件名,當查找 offset 的 Message 的時候,通過二分查找快找到 Message 所處於的 Segment 中。
-
offset:消息在日誌中的位置,消息在被追加到分區日誌文件的時候都會分配一個特定的偏移量。offset 是消息在分區中的唯一標識,是一個單調遞增且不變的值。Kafka 通過它來保證消息在分區內的順序性,不過 offset 並不跨越分區,也就是說,Kafka 保證的是分區有序而不是主題有序。
Kafka 高可靠性、高性能探究
在對 Kafka 的整體系統框架及相關概念簡單瞭解後,下面我們來進一步深入探討下高可靠性、高性能實現原理。
Kafka 高可靠性探究
Kafka 高可靠性的核心是保證消息在傳遞過程中不丟失,涉及如下核心環節:
-
消息從生產者可靠地發送至 Broker;-- 網絡、本地丟數據;
-
發送到 Broker 的消息可靠持久化;-- Pagecache 緩存落盤、單點崩潰、主從同步跨網絡;
-
消費者從 Broker 消費到消息且最好只消費一次 -- 跨網絡消息傳輸 。
消息從生產者可靠地發送至 Broker
爲了保障消息從生產者可靠地發送至 Broker,我們需要確保兩點;
-
Producer 發送消息後,能夠收到來自 Broker 的消息保存成功 ack;
-
Producer 發送消息後,能夠捕獲超時、失敗 ack 等異常 ack 並做處理。
ack 策略
針對問題 1,Kafka 爲我們提供了三種 ack 策略,
-
Request.required.acks = 0:請求發送即認爲成功,不關心有沒有寫成功,常用於日誌進行分析場景;
-
Request.required.acks = 1:當 leader partition 寫入成功以後,纔算寫入成功,有丟數據的可能;
-
Request.required.acks= -1:ISR 列表裏面的所有副本都寫完以後,這條消息纔算寫入成功,強可靠性保證;
爲了實現強可靠的 kafka 系統,我們需要設置 Request.required.acks= -1,同時還會設置集羣中處於正常同步狀態的副本 follower 數量 min.insync.replicas>2,另外,設置 unclean.leader.election.enable=false 使得集羣中 ISR 的 follower 纔可變成新的 leader,避免特殊情況下消息截斷的出現。
消息發送策略
針對問題 2,kafka 提供兩類消息發送方式:同步(sync)發送和異步(async)發送,相關參數如下:
以 sarama 實現爲例,在消息發送的過程中,無論是同步發送還是異步發送都會涉及到兩個協程 -- 負責消息發送的主協程和負責消息分發的 dispatcher 協程。
異步發送
對於異步發送 (ack != 0 場景,等於 0 時不關心寫 kafka 結果,後文詳細講解) 而言,其流程大概如下:
-
在主協程中調用異步發送 kafka 消息的時候,其本質是將消息體放進了一個 input 的 channel,只要入 channel 成功,則這個函數直接返回,不會產生任何阻塞。相反,如果入 channel 失敗,則會返回錯誤信息。因此調用 async 寫入的時候返回的錯誤信息是入 channel 的錯誤信息,至於具體最終消息有沒有發送到 kafka 的 broker,我們無法從返回值得知。
-
當消息進入 input 的 channel 後,會有另一個 dispatcher 的協程負責遍歷 input,來真正發送消息到特定 Broker 上的主 Partition 上。發送結果通過一個異步協程進行監聽,循環處理 err channel 和 success channel,出現了 error 就記一個日誌。因此異步寫入場景時,寫 kafka 的錯誤信息,我們暫時僅能夠從這個錯誤日誌來得知具體發生了什麼錯,並且也不支持我們自建函數進行兜底處理,這一點在 trpc-go 的官方也得到了承認。
同步發送
同步發送 (ack != 0 場景) 是在異步發送的基礎上加以條件限制實現的。同步消息發送在 newSyncProducerFromAsyncProducer 中開啓兩個異步協程處理消息成功與失敗的 “回調”,並使用 waitGroup 進行等待,從而將異步操作轉變爲同步操作,其流程大概如下:
通過上述分析可以發現,kafka 消息發送本質上都是異步的,不過同步發送通過 waitGroup 將異步操作轉變爲同步操作。同步發送在一定程度上確保了我們在跨網絡向 Broker 傳輸消息時,消息一定可以可靠地傳輸到 Broker。因爲在同步發送場景我們可以明確感知消息是否發送至 Broker,若因網絡抖動、機器宕機等故障導致消息發送失敗或結果不明,可通過重試等手段確保消息至少一次(at least once) 發送到 Broker。另外,Kafka(0.11.0.0 版本後)還爲 Producer 提供兩種機制來實現精確一次(exactly once) 消息發送:冪等性(Idempotence)和事務(Transaction)。
小結
通過 ack 策略配置、同步發送、事務消息組合能力,我們可以實現 exactly once 語意跨網絡向 Broker 傳輸消息。但是,Producer 收到 Broker 的成功 ack,消息一定不會丟失嗎?爲了搞清這個問題,我們首先要搞明白 Broker 在接收到消息後做了哪些處理。
發送到 Broker 的消息可靠持久化
爲了確保 Producer 收到 Broker 的成功 ack 後,消息一定不在 Broker 環節丟失,我們核心要關注以下幾點:
-
Broker 返回 Producer 成功 ack 時,消息是否已經落盤;
-
Broker 宕機是否會導致數據丟失,容災機制是什麼;
-
Replica 副本機制帶來的多副本間數據同步一致性問題如何解決;
Broker 異步刷盤機制
kafka 爲了獲得更高吞吐,Broker 接收到消息後只是將數據寫入 PageCache 後便認爲消息已寫入成功,而 PageCache 中的數據通過 linux 的 flusher 程序進行異步刷盤(刷盤觸發條:主動調用 sync 或 fsync 函數、可用內存低於閥值、dirty data 時間達到閥值),將數據順序寫到磁盤。消息處理示意圖如下:
由於消息是寫入到 pageCache,單機場景,如果還沒刷盤 Broker 就宕機了,那麼 Producer 產生的這部分數據就可能丟失。爲了解決單機故障可能帶來的數據丟失問題,Kafka 爲分區引入了副本機制。
Replica 副本機制
Kafka 每組分區通常有多個副本,同組分區的不同副本分佈在不同的 Broker 上,保存相同的消息 (可能有滯後)。副本之間是“一主多從” 的關係,其中 leader 副本負責處理讀寫請求,follower 副本負責從 leader 拉取消息進行同步。分區的所有副本統稱爲 AR(Assigned Replicas),其中所有與 leader 副本保持一定同步的副本(包括 leader 副本在內)組成 ISR(In-Sync Replicas),與 leader 同步滯後過多的副本組成 OSR(Out-of-Sync Replicas),由此可見,AR=ISR+OSR。
follower 副本是否與 leader 同步的判斷標準取決於 Broker 端參數 replica.lag.time.max.ms(默認爲 10 秒),follower 默認每隔 500ms 向 leader fetch 一次數據,只要一個 Follower 副本落後 Leader 副本的時間不連續超過 10 秒,那麼 Kafka 就認爲該 Follower 副本與 leader 是同步的。在正常情況下,所有的 follower 副本都應該與 leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合爲空。
當 leader 副本所在 Broker 宕機時,Kafka 會藉助 ZK 從 follower 副本中選舉新的 leader 繼續對外提供服務,實現故障的自動轉移,保證服務可用。爲了使選舉的新 leader 和舊 leader 數據儘可能一致,當 leader 副本發生故障時,默認情況下只有在 ISR 集合中的副本纔有資格被選舉爲新的 leader,而在 OSR 集合中的副本則沒有任何機會(可通過設置 unclean.leader.election.enable 改變)。
當 Kafka 通過多副本機制解決單機故障問題時,同時也帶來了多副本間數據同步一致性問題。Kafka 通過高水位更新機制、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數據同步一致性問題,下面我們來依次看下這幾大措施。
HW 和 LEO
首先,我們來看下兩個和 Kafka 中日誌相關的重要概念 HW 和 LEO:
-
HW: High Watermark,高水位,表示已經提交 (commit) 的最大日誌偏移量,Kafka 中某條日誌 “已提交” 的意思是 ISR 中所有節點都包含了此條日誌,並且消費者只能消費 HW 之前的數據;
-
LEO: Log End Offset,表示當前 log 文件中下一條待寫入消息的 offset;
如上圖所示,它代表一個日誌文件,這個日誌文件中有 8 條消息,0 至 5 之間的消息爲已提交消息,5 至 7 的消息爲未提交消息。日誌文件的 HW 爲 6,表示消費者只能拉取到 5 之前的消息,而 offset 爲 5 的消息對消費者而言是不可見的。日誌文件的 LEO 爲 8,下一條消息將在此處寫入。
注意:所有副本都有對應的 HW 和 LEO,只不過 Leader 副本比較特殊,Kafka 使用 Leader 副本的高水位來定義所在分區的高水位。換句話說,分區的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特點:
-
Leader HW:min(所有副本 LEO),爲此 Leader 副本不僅要保存自己的 HW 和 LEO,還要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO;
-
Follower HW:min(follower 自身 LEO,leader HW)。
注意:爲方便描述,下面 Leader HW 簡記爲 HWL,Follower HW 簡記爲 F,Leader LEO 簡記爲 LEOL ,Follower LEO 簡記爲 LEOF。
下面我們演示一次完整的 HW / LEO 更新流程:
- 初始狀態
HWL=0,LEOL=0,HWF=0,LEOF=0。
- Follower 第一次 fetch
-
Leader 收到 Producer 發來的一條消息完成存儲, 更新 LEOL=1;
-
Follower 從 Leader fetch 數據, Leader 收到請求,記錄 follower 的 LEOF =0,並且嘗試更新 HWL =min(全部副本 LEO)=0;
-
eade 返回 HWL=0 和 LEOL=1 給 Follower,Follower 存儲消息並更新 LEOF =1, HW=min(LEOF,HWL)=0。
- Follower 第二次 fetch
-
Follower 再次從 Leader fetch 數據, Leader 收到請求,記錄 follower 的 LEOF =1,並且嘗試更新 HWL =min(全部副本 LEO)=1;
-
leade 返回 HWL=1 和 LEOL=1 給 Follower,Leader 收到請求,更新自己的 HW=min(LEOF,HWL)=1。
上述更新流程中 Follower 和 Leader 的 HW 更新有時間 GAP。如果 Leader 節點在此期間發生故障,則 Follower 的 HW 和 Leader 的 HW 可能會處於不一致狀態,如果 Followe 被選爲新的 Leader 並且以自己的 HW 爲準對外提供服務,則可能帶來數據丟失或數據錯亂問題。
數據丟失
第 1 步:
-
副本 B 作爲 leader 收到 producer 的 m2 消息並寫入本地文件,等待副本 A 拉取。
-
副本 A 發起消息拉取請求,請求中攜帶自己的最新的日誌 offset(LEO=1),B 收到後更新自己的 HW 爲 1,並將 HW=1 的信息以及消息 m2 返回給 A。
-
A 收到拉取結果後更新本地的 HW 爲 1,並將 m2 寫入本地文件。發起新一輪拉取請求(LEO=2),B 收到 A 拉取請求後更新自己的 HW 爲 2,沒有新數據只將 HW=2 的信息返回給 A,並且回覆給 producer 寫入成功。此處的狀態就是圖中第一步的狀態。
第 2 步:
此時,如果沒有異常,A 會收到 B 的回覆,得知目前的 HW 爲 2,然後更新自身的 HW 爲 2。但在此時 A 重啓了,沒有來得及收到 B 的回覆,此時 B 仍然是 leader。A 重啓之後會以 HW 爲標準截斷自己的日誌,因爲 A 作爲 follower 不知道多出的日誌是否是被提交過的,防止數據不一致從而截斷多餘的數據並嘗試從 leader 那裏重新同步。
第 3 步:
B 崩潰了,min.isr 設置的是 1,所以 zookeeper 會從 ISR 中再選擇一個作爲 leader,也就是 A,但是 A 的數據不是完整的,從而出現了數據丟失現象。
問題在哪裏?在於 A 重啓之後以 HW 爲標準截斷了多餘的日誌。不截斷行不行?不行,因爲這個日誌可能沒被提交過(也就是沒有被 ISR 中的所有節點寫入過),如果保留會導致日誌錯亂。
數據錯亂
在分析日誌錯亂的問題之前,我們需要了解到 kafka 的副本可靠性保證有一個前提:在 ISR 中至少有一個節點。如果節點均宕機的情況下,是不保證可靠性的,在這種情況會出現數據丟失,數據丟失是可接受的。這裏我們分析的問題比數據丟失更加槽糕,會引發日誌錯亂甚至導致整個系統異常,而這是不可接受的。
第 1 步:
-
A 和 B 均爲 ISR 中的節點。副本 A 作爲 leader,收到 producer 的消息 m2 的請求後寫入 PageCache 並在某個時刻刷新到本地磁盤。
-
**副本 B 拉取到 m2 後寫入 PageCage 後(尚未刷盤)**再次去 A 中拉取新消息並告知 A 自己的 LEO=2,A 收到更新自己的 HW 爲 1 並回復給 producer 成功。
-
此時 A 和 B 同時宕機,B 的 m2 由於尚未刷盤,所以 m2 消息丟失。此時的狀態就是第 1 步的狀態。
第 2 步:
由於 A 和 B 均宕機,而 min.isr=1 並且 unclean.leader.election.enable=true(關閉 unclean 選擇策略),所以 Kafka 會等到第一個 ISR 中的節點恢復並選爲 leader,這裏不幸的是 B 被選爲 leader,而且還接收到 producer 發來的新消息 m3。注意,這裏丟失 m2 消息是可接受的,畢竟所有節點都宕機了。
第 3 步:
A 恢復重啓後發現自己是 follower,而且 HW 爲 2,並沒有多餘的數據需要截斷,所以開始和 B 進行新一輪的同步。但此時 A 和 B 均沒有意識到,offset 爲 1 的消息不一致了。
問題在哪裏?在於日誌的寫入是異步的,上面也提到 Kafka 的副本策略的一個設計是消息的持久化是異步的,這就會導致在場景二的情況下被選出的 leader 不一定包含所有數據,從而引發日誌錯亂的問題。
Leader Epoch
爲了解決上述缺陷,Kafka 引入了 Leader Epoch 的概念。leader epoch 和 raft 中的任期號的概念很類似,每次重新選擇 leader 的時候,用一個嚴格單調遞增的 id 來標誌,可以讓所有 follower 意識到 leader 的變化。而 follower 也不再以 HW 爲準,每次奔潰重啓後都需要去 leader 那邊確認下當前 leader 的日誌是從哪個 offset 開始的。下面看下 Leader Epoch 是如何解決上面兩個問題的。
數據丟失解決
這裏的關鍵點在於副本 A 重啓後作爲 follower,不是忙着以 HW 爲準截斷自己的日誌,而是先發起 LeaderEpochRequest 詢問副本 B 第 0 代的最新的偏移量是多少,副本 B 會返回自己的 LEO 爲 2 給副本 A,A 此時就知道消息 m2 不能被截斷,所以 m2 得到了保留。當 A 選爲 leader 的時候就保留了所有已提交的日誌,日誌丟失的問題得到解決。
如果發起 LeaderEpochRequest 的時候就已經掛了怎麼辦?這種場景下,不會出現日誌丟失,因爲副本 A 被選爲 leader 後不會截斷自己的日誌,日誌截斷只會發生在 follower 身上。
數據錯亂解決
這裏的關鍵點還是在第 3 步,副本 A 重啓作爲 follower 的第一步還是需要發起 LeaderEpochRequest 詢問 leader 當前第 0 代最新的偏移量是多少,由於副本 B 已經經過換代,所以會返回給 A 第 1 代的起始偏移(也就是 1),A 發現衝突後會截斷自己偏移量爲 1 的日誌,並重新開始和 leader 同步。副本 A 和副本 B 的日誌達到了一致,解決了日誌錯亂。
小結
Broker 接收到消息後只是將數據寫入 PageCache 後便認爲消息已寫入成功,但是,通過副本機制並結合 ACK 策略可以大概率規避單機宕機帶來的數據丟失問題,並通過 HW、副本同步機制、 Leader Epoch 等多種措施解決了多副本間數據同步一致性問題,最終實現了 Broker 數據的可靠持久化。
消費者從 Broker 消費到消息且最好只消費一次
Consumer 在消費消息的過程中需要向 Kafka 彙報自己的位移數據,只有當 Consumer 向 Kafka 彙報了消息位移,該條消息纔會被 Broker 認爲已經被消費。因此,Consumer 端消息的可靠性主要和 offset 提交方式有關,Kafka 消費端提供了兩種消息提交方式:
正常情況下我們很難實現 exactly once 語意的消息,通常是通過手動提交 + 冪等實現消息的可靠消費。
Kafka 高性能探究
Kafka 高性能的核心是保障系統低延遲、高吞吐地處理消息,爲此,Kafaka 採用了許多精妙的設計:
-
異步發送
-
批量發送
-
壓縮技術
-
Pagecache 機制 & 順序追加落盤
-
零拷貝
-
稀疏索引
-
broker & 數據分區
-
多 reactor 多線程網絡模型
異步發送
如上文所述,Kafka 提供了異步和同步兩種消息發送方式。在異步發送中,整個流程都是異步的。調用異步發送方法後,消息會被寫入 channel,然後立即返回成功。Dispatcher 協程會從 channel 輪詢消息,將其發送到 Broker,同時會有另一個異步協程負責處理 Broker 返回的結果。同步發送本質上也是異步的,但是在處理結果時,同步發送通過 waitGroup 將異步操作轉換爲同步。使用異步發送可以最大化提高消息發送的吞吐能力。
批量發送
Kafka 支持批量發送消息,將多個消息打包成一個批次進行發送,從而減少網絡傳輸的開銷,提高網絡傳輸的效率和吞吐量。Kafka 的批量發送消息是通過以下兩個參數來控制的:
-
batch.size:控制批量發送消息的大小,默認值爲 16KB,可適當增加 batch.size 參數值提升吞吐。但是,需要注意的是,如果批量發送的大小設置得過大,可能會導致消息發送的延遲增加,因此需要根據實際情況進行調整。
-
linger.ms:控制消息在批量發送前的等待時間,默認值爲 0。當 linger.ms 大於 0 時,如果有消息發送,Kafka 會等待指定的時間,如果等待時間到達或者批量大小達到 batch.size,就會將消息打包成一個批次進行發送。可適當增加 linger.ms 參數值提升吞吐,比如 10 ~ 100。
在 Kafka 的生產者客戶端中,當發送消息時,如果啓用了批量發送,Kafka 會將消息緩存到緩衝區中。當緩衝區中的消息大小達到 batch.size 或者等待時間到達 linger.ms 時,Kafka 會將緩衝區中的消息打包成一個批次進行發送。如果在等待時間內沒有達到 batch.size,Kafka 也會將緩衝區中的消息發送出去,從而避免消息積壓。
壓縮技術
Kafka 支持壓縮技術,通過將消息進行壓縮後再進行傳輸,從而減少網絡傳輸的開銷 (壓縮和解壓縮的過程會消耗一定的 CPU 資源,因此需要根據實際情況進行調整。),提高網絡傳輸的效率和吞吐量。Kafka 支持多種壓縮算法,在 Kafka2.1.0 版本之前,僅支持 GZIP,Snappy 和 LZ4,2.1.0 後還支持 Zstandard 算法(Facebook 開源,能夠提供超高壓縮比)。這些壓縮算法性能對比(兩指標都是越高越好)如下:
- 吞吐量:LZ4>Snappy>zstd 和 GZIP,壓縮比:zstd>LZ4>GZIP>Snappy。
在 Kafka 中,壓縮技術是通過以下兩個參數來控制的:
-
compression.type:控制壓縮算法的類型,默認值爲 none,表示不進行壓縮。
-
compression.level:控制壓縮的級別,取值範圍爲 0-9,默認值爲 - 1。當值爲 - 1 時,表示使用默認的壓縮級別。
在 Kafka 的生產者客戶端中,當發送消息時,如果啓用了壓縮技術,Kafka 會將消息進行壓縮後再進行傳輸。在消費者客戶端中,如果消息進行了壓縮,Kafka 會在消費消息時將其解壓縮。注意:Broker 如果設置了和生產者不通的壓縮算法,接收消息後會解壓後重新壓縮保存。Broker 如果存在消息版本兼容也會觸發解壓後再壓縮。
Pagecache 機制 & 順序追加落盤
kafka 爲了提升系統吞吐、降低時延,Broker 接收到消息後只是將數據寫入 PageCache 後便認爲消息已寫入成功,而 PageCache 中的數據通過 linux 的 flusher 程序進行異步刷盤(避免了同步刷盤的巨大系統開銷),將數據順序追加寫到磁盤日誌文件中。由於 pagecache 是在內存中進行緩存,因此讀寫速度非常快,可以大大提高讀寫效率。順序追加寫充分利用順序 I/O 寫操作,避免了緩慢的隨機 I/O 操作,可有效提升 Kafka 吞吐。
如上圖所示,消息被順序追加到每個分區日誌文件的尾部。
零拷貝
Kafka 中存在大量的網絡數據持久化到磁盤(Producer 到 Broker)和磁盤文件通過網絡發送(Broker 到 Consumer)的過程,這一過程的性能直接影響 Kafka 的整體吞吐量。傳統的 IO 操作存在多次數據拷貝和上下文切換,性能比較低。Kafka 利用零拷貝技術提升上述過程性能,其中網絡數據持久化磁盤主要用 mmap 技術,網絡數據傳輸環節主要使用 sendfile 技術。
索引加速之 mmap
傳統模式下,數據從網絡傳輸到文件需要 4 次數據拷貝、4 次上下文切換和兩次系統調用。如下圖所示:
爲了減少上下文切換以及數據拷貝帶來的性能開銷,Kafka 使用 mmap 來處理其索引文件。Kafka 中的索引文件用於在提取日誌文件中的消息時進行高效查找。這些索引文件被維護爲內存映射文件,這允許 Kafka 快速訪問和搜索內存中的索引,從而加速在日誌文件中定位消息的過程。mmap 將內核中讀緩衝區(read buffer)的地址與用戶空間的緩衝區(user buffer)進行映射,從而實現內核緩衝區與應用程序內存的共享,省去了將數據從內核讀緩衝區(read buffer)拷貝到用戶緩衝區(user buffer)的過程,整個拷貝過程會發生 4 次上下文切換,1 次 CPU 拷貝和 2 次 DMA 拷貝。
網絡數據傳輸之 sendfile
傳統方式實現:先讀取磁盤、再用 socket 發送,實際也是進過四次 copy。如下圖所示:
爲了減少上下文切換以及數據拷貝帶來的性能開銷,Kafka 在 Consumer 從 Broker 讀數據過程中使用了 sendfile 技術。具體在這裏採用的方案是通過 NIO 的 transferTo/transferFrom
調用操作系統的 sendfile 實現零拷貝。總共發生 2 次內核數據拷貝、2 次上下文切換和一次系統調用,消除了 CPU 數據拷貝,如下:
稀疏索引
爲了方便對日誌進行檢索和過期清理,kafka 日誌文件除了有用於存儲日誌的. log 文件,還有一個位移索引文件. index 和一個時間戳索引文件. timeindex 文件,並且三文件的名字完全相同,如下:
Kafka 的索引文件是按照稀疏索引的思想進行設計的。稀疏索引的核心是不會爲每個記錄都保存索引,而是寫入一定的記錄之後纔會增加一個索引值,具體這個間隔有多大則通過 log.index.interval.bytes 參數進行控制,默認大小爲 4 KB,意味着 Kafka 至少寫入 4KB 消息數據之後,纔會在索引文件中增加一個索引項。可見,單條消息大小會影響 Kakfa 索引的插入頻率,因此 log.index.interval.bytes 也是 Kafka 調優一個重要參數值。由於索引文件也是按照消息的順序性進行增加索引項的,因此 Kafka 可以利用二分查找算法來搜索目標索引項,把時間複雜度降到了 O(lgN),大大減少了查找的時間。
位移索引文件. index
位移索引文件的索引項結構如下:
相對位移:保存於索引文件名字上面的起始位移的差值,假設一個索引文件爲:00000000000000000100.index,那麼起始位移值即 100,當存儲位移爲 150 的消息索引時,在索引文件中的相對位移則爲 150 - 100 = 50,這麼做的好處是使用 4 字節保存位移即可,可以節省非常多的磁盤空間。
文件物理位置:消息在 log 文件中保存的位置,也就是說 Kafka 可根據消息位移,通過位移索引文件快速找到消息在 log 文件中的物理位置,有了該物理位置的值,我們就可以快速地從 log 文件中找到對應的消息了。下面我用圖來表示 Kafka 是如何快速檢索消息:
假設 Kafka 需要找出位移爲 3550 的消息,那麼 Kafka 首先會使用二分查找算法找到小於 3550 的最大索引項:[3528, 2310272],得到索引項之後,Kafka 會根據該索引項的文件物理位置在 log 文件中從位置 2310272 開始順序查找,直至找到位移爲 3550 的消息記錄爲止。
時間戳索引文件. timeindex
Kafka 在 0.10.0.0 以後的版本當中,消息中增加了時間戳信息,爲了滿足用戶需要根據時間戳查詢消息記錄,Kafka 增加了時間戳索引文件,時間戳索引文件的索引項結構如下:
時間戳索引文件的檢索與位移索引文件類似,如下快速檢索消息示意圖:
broker & 數據分區
Kafka 集羣包含多個 broker。一個 topic 下通常有多個 partition,partition 分佈在不同的 Broker 上,用於存儲 topic 的消息,這使 Kafka 可以在多臺機器上處理、存儲消息,給 kafka 提供給了並行的消息處理能力和橫向擴容能力。
多 reactor 多線程網絡模型
多 Reactor 多線程網絡模型 是一種高效的網絡通信模型,可以充分利用多核 CPU 的性能,提高系統的吞吐量和響應速度。Kafka 爲了提升系統的吞吐,在 Broker 端處理消息時採用了該模型,示意如下:
SocketServer 和 KafkaRequestHandlerPool 是其中最重要的兩個組件:
-
SocketServer:實現 Reactor 模式,用於處理多個 Client(包括客戶端和其他 broker 節點)的併發請求,並將處理結果返回給 Client
-
KafkaRequestHandlerPool:Reactor 模式中的 Worker 線程池,裏面定義了多個工作線程,用於處理實際的 I/O 請求邏輯。
整個服務端處理請求的流程大致分爲以下幾個步驟:
-
Acceptor 接收客戶端發來的請求
-
輪詢分發給 Processor 線程處理
-
Processor 將請求封裝成 Request 對象,放到 RequestQueue 隊列
-
KafkaRequestHandlerPool 分配工作線程,處理 RequestQueue 中的請求
-
KafkaRequestHandler 線程處理完請求後,將響應 Response 返回給 Processor 線程
-
Processor 線程將響應返回給客戶端
其他知識探究
負載均衡
生產者負載均衡
Kafka 生產端的負載均衡主要指如何將消息發送到合適的分區。Kafka 生產者生產消息時,根據分區器將消息投遞到指定的分區中,所以 Kafka 的負載均衡很大程度上依賴於分區器。Kafka 默認的分區器是 Kafka 提供的 DefaultPartitioner。它的分區策略是根據 Key 值進行分區分配的:
-
如果 key 不爲 null:對 Key 值進行 Hash 計算,從所有分區中根據 Key 的 Hash 值計算出一個分區號;擁有相同 Key 值的消息被寫入同一個分區,順序消息實現的關鍵;
-
如果 key 爲 null:消息將以輪詢的方式,在所有可用分區中分別寫入消息。如果不想使用 Kafka 默認的分區器,用戶可以實現 Partitioner 接口,自行實現分區方法。
消費者負載均衡
在 Kafka 中,每個分區(Partition)只能由一個消費者組中的一個消費者消費。當消費者組中有多個消費者時,Kafka 會自動進行負載均衡,將分區均勻地分配給每個消費者。在 Kafka 中,消費者負載均衡算法可以通過設置消費者組的 partition.assignment.strategy 參數來選擇。目前主流的分區分配策略以下幾種:
-
range: 在保證均衡的前提下,將連續的分區分配給消費者,對應的實現是 RangeAssignor;
-
**round-robin:**在保證均衡的前提下,輪詢分配,對應的實現是 RoundRobinAssignor;
-
0.11.0.0 版本引入了一種新的分區分配策略 StickyAssignor,其優勢在於能夠保證分區均衡的前提下儘量保持原有的分區分配結果,從而避免許多冗餘的分區分配操作,減少分區再分配的執行時間。
集羣管理
Kafka 藉助 ZooKeeper 進行集羣管理。Kafka 中很多信息都在 ZK 中維護,如 broker 集羣信息、consumer 集羣信息、 topic 相關信息、 partition 信息等。Kafka 的很多功能也是基於 ZK 實現的,如 partition 選主、broker 集羣管理、consumer 負載均衡等,限於篇幅本文將不展開陳述,這裏先附一張網上截圖大家感受下:
參考文獻
-
https://www.cnblogs.com/arvinhuang/p/16437948.html
-
https://segmentfault.com/a/1190000039133960
-
http://matt33.com/2018/11/04/kafka-transaction/
-
https://blog.51cto.com/u_14020077/5836698
-
https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
-
https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
-
https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
-
https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
-
https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
-
https://cloud.tencent.com/developer/article/1657649
-
https://www.cnblogs.com/vivotech/p/16347074.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/_g11mmmQse6KrkUE8x4abQ