kafka 三高架構設計剖析

kafka 三高架構概述

    由於最近事情比較多, 工作也比較忙, 這篇差點難產, 經過幾個週末的構思和梳理, 終於跟大家見面了, 在上一篇我們講述了 kafka 的基礎入門, 工作流程, 存儲機制, 副本等知識, 本篇會爲大家揭祕 kafka 高可用, 高性能, 高併發架構設計奧祕。

   Kafka 向來以高吞吐量,低延遲,高併發,  高可擴展性而自稱,並在越來越多的場景中應用,這時候就對其穩定性的要求就越高。接下來就爲大家一一呈現裏面的細節。

kafka 高可用設計

Leader 選舉機制

        Kafka 中的選舉大致分爲三大類:  控制器的選舉,  Leader 的選舉, 消費者的選舉。在講解 Leader 選舉之前, 先說說 Kafka 控制器, 即 Broker。它除了具有一般 Broker 的功能外, 還具有選舉分區 Leader 節點的功能, 在啓動 Kafka 系統時候, 其中一個 Broker 會被選舉爲控制器, 負責管理主題分區和副本的狀態, 還會執行重分配的任務。

      控制器的啓動順序如下:

1) 第一個啓動的節點,會在 Zookeeper 系統裏面創建一個臨時節點 /controller ,並寫入該節點的註冊信息,使該節點成爲控制器。

2) 其他的節點在陸續啓動時,也會嘗試在 Zookeeper 系統中創建 /controller 節點,但是 /controller 節點已經存在,所以會拋出 “創建 / controller 節點失敗異常” 的信息。創建失敗的節點會根據返回的結果,判斷出在 Kafka 集羣中已經有一個控制器被成功創建了,所以放棄創建 /controller 節點,這樣就確保了 Kafka 集羣控制器的唯一性。

3) 其他的節點,也會在控制器上註冊相應的監聽器,各個監聽器負責監聽各自代理節點的狀態變化。當監聽到節點狀態發生變化時,會觸發相應的監聽函數進行處理。

       說完控制器方面的知識, 我們來講解 Leader 節點的選舉過程, 選舉控制器的核心思路是:各個節點公平競爭搶佔 Zookeeper 系統中創建 /controller 臨時節點,最先創建成功的節點會成爲控制器,並擁有選舉主題分區 Leader 節點的功能。選舉流程如下圖所示:

副本機制

       副本機制簡單來說就是備份機制,就是在分佈式集羣中保存着相同的數據備份。那麼副本機制的好處就是提供數據冗餘,   副本機制是 kafka 確保系統高可用和高持久的重要基石。

      爲了保證高可用,kafka 的分區是多副本的,如果其中一個副本丟失了,那麼還可以從其他副本中獲取分區數據 (要求對應副本的數據必須是完整的)。這是 Kafka 數據一致性的基礎, 下面將詳解介紹 Kafka 的副本機制。

      Kafka 使用 Zookeeper 來維護集羣 Brokers 的信息,每個 Broker 都有一個唯一的標識**broker.id**,用於標識自己在集羣中的身份。Brokers 會通過 Zookeeper 選舉出一個叫**Controller Broker**節點,它除了具備其它 Brokers 的功能外,還負責管理主題分區及其副本的狀態。

     在 Kafka 中 Topic 被分爲多個分區(Partition),分區是 Kafka 最基本的存儲單位。在創建主題的時候可使用**replication-factor**參數指定分區的副本個數。分區副本總會有一個 Leader 副本,所有的消息都直接發送給 Leader 副本,其它副本都需要通過複製 Leader 中的數據來保證數據一致。當 Leader 副本不可用時,其中一個 Follower 將會被選舉併成爲新的 Leader。

ISR 機制

✎ 認識 ISR

       如上圖所示, 每個分區都有一個 ISR(in-sync Replica) 列表,用於維護所有同步的、可用的副本。Leader 副本必然是同步副本,也就是說, ISR 不只是追隨者副本集合, 它比如包括 Leader 副本。甚至在某些情況下, ISR 只有 Leader 這一個副本, 而對於 Follower 副本來說,它需要滿足以下條件才能被認爲是同步副本:

**          1) **必須定時向 Zookeeper 發送心跳;

****          2) 在規定的時間內從 Leader 副本 "低延遲" 地獲取過消息。

      如果副本不滿足上面條件的話,就會被從 ISR 列表中移除,直到滿足條件纔會被再次加入。所以就可能會存在 Follower 不可能與 Leader 實時同步的風險。

     Kafka 判斷 Follower 是否與 Leader 同步的條件就是 Broker 端參數 replica.lag.time.max.ms 參數值。這個參數的含義就是 Follower 副本能夠落後 Leader 副本的最長時間間隔, 當前默認值爲 10 秒, 也就是說, 只要一個 Follower 副本落後 Leader 副本的時間不連續超過 10 秒, Kafka 就認爲兩者是同步的, 即使 Follower 副本中保持的消息要少於 Leader 副本中的消息。

     Kafka 中 ISR 的管理最終都會反饋到 Zookeeper 節點上。具體位置爲:**/brokers/topics/[topic]/partitions/[partition]/state****。**目前有兩個地方會對這個 Zookeeper 的節點進行維護:

1) Controller 來維護:Kafka 集羣中的其中一個 Broker 會被選舉爲 Controller,主要負責 Partition 管理和副本狀態管理,也會執行重分配 Partition 之類的管理任務。在符合某些特定條件下,Controller 下的 LeaderSelector 會選舉新的 Leader,ISR 和新的 leader_epoch 及 controller_epoch 寫入 Zookeeper 的相關節點中。同時發起 leaderAndIsrRequest 通知所有的 Replicas。

2) Leader 來維護:Leader 有單獨的線程定期檢測 ISR 中 Follower 是否脫離 ISR , 如果發現 ISR 變化,則會將新的 ISR 信息返回到 Zookeeper 的相關節點中。

ACK 機制

     這個 acks 參數在 kafka 的使用中,是非常核心以及關鍵的一個參數,決定了很多東西, 這個 acks 跟副本機制, 同步機制, ISR 機制都密切相關, 如果無法理解這些, 是無法充分理解 acks 參數的含義。

   首先這個 acks 參數,是在 KafkaProducer,也就是生產者客戶端裏設置的。那麼也就是說,你往 kafka 寫數據的時候,就可以來設置這個 acks 參數。這個參數實際上有三種常見的值可以設置,分別是:0、1 和 all。

acks = 0

   如果 acks 設置爲 0,那麼 Producer 是不會等待 Broker 的反饋。該消息會被立刻添加到 Socket Buffer 中就認爲已經發送完成。在這種情況下,服務器端是否收到請求是無法保證的,並且參數 Retries 也不會生效(因爲客戶端無法獲得失敗信息)。

    這個時候每個記錄返回的 Offset 總是被設置爲 - 1。這個模式下 Kafka 的吞吐量最大,併發最高,但是數據非常容易丟失,通常適用在一些記錄應用日誌,對數據要求不高的業務場景。

acks = 1

    如果 acks 設置爲 1,這個時候 Leader 節點會將記錄先寫入本地日誌,並且在所有 Follower 節點反饋之前就先確認成功。在這種情況下,如果 Leader 節點在接收記錄之後,並且在 Follower 節點複製數據完成之前發生錯誤,那麼這條記錄會丟失。這個模式和 Mysql 的主從異步複製一樣,主從之間會有數據差異,此配置爲 Kafka 默認配置。它平衡了數據安全和性能。

acks = all & min.insync.replicas >= 2

     如果 acks 設置爲 all,這個時候 Leader 節點會等待所有同步中的 LSR 副本確認之後再確認這條記錄是否發送完成。只要至少有一個同步副本存在,記錄就不會丟失。

     如果說 Leader 這時候剛接收到了消息,但是 Follower 沒有收到消息,此時 Leader 宕機了,那麼客戶端會感知到這個消息沒發送成功,他會重試再次發送消息過去。

   其中 Broker 有個配置項 min.insync.replicas(默認值爲 1) 代表了正常寫入生產者數據所需要的最少 ISR 個數,  當 ISR 中的副本數量小於 min.insync.replicas 時,Leader 停止寫入生產者生產的消息,並向生產者拋出 NotEnoughReplicas 異常,阻塞等待更多的 Follower 趕上並重新進入 ISR,  因此能夠容忍 min.insync.replicas-1 個副本同時宕機

     這種方式是犧牲了性能爲代價,適合對數據要求比較高的業務場景。

kafka 高性能設計

Reactor 多路複用模型

       提到 Reactor (多路複用), 就不得不提 Java 中的 NIO, 接下來 我們先來看下 Java 的 NIO。

      Java NIO 由以下幾個核心部分組成 :

          1) Channels;

          2)  Buffers;

          3)  Selectors;

     Channel 和 Java 中的 Stream 一樣, 用於傳輸數據的數據流, 數據可以 Channel 讀取到 Buffer 中, 也可以從 Buffer 寫到 Channel 中, 如下圖所示:

      Selector 允許單線程處理多個 Channel。使用 Selector,首先得向 Selector 註冊 Channel,然後調用它的 select() 方法。此方法會一直阻塞到某個註冊的 Channel 有事件就緒。一旦這個方法返回,線程就可以處理這些事件,事件的例子如新連接進來,數據接收等

     下圖爲一個單線程中使用一個 Selector 處理 3 個 Channel:

      Kafka SocketServer 是基於 Java NIO 開發的,採用了 Reactor 的模式 (已被大量實踐證明非常高效,在 Netty 和 Mina 中廣泛使用)。Kafka Reactor 的模式包含三種角色:

         1) Acceptor;

         2) Processor;

         3) Handler;

    Kafka Reacator 包含了 1 個 Acceptor 負責接受客戶端請求,N 個 Processor 線程負責讀寫數據 (即爲每個 Connection 創建出一個 Processor 去單獨處理, 每個 Processor 中均引用獨立的 Selector),M 個 Handler 來處理業務邏輯。在 Acceptor 和 Processor,Processor 和 Handler 之間都有隊列來緩衝請求。

      如下圖所示是 kafka 簡版的 Reactor 模型架構圖

生產消息流程

     生產者發送到 Kafka 集羣的詳細流程如下圖所示:

1)  首先來一條消息後, 生產者源碼裏面會對消息進行封裝成 ProducerRecord 對象。 

2)  封裝成對象後會對該對象進行序列化 [涉及網絡傳輸], 調用 Serializer 組件進行序列化, 序列化後進行發送。

3)  在發送前要確定一件事, 到底要把這條消息發送到哪個主題的哪個分區, 這個時候就需要通過 Partitioner 分區器 從 Kafka Broker 集羣中獲取集羣元數據,  獲取到元數據後就可以進行發送了。

4)  在 0.8 版本之前, 這個時候來了一條消息就會封裝成一個請求發送到 Broker, 這種情況下, 性能是非常差的, 在 0.8 版本之後, 進行簡單的改進, 性能得到了指數級上升, 即來了一條消息後不會立馬發送出去, 而是先寫入到一個緩存 (RecordAccumulator) 隊列中, 封裝成一個個批次(RecordBatch)。

 5) 這個時候會有一個 sender 線程會將多個批次封裝成一個請求 (Request), 然後進行發送, 這樣會減少很多請求, 提高吞吐量。這個時候有個問題,  一條消息過來後沒有立即發送出去, 而是封裝成了批次, 這樣會不會有延遲的問題,  默認的 batch.size 是 16K, 寫滿會立即發送, 如果寫不滿, 也會在規定的時間進行發送 (linger.ms = 500ms)

  1. 發送的時候 每個 Request 請求對應多路複用器 (Selector) 中的每個 kafka channel 然後將數據發送給 Broker 集羣

7) 在封裝 Batch 批次和 Request 請求的過程中, 還涉及一個重要的設計理念即內存池方案, 在後面的服務端內存池部分進行詳細說明

順序寫磁盤 + OS Cache

       首先 Kafka 爲了保證磁盤寫入性能,通過基於操作系統的頁緩存來實現文件寫入的。操作系統本身有一層緩存,叫做 page cache,是在內存裏的緩存,我們也可以稱之爲 os cache,意思就是操作系統自己管理的緩存。那麼在寫磁盤文件的時候,就可以先直接寫入 os cache 中,也就是僅僅寫入內存中,接下來由操作系統自己決定什麼時候把 os cache 裏的數據真的刷入到磁盤中, 這樣大大提高寫入效率和性能。 如下圖所示:

      另外還有一個非常關鍵的操作, 就是 kafka 在寫數據的時候是以磁盤順序寫的方式來進行落盤的, 即將數據追加到文件的末尾, 而不是在文件的隨機位置來修改數據, 對於普通機械磁盤, 如果是隨機寫的話, 涉及到磁盤尋址的問題, 導致性能確實極低,  但是如果只是按照順序的方式追加文件末尾的話, 這種磁盤順序寫的性能基本可以跟寫內存的性能差不多的。

零拷貝技術 (zero-copy)

      上面說完了寫入的過程, 我們來講講消費的這塊流程,  從 Kafka 消費數據, 在消費的時候實際上就是從 Kafka 的磁盤文件讀取數據然後發送給下游的消費者。大概過程如下:

‍‍‍‍‍‍‍‍‍‍‍‍‍1) 先檢查要讀取的數據是否在 os cache 中, 如果不在的話就從磁盤文件讀取數據後放入 os cache。

2) 接着從 os cache 裏面 copy 數據到應用程序進程的緩存裏面, 在從應用程序進程的緩存裏 copy 數據到操作系統層面的 socket 緩存裏面, 最後再從 socket 緩存裏面讀取數據後發送到網卡, 最後從網卡發送到下游的消費者。‍‍‍‍‍‍‍‍‍‍‍‍‍

從上圖可以看出, 整個過程有兩次沒必要的拷貝操作

  1. 從操作系統的 os cache  拷貝數據到應用程序進程的緩存。

  2. 接着又從應用程序緩存裏拷貝到操作系統的 socket 緩存中。

這兩次拷貝過程中, 還發生了好幾次上下文的切換, 所以相對來說是比較消耗性能的

kafka 爲了解決這個問題, 在讀取數據的時候就引入了零拷貝技術。即讓操作系統的 os cache 中的數據直接發送到網卡後傳出給下游的消費者,中間跳過了兩次拷貝數據的步驟,從而減少拷貝的 CPU 開銷, 減少用戶態內核態的上下文切換次數,  從而優化數據傳輸的性能, **Socket 緩存中僅僅會拷貝一個描述符過去,不會拷貝數據到 Socket 緩存。**如下圖所示:

常見的零拷貝思路主要有兩種實現方式:

1) 直接 I/O: 數據直接跳過內核, 在用戶空間與 I/O 設備之間進行傳遞, 內核在這種情況下只是進行必要的輔助工作

  1. copy-on-write: 寫時複製, 數據不需要提前進行拷貝, 而是在當需要修改的時候再進行部分數據的拷貝

      這裏, Kafka 主要使用到了 mmapsendfile 的方式來實現零拷貝,  對應 java 裏面的MappedByteBufferFileChannel.transferIO。

使用 java NIO 實現的 零拷貝, 如下:

     transferTo() 方法會將數據從文件通道傳輸到了給定的可寫字節通道。在其內部它依賴底層操作系統對零拷貝的支持;在 Linux 系統中,此調用被傳遞到 sendfile() 系統調用中,調用過程如下: 

壓縮傳輸

      默認情況下, 在 Kafka 生產者中不啓用壓縮  Compression 不僅可以更快地從生產者傳輸到代理, 還可以在複製過程中進行更快的傳輸。壓縮有助於提高吞吐量, 降低延遲並提高磁盤利用率。

      在 Kafka 中, 壓縮可能會發生在兩個地方: 生產者端和 Broker 端, 一句話總結下壓縮和解壓縮, 即 Producer 端壓縮, Broker 端保持, Consumer 端解壓縮。

 Kafka 支持多種壓縮算法: lz4, snappy, gzip,  從 Kafka 2.1.0 開始新增了 ZStandard 算法, 該算法是 Facebook 開源的壓縮算法,  能提供超高的壓縮比。

    Producer、Broker、Consumer 要使用相同的壓縮算法, 在 Producer 向 Broker 寫入數據, Consumer 向 Broker 讀取數據的時候可以不用解壓縮, 只需要在最終 Consumer 到消息的時候才進行解壓縮, 這樣可以節省大量的網絡和磁盤開銷。

服務端內存池設計

在前面我們講解了一條消息生產的詳細流程, 中間涉及到了批次 (Batch) 和請求(Request), 在這個過程中, Kafka 還有一個重要的設計理念 即內存池方案, 這裏就詳細講述下內存池的實現過程.

  1. 這裏簡化下流程, 來一條消息會先進行封裝然後序列化最後會計算出分區號, 並把這個消息存儲到緩存裏面

2)  這個緩存裏面也是有設計的 即批次隊列, 那麼這個批次隊列是使用什麼策略存儲呢? 一個分區對應一個隊列, 這裏有個重要的數據結構: Batches, 這個數據結構是 Key-value 形式, key 是消息主題的分區, value 是一個隊列, 裏面存儲的發送到對應分區的批次

3) 那麼假設這個時候 我們有個 2 個 topic, 每個 topic 有 2 個分區, 那麼是不是總共有 4 個的分區即 4 個隊列, 每個隊列裏面都有一個個批次,  這個時候消息算出來分區後就會寫入隊列的最新一個批次

4) Sender 線程就會檢測這個批次 (Batch) 是否已經寫滿, 或者時間是否到達, 如果滿足 Sender 線程就會取出封裝成 Request 就會發送

5) 封裝批次會用到內存, Sender 發送完畢內存會進行回收, 在 Java 中如果頻繁操作內存和回收, 會遇到頭疼的 FullGC 的問題, 工作線程的性能就會降低, 整個生產者的性能就會受到影響, Kafka 的解決方案就是內存池, 對內存塊的使用跟數據庫的連接池一樣

  1. 整個 Buffer Poll 內存池大小是 32M , 內存池分爲兩個部分, 一個部分是內存隊列, 隊列裏面有一個個內存塊 (16K), 另外一部分是可用內存,   一條消息過來後會向內存池申請內存塊, 申請完後封裝批次並寫入數據, sender 線程就會發送並響應, 然後清空內存放回內存池裏面進行反覆使用, 這樣就大大減少了 GC 的頻率, 保證了生產者的穩定和高效, 性能會大大提高

kafka 高併發設計

高併發網絡設計

  上面通過大量的篇幅講解了 kafka 生產者和服務端的高可用和高性能的方方面面, 這裏主要來分析下 Kafka 的超高併發網絡架構設計, 此架構設計是 Kafka 中最經典的。

      這裏我們將 Kafka 的網絡架構抽象成如上圖所示的三層架構, 整個請求流轉的路徑如下:

  1. 客戶端發送請求過來,  在 Kafka 服務端會有個 Acceptor 線程, 這個線程上面綁定了 OP_ACCEPT 事件, 用來監聽發送過來的請求, 下面有個 while 死循環會源源不斷的監聽 Selector 是否有請求發送過來, 接收到請求鏈接後封裝成 socketchannel, 然後將 socketChannel 發送給網絡第一層架構中。

  2. 在第一層架構中有 3 個一模一樣的 Processor 線程, 這個線程的裏面都有一個連接隊列, 裏面存放 socketchannel, 存放規則爲輪詢存放, ** 隨着請求的不斷增加, 連接隊列裏面就會有很多個 socketchannel,   這個時候 socketchannel 就會在每個 selector 上面註冊 OP_READ 事件,  參考上圖第一層的第三個 Processor 線程, 即每個線程裏面還有一個 while 循環會遍歷每個 socketchannel, 監聽到事件後就會接收到客戶端發送過來的請求, 這個時候 Processor 線程會對請求進行解析 (發送過來的請求是二進制的, 上面已經說過, 跨網絡傳輸需要進行序列化) , 並解析封裝成 Request 對象發送到上圖所示的網絡第二層架構中**。

3) 在第二層架構中會有兩個隊列, 一個 RequestQueue(請求隊列), 一個是 ResponseQueue(返回隊列), 在請求隊列中會存放一個個 Request 請求, 起到緩衝的作用, 這個時候就到了網絡第三層架構中。

  1. 在第三層架構中有個 RequestHandler 線程池, 裏面默認有 8 個 RequestHandler 線程, 這 8 個線程啓動後會不斷的從第二層的 RequestQueue 隊列中獲取請求, 解析請求體裏面的數據, 通過內置工具類將數據寫入到磁盤

  2. 寫入成功後還要響應客戶端, 這個時候會封裝一個 Response 對象, 會將返回結果存放到第二層的 ResponseQueue 隊列中,  此時默認有 3 個小的 Response 隊列, 這裏面的個數是同第一層架構中的 Processor 線程一一對應的

  3. 這個時候第一層的 Processor 線程中 while 循環就會遍歷 Response 請求, 遍歷完成後就會在 selector 上註冊 OP_WRITE 事件, 這個時候就會將響應請求發送回客戶端。

7) 在整個過程中涉及到 2 個參數:num.network.threads = 3 和 num.io.threads = 8   如果感覺默認參數性能不夠好的話, 可以對這 2 個參數進行優化,  比如將 num.network.threads = 9,  num.io.threads = 32(和 CPU 個數要一致),   每個 RequestHandler 線程可以處理 2000QPS,    2000 * 8 = 1.6 萬 QPS , 擴容後可以支撐 6.4 萬 QPS, 通過擴容後 Kafka 可以支撐 6 萬 QPS, 可以看出通過上面的架構講解, kafka 是可以支撐高併發的請求的

總結

      至此已經跟大家全面揭祕了 Kafka 三高架構的方方面面, 下一篇會講解 Kafka 生產級部署和容量規劃方面的知識, 大家敬請期待......

如喜歡本文,請點擊右上角,把文章分享到朋友圈
如有想了解學習的技術點,請留言給若飛安排分享

作者:王江華

來源:華仔聊技術

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/k6lmBX1nSLXm3IOmCushqQ