20 圖 15 問帶你輕鬆入門 Kafka!

大家好,我是君哥,Kafka 入門需要哪些基礎知識呢?

Kafka 是什麼, 適應場景有哪些?

Kafka 是一個分佈式的流式處理平臺,用於實時構建流處理應用。主要應用在大數據實時處理領域。Kafka 憑藉「高性能」、「高吞吐」、「高可用」、「低延遲」、「可伸縮」幾大特性,成爲「消息隊列」的首選。

其主要設計目標如下:

1)高性能: 以時間複雜度爲 O(1) 的方式提供消息持久化能力,即使對 TB 級以上數據也能保證常數時間的訪問性能。

2)高吞吐、低延遲: 在非常廉價的機器上也能做到單機支持每秒幾十萬條消息的傳輸,並保持毫秒級延遲。

3)持久性、可靠性: 消息最終被持久化到磁盤,且提供數據備份機制防止數據丟失。

4)容錯性: 支持集羣節點故障容災恢復,即使 Kafka 集羣中的某一臺 Kafka 服務節點宕機,也不會影響整個系統的功能(若副本數量爲 N, 則允許 N-1 臺節點故障)。

5)高併發: 可以支撐數千個客戶端同時進行讀寫操作。

其適應場景主要有:

1)日誌收集方向: 可以用 Kafka 來收集各種服務的 log,然後統一輸出,比如日誌系統 elk,用 Kafka 進行數據中轉。

2)消息系統方向: Kafka 具備系統解耦、副本冗餘、流量削峯、消息緩衝、可伸縮性、容錯性等功能,同時還提供了消息順序性保障以及消息回溯功能等。

3)大數據實時計算方向: Kafka 提供了一套完整的流式處理框架, 被廣泛應用到大數據處理,如與 flink、spark、storm 等整合。

Kafka 核心組件有哪些, 分別有什麼作用呢?

Kafka 核心組件的基礎概念:

1)Producer: 即消息生產者,向 Kafka Broker 發消息的客戶端。

2)Consumer: 即消息消費者,從 Kafka Broker 讀消息的客戶端。

3)Consumer Group: 即消費者組,由多個 Consumer 組成。消費者組內每個消費者負責消費不同分區的數據,以提高消費能力。一個分區只能由組內一個消費者消費,不同消費者組之間互不影響

4)Broker: 一臺 Kafka 服務節點就是一個 Broker。一個集羣是由 1 個或者多個 Broker 組成的,且一個 Broker 可以容納多個 Topic。

5)Topic: 一個邏輯上的概念,Topic 將消息分類,生產者和消費者面向的都是同一個 Topic, 同一個 Topic 下的 Partition 的消息內容是不相同的

6)Partition: 爲了實現 Topic 擴展性,提高併發能力,一個非常大的 Topic 可以分佈到多個 Broker 上,一個 Topic 可以分爲多個 Partition 進行存儲,且每個 Partition 是消息內容是有序的

7)Replica: 即副本,爲實現數據備份的功能,保證集羣中的某個節點發生故障時,該節點上的 Partition 數據不丟失,且 Kafka 仍然能夠繼續工作,爲此 Kafka 提供了副本機制,一個 Topic 的每個 Partition 都有若干個副本,一個 Leader 副本和若干個 Follower 副本

8)Leader: 即每個分區多個副本的主副本,生產者發送數據的對象,以及消費者消費數據的對象,都是 Leader

9)Follower: 即每個分區多個副本的從副本,會實時從 Leader 副本中同步數據,並保持和 Leader 數據的同步。Leader 發生故障時,某個 Follower 還會被選舉併成爲新的 Leader , 且不能跟 Leader 在同一個 Broker 上, 防止崩潰數據可恢復。

10)Offset: 消費者消費的位置信息,監控數據消費到什麼位置,當消費者掛掉再重新恢復的時候,可以從消費位置繼續消費。

在 Kafka 中 Zookeeper 作用是什麼?

Kafka 集羣能夠正常工作,目前還是需要依賴於 ZooKeeper,主要用來「負責 Kafka 集羣元數據管理,集羣協調工作」,在每個 Kafka 服務器啓動的時候去連接並將自己註冊到 Zookeeper,類似註冊中心。

Kafka 使用 Zookeeper 存放「集羣元數據」、「集羣成員管理」、 「Controller 選舉」、「其他管理類任務」等。待 KRaft 提案完成後,Kafka 將完全不依賴 Zookeeper。

1)集羣元數據:Topic 對應 Partition 的所有數據都存放在 Zookeeper 中,且以 Zookeeper 保存的數據爲準。

2)集羣成員管理:Broker 節點的註冊、刪除以及屬性變更操作等。主要包括兩個方面:成員數量的管理,主要體現在新增成員和移除現有成員;單個成員的管理,如變更單個 Broker 的數據等。

3)Controller 選舉:即選舉 Broker 集羣的控制器 Controller。其實它除了具有一般 Broker 的功能之外,還具有選舉主題分區 Leader 節點的功能。在啓動 Kafka 系統時,其中一個 Broker 會被選舉爲控制器,負責管理主題分區和副本狀態,還會執行分區重新分配的管理任務。如果在 Kafka 系統運行過程中,當前的控制器出現故障導致不可用,那麼 Kafka 系統會從其他正常運行的 Broker 中重新選舉出新的控制器。

4)其他管理類任務:包括但不限於 Topic 的管理、參數配置等等。

Kafka 3.X 「2.8 版本開始」爲什麼移除 Zookeeper 的依賴的原因有以下 2 點:

1)集羣運維層面:Kafka 本身就是一個分佈式系統,如果還需要重度依賴 Zookeeper,集羣運維成本和系統複雜度都很高。

2)集羣性能層面:Zookeeper 架構設計並不適合這種高頻的讀寫更新操作, 由於之前的提交位移的操作都是保存在 Zookeeper 裏面的,這樣的話會嚴重影響 Zookeeper 集羣的性能。

生產者有哪些發消息的模式?

Kafka 生產者發送消息主要有三種模式:

01 發後即忘發送模式

發後即忘模式「fire-and-forget」,它只管發送消息,並不需要關心消息是否發送成功。其本質上也是一種異步發送的方式,消息先存儲在緩衝區中,達到設定條件後再批量進行發送。這是 kafka 吞吐量最高的方式**,**但同時也是消息最不可靠的方式**,因爲對於發送失敗的消息並沒有做任何處理,某些異常情況下會導致消息丟失。

ProducerRecord<k,v> record = new ProducerRecord<k,v>("this-topic", key, value);
try {
  //fire-and-forget 模式   
  producer.send(record);
} catch (Exception e) {
  e.printStackTrace();
}

02 同步發送模式

同步發送模式 「sync」,調用 send() 方法會返回一個 Future 對象,再通過調用 Future 對象的 get() 方法,等待結果返回,根據返回的結果可以判斷消息是否發送成功, 由於是同步發送會阻塞,只有當消息通過 get() 返回數據時,纔會繼續下一條消息的發送

ProducerRecord<k,v> record = new ProducerRecord<k,v>("this-topic", key, value);
try {
  //sync 模式 調用future.get()
  future = producer.send(record);
  RecordMetadata metadata = future.get();
} catch (Exception e) {
  e.printStackTrace();
}
producer.flush();
producer.close();

03 異步發送模式

異步發送模式「async」,在調用 send() 方法的時候指定一個 callback 函數,當 Broker 接收到返回的時候,該 callback 函數會被觸發執行,通過回調函數能夠對異常情況進行處理,當調用了回調函數時,只有回調函數執行完畢生產者纔會結束,否則一直會阻塞

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        //intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
}

以上三種方式各有各的特點,具體還要看業務的應用場景適合哪一種:

1)場景 1:如果業務只是關心消息的吞吐量,且允許少量消息發送失敗,也不關注消息的發送順序的話,那麼可以使用發後即忘發送「fire-and-forget」的方式,配合參數 acks = 0,這樣生產者並不需要等待服務器的響應,以網絡能支持的最大速度發送消息。

2)場景 2:如果業務要求消息必須是按順序發送的話,且數據只能落在一個 Partition 上,那麼可以使用同步發送「sync」的方式,並結合參數來設置 retries 的值讓消息發送失敗時可以進行多次重試「retries > 0」,再結合參數設置「acks=all & max_in_flight_requests_per_connection=1」,可以控制生產者在收到服務器成功響應之前只能發送 1 條消息,在消息發送成功後立即進行 flush, 從而達到控制消息順序發送。

3)場景 3:如果業務需要知道消息是否發送成功,但對消息的順序並不關心的話,那麼可以用「異步 async + 回調 callback 函數」的方式來發送消息,並配合參數 retries=0,待發送失敗時將失敗的消息記錄到日誌文件中進行後續處理。

Kafka 爲什麼要設計分區?

其實這個問題說來很簡單, 假如不進行分區的話就如同 MySQL 單表存儲一樣,發消息就會被集中存儲,這樣會導致某臺 Kafka 服務器存儲 Topic 消息過多,如果在寫消息壓力很大的情況下,最終會導致這臺 Kafka 服務器吞吐量出現瓶頸,  因此 Kafka 設計了分區的概念,同時也帶來了「負載均衡」、「橫向擴展」的能力,如下圖所示:。

1)負載均衡: 發送消息時可以根據分區的數量進行數據均勻分佈,使其落在不同的分區上, 這樣可以提高併發寫性能;同時消費的時候多個訂閱者可以從一個或者多個分區中同時消費數據,以支撐海量數據處理能力,提高讀消息性能。

2)橫向擴展: 可以將一個 Topic 分成了多個 Partition,將不同的 Partition 儘可能的部署在不同的物理節點上,這樣擴展起來非常方便,另外一個消費者可以消費多個分區中的數據,但是這樣還是不能夠充分的發揮橫向擴展,這時候消費者組就出現了,我們用消費者組,來消費整個的 Topic,一個消費者消費 Topic 中的一個分區。

生產者發送消息時如何選擇分區的?

生產者發送消息的時候選擇分區的策略方式主要有以下 4 種:

1)輪詢策略: 順序分配消息,即按照消息順序依次發送到某 Topic 下不同的分區,它總是能保證消息最大限度地被平均分配到所有分區上,如果消息在創建的時候 key 爲 null, 那麼 Kafka 默認會採用這種策略

2)消息 key 指定分區策略: Kafka 允許爲每條消息定義 key,即消息在創建的時候 key 不爲空,此時 Kafka 會根據消息的 key 進行 hash, 然後根據 hash 值對 Partition 進行取模映射到指定的分區上, 這樣的好處就是相同 key 的消息會發送到同一個分區上, 這樣 Kafka 雖然不能保證全局有序,但是可以保證每個分區的消息是有序的,這就是消息分區有序性,適應場景有下單支付的時候希望消息有序,可以通過訂單 id 作爲 key 發送消息達到分區有序性。

3)隨機策略: 隨機發送到某個分區上,看似也是將消息均勻打散分配到各個分區,但是性能還是無法跟輪詢策略比,「如果追求數據的均勻分佈,最好還是使用輪詢策略」。

4)自定義策略: 可以通過實現 org.apache.kafka.clients.producer.Partitioner 接口,重寫 partition 方法來達到自定義分區效果。

Kafka 如何合理設置分區數, 越多越好嗎?

一、Kafka 如何合理設置分區數

首先我們要瞭解在 Partition 級別上達到負載均衡是實現高吞吐量的關鍵,合適的 Partition 數量可以達到並行讀寫和負載均衡的目的,需要根據每個分區的生產者和消費者的目標吞吐量進行估計

此時我們可以遵循一定的步驟來計算確定分區數:

1)首先根據某個 Topic 當前接收的數據量等經驗來確定分區的初始值。

2)然後針對這個 Topic,進行測試 Producer 端吞吐量和 Consumer 端的吞吐量。

3)測試的結果, 假設此時他們的值分別是 Tp「Producer 端吞吐量」、Tc「負 Consumer 端吞吐量」,總的目標吞吐量是 Tt, 單位是 MB/s, 那麼結果 numPartition = Tt / max (Tp, Tc)。

4)特殊說明: 測試 Tp 通常是很容易的,因爲它的邏輯非常簡單,就是直接發送消息到 Kafka 就好了。而測試 Tc 通常與應用消費消息後進行其他什麼處理有關,相對複雜一些。

二、分區設置越多越好嗎?

首先 Kafka 高吞吐量的原因之一就是通過 Partition 將 Topic 中的消息均衡保存到 Kafka 集羣中不同的 Broker 中

理論上說,如果一個 Topic 分區越多,整個集羣所能達到的吞吐量就越大」。但是,實際生產中 Kafka Topic 的分區數真的配置越多越好嗎?很顯然不是!分區數過多會有什麼弊端和問題呢,我們可以從下面 4 個方向進行深度分析:

01 使用內存方面分析

1)Broker 端: 有很多組件都在內存中維護了分區級別的緩存,比如 Controller,FetcherManager 等,因此分區數越多,這類緩存的成本就越大。

2)Producer 端: 比如參數 batch.size,默認是 16KB。它會爲每個分區緩存消息,在數據積累到一定大小或者足夠的時間時,累積的消息將會從緩存中移除併發往 Broker 節點。這個功能是爲了提高性能而設計,但是隨着分區數增多,這部分緩存所需的內存佔用也會更多。

3)Consumer 端: 消費者數跟分區數是直接掛鉤的,在消費消息時的內存佔用、以及爲達到更高的吞吐性能需要開啓的 Consumer 數也會隨着分區數增加而增加。

02 消耗文件句柄方面分析

在 Kafka 的 Broker 中,每個 Partition 都會對應磁盤文件系統中一個目錄。在 Kafka 的日誌文件目錄中,每個日誌數據段都會分配三個文件,兩個索引文件和一個數據文件。每個 Broker 會爲每個日誌段文件打開兩個 index 文件句柄和一個 log 數據文件句柄。因此,隨着 Partition 的增多,所需要保持打開狀態的文件句柄數也就越多,最終可能超過底層操作系統配置的文件句柄數量限制。

03 端到端的延遲方面分析

首先我們得先了解 Kafka 端對端延遲是什麼? Producer 端發佈消息到 Consumer 端接收消息所需要的時間,即 Consumer 端接收消息的時間減去 Producer 端發佈消息的時間

在 Kafka 中只對「已提交的消息做最大限度的持久化保證不丟失」,因此 Kafka 只有在消息提交之後,纔會將消息暴露給消費者。此時如果分區越多那麼副本之間需要同步的數據就會越多,假如消息需要在所有 ISR 副本集合列表同步複製完成之後才能進行暴露。因此 ISR 副本集合間複製數據所花時間將是 kafka 端對端延遲的最主要部分

此時我們可以通過加大 kafka 集羣來進行緩解。比如,我們將 100 個分區 Leader 放到一個 Broker 節點和放到 10 個 Broker 節點,它們之間的延遲是不一樣的。如在 10 個 Broker 節點的集羣中,每個 Broker 節點平均只需要處理 10 個分區的數據複製。此時端對端的延遲將會變成原來的十分之一。

因此根據實戰經驗,如果你特別關心消息延遲問題的話,此時限制每個 Broker 節點的 Partition 數量是一個非常不錯的主意: 對於 N 個 Broker 節點和複製副本因子「replication-factor」爲 F 的 Kafka 集羣,那麼整個 Kafka 集羣的 Partition 數量最好不超過 「100 * N * F」 個,即單個  Broker 節點 Partition 的 Leader 數量不超過 100。

04 高可用性方面分析

我們知道 Kafka 是通過多副本複製技術來實現集羣的高可用和穩定性的。每個 Partition 都會有多個數據副本,每個副本分別存在於不同的 Broker 上。所有的數據副本中,其中一個數據副本爲 Leader,其他的數據副本爲 Follower。

在 Kafka 集羣內部,所有的數據副本採用自動化的方式管理且會確保所有副本之間的數據是保持同步狀態的。 當 Broker 發生故障時,對於 Leader 副本所在 Broker 的所有 Partition 將會變得暫不可用。Kafka 將自動在其它副本中選擇出一個 Leader,用於接收客戶端的請求。這個過程由 Kafka Controller 節點 Broker 自動選舉完成。

正常情況下,當一個 Broker 在有計劃地停止服務時候,那麼 Controller 會在服務停止之前,將該 Broker 上 的所有 Leader 副本一個個地移走。對於單個 Leader 副本的移動速度非常快,從客戶層面看,有計劃的服務停服只會導致系統很短時間窗口不可用。

但是,當 Broker 不是正常停止服務時「kill -9 強殺方式」,系統的不可用時間窗口將會與受影響的 Partition 數量有關。如果此時發生宕機的 Broker 是 Controller 節點時, 這時 Controller 節點故障恢復會自動的進行,但是新的 Controller 節點需要從 Zookeeper 中讀取每一個 Partition 的元數據信息用於初始化數據。假設一個 Kafka 集羣存在 10000 個 Partition,從 Zookeeper 中恢復元數據時每個 Partition 大約花費 2ms,則 Controller 恢復將會增加約 20 秒的不可用時間窗口。

總之,通常情況下 Kafka 集羣中越多的 Partition 會帶來越高的吞吐量。但是,如果 Kafka 集羣中 Partition 總量過大或者單個 Broker 節點 Partition 過多,都可能會對系統的可用性和消息延遲帶來潛在的負面影響,需要引起我們的重視。

如何保證 Kafka 中的消息是有序的?

我們知道在 Kafka 中,並不保證消息全局有序,但是可以保證分區有序性 ,分區與分區之間是無序的。那麼如何保證 Kafka 中的消息是有序的呢? 可以從以下三個方面來入手分析:

01 **生產端 Producer **

在第 4 道題「生產者有哪些發送模式」的最後的場景分析裏面簡單的說明了下, 這裏再詳細的進行分析下:

首先 Kafka 的 Producer 端發送消息,如果是不對默認參數進行任何設置且網絡沒有抖動的情況下,消息是可以一批批的按消息發送的順序被髮送到 Kafka Broker 端。但是,一旦有網絡波動了,則消息就可能出現亂序。

所以,要嚴格保證 Kafka 發消息有序,首先要考慮用同步的方式來發送消息, 兩種同步發送的方式如下:

1)設置 **消息響應參數 acks = all &   max.in.flight.requests.per.connection = 1:**發送端將會在一條消息發出後,響應必須滿足 acks 設置的參數後,纔會發送下一條消息。雖然在使用時還是異步發送的方式,其實底層已經是一條接一條的發送了。

2)Sync 發送方式: 當調用 KafkaProducer 的 send() 後,返回的 Future 對象的 get 方式阻塞等待結果。根據返回的結果可以判斷是否發送成功, 由於是同步發送會阻塞, 只有當消息通過 get() 返回數據時,纔會繼續下一條消息的發送

通過上面方式還可能出現消息重發和冪等問題:

1)重發問題: Kafka 在消息發送出現問題時,通過判斷是否可以自動重試恢復,如果是可以自動恢復的問題,設置 retries > 0,讓 Kafka 自動重試。

2)冪等問題:Kafka 1.0 之後的版本,Producer 端引入了冪等特性。設置 enable.idempotence = true,  冪等特性可以給消息添加序列號,即每次發送會把序列號遞增 1。開啓了 Kafka Producer 端的冪等特性後,我們就可以通過設置參數 max.in.flight.requests.per.connection = 5 「默認值」, 這樣當 Kafka 發消息的時候,由於消息有了序列號當發送消息出現錯誤的時候,Kafka 底層會通過獲取服務器端的最近幾條日誌的序列號和發送端需要重新發送的消息序列號做對比,如果是連續的,那麼就可以繼續發送消息,保證消息順序。

02 服務端 Broker 

在 Kafka 中,Topic 只是一個邏輯上的概念,而組成 Topic 的分區 Partition 纔是真正存消息的地方

Kafka 只保證單分區內的消息是有序的,所以如果要保證業務全局嚴格有序,就要設置 Topic 爲單分區的方式。不過對業務來說一般不需要考慮全局有序的,只需要保證業務中不同類別的消息有序即可。

但是這裏有個必須要受到重視的問題,就是當我們對分區 Partition 進行數量改變的時候,由於是簡單的 Hash 算法會把以前可能分到相同分區的消息分到別的分區上。這樣就不能保證消息順序了。面對這種情況,就需要在動態變更分區的時候,考慮對業務的影響。有可能需要根據業務和當前分區需求,重新劃分消息類別

03 消費端 Consumer

在 Consumer 端,根據 Kafka 的模型,一個 Topic 下的每個分區只能從屬於這個 Topic 的消費者組中的某一個消費者

當消息被髮送分配到同一個 Partition 中,消費者從 Partition 中取出來數據的時候,也一定是有順序的,沒有錯亂。

但是消費者可能會有多個線程來併發來消費消息。如果單線程消費數據,吞吐量太低了,而多個線程併發消費的話,順序可能就亂掉了。

此時可以通過寫多個內存隊列,將相同 key 的消息都寫入同一個隊列,然後對於多個線程,每個線程分別消息一個隊列即可保證消息順序

Kafka 爲什麼不支持讀寫分離呢?

在很多主從模型系統中,是允許從節點可以對外提供讀服務的,只不過 Kafka 當初爲了避免數據不一致的問題,而採用了通過主節點來統一提供服務的方式。

不支持讀寫分離的原因有 2 點:

1)場景不一致: 讀寫分離架構適用於那種讀操作負載很大,但寫操作相對不頻繁的場景,但是 Kafka 顯然不適合這種場景。

2)延遲問題: Kafka 通過 PULL 方式來實現數據同步,因此 Leader 副本和 Follower 副本存在數據不一致的情況, 如果允許 Follower 副本提供讀服務的話,就會帶來消息滯後的問題。

Kafka 副本有哪兩種,作用是什麼?

在 Kafka 中,爲實現「數據備份」的功能,保證集羣中的某個節點發生故障時,該節點上的 Partition 數據不丟失,且 Kafka 仍然能夠繼續工作,爲此 Kafka 提供了副本機制一個 Topic 的每個 Partition 都有若干個副本,一個 Leader 副本和若干個 Follower 副本

1)Leader 主副本負責對外提供讀寫數據服務。

2)Follower 從副本只負責和 Leader 副本保持數據同步,並不對外提供任何服務。

Kafka 能否手動刪除消息?

首先 Kafka 是支持手動刪除消息的, 當然它本身提供了消息留存策略,能夠自動刪除過期的消息。

Kafka 將消息存儲到磁盤中,隨着寫入數據不斷增加,磁盤佔用空間越來越大,爲了控制佔用空間就需要對消息做一定的清理操作。Kafka 存儲日誌結構分析中每一個分區副本(Replica)都對應一個 Log,而 Log 又可以分爲多個日誌分段(LogSegment),這樣就便於 Kafka 對日誌的清理操作。

1)普通消息: 我們可以使用 Kafka-delete-records 命令或者通過程序調用 Admin.deleteRecords 方法來刪除消息。兩者底層都是調用 Admin 的 deleteRecords 的方法,通過將分區的 LEO 值抬高來間接刪除消息。

2)設置 key 且參數 cleanup.policy=delete/campact 的消息: 可以依靠 Log Cleaner 組件提供的功能刪除該 Key 的消息。

日誌刪除(Log Retention): 按照一定的保留策略直接刪除不符合條件的日誌分段(LogSegment)。

日誌壓縮(Log Compaction): 針對每個消息的 key 進行整合,對於有相同 key 的不同 value 值,只保留最後一個版本。

01 日誌刪除

Kafka 的日誌管理器(LogManager)中有一個專門的日誌清理任務通過週期性檢測和刪除不符合條件的日誌分段文件(LogSegment),這裏我們可以通過設置 Kafka Broker 端的參數「log.retention.check.interval.ms」,默認值爲 300000,即 5 分鐘。

在 Kafka 中一共有 3 種保留策略:

基於時間策略

日誌刪除任務會週期檢查當前日誌文件中是否有保留時間超過設定的閾值 **(retentionMs)**來尋找可刪除的日誌段文件集合 (deletableSegments)

其中 retentionMs 可以通過 Kafka Broker 端的這幾個參數的大小判斷的

log.retention.ms > log.retention.minutes > log.retention.hours 優先級來設置,默認情況只會配置 log.retention.hours 參數,值爲 168 即爲 7 天。

這裏需要注意:刪除過期的日誌段文件,並不是簡單的根據該日誌段文件的修改時間計算的,而是要根據該日誌段中最大的時間戳 largestTimeStamp 來計算的,首先要查詢該日誌分段所對應的時間戳索引文件,查找該時間戳索引文件的最後一條索引數據,如果時間戳值大於 0,則取值,否則纔會使用最近修改時間(lastModifiedTime)。

【刪除步驟】:

  1.  首先從 Log 對象所維護的日誌段的跳躍表中移除要刪除的日誌段,用來確保已經沒有線程來讀取這些日誌段。

  2.  將日誌段所對應的所有文件,包括索引文件都添加上 “.deleted” 的後綴。

  3.  最後交給一個以 “delete-file” 命名的延遲任務來刪除這些以 “ .deleted ” 爲後綴的文件。默認 1 分鐘執行一次, 可以通過 file.delete.delay.ms 來配置。

基於日誌大小策略

日誌刪除任務會週期檢查當前日誌大小是否超過設定的閾值 (retentionSize) 來尋找可刪除的日誌段文件集合 (deletableSegments)

其中 retentionSize 這裏我們可以通過 Kafka Broker 端的參數 log.retention.bytes 來設置, 默認值爲 - 1,即無窮大。

這裏需要注意的是 log.retention.bytes 設置的是 Log 中所有日誌文件的大小,而不是單個日誌段的大小。單個日誌段可以通過參數 log.segment.bytes 來設置,默認大小爲 1G。

【刪除步驟】:

  1.  首先計算日誌文件的總大小 Size 和 retentionSize 的差值,即需要刪除的日誌總大小。

2.  然後從日誌文件中的第一個日誌段開始進行查找可刪除的日誌段的文件集合 (deletableSegments)

  1.  找到後就可以進行刪除操作了。

基於日誌起始偏移量

該策略判斷依據是日誌段的下一個日誌段的起始偏移量 baseOffset 是否小於等於 logStartOffset,如果是,則可以刪除此日誌分段。

【如下圖所示 刪除步驟】:

  1.   首先從頭開始遍歷每個日誌段,日誌段 1 的下一個日誌分段的起始偏移量爲 20,小於 logStartOffset 的大小,將日誌段 1 加入 deletableSegments。

  2.  日誌段 2 的下一個日誌偏移量的起始偏移量爲 35,也小於 logStartOffset 的大小,將日誌分段 2 頁加入 deletableSegments。

  3.  日誌段 3 的下一個日誌偏移量的起始偏移量爲 50,也小於 logStartOffset 的大小,將日誌分段 3 頁加入 deletableSegments。

  4.  日誌段 4 的下一個日誌偏移量通過對比後,在 logStartOffset 的右側,那麼從日誌段 4 開始的所有日誌段都不會加入 deletableSegments。

  5.  待收集完所有的可刪除的日誌集合後就可以直接刪除了。

02 日誌壓縮

日誌壓縮 Log Compaction 對於有相同 key 的不同 value 值,只保留最後一個版本。 如果應用只關心 key 對應的最新 value 值,則可以開啓 Kafka 相應的日誌清理功能,Kafka 會定期將相同 key 的消息進行合併,只保留最新的 value 值。

Log Compaction 可以類比 Redis 中的 RDB 的持久化模式。我們可以想象下,如果每次消息變更都存 Kafka,在某一時刻, Kafka 異常崩潰後,如果想快速恢復,可以直接使用日誌壓縮策略, 這樣在恢復的時候只需要恢復最新的數據即可,這樣可以加快恢復速度。

Kafka 讀寫數據這麼快是如何做到的?

01 順序追加寫

kafka 在寫數據的時是以「磁盤順序寫」的方式來進行落盤的, 即將數據追加到文件的末尾。對於普通機械磁盤, 如果是隨機寫的話, 涉及到磁盤尋址的問題, 導致性能極低,  但是如果只是按照順序的方式追加文件末尾的話, 這種磁盤順序寫的性能基本可以跟寫內存的性能差不多的。下圖所示普通機械磁盤的順序 I/O 性能指標是 53.2M values/s。

02 Page Cache

首先 Kafka 爲了保證磁盤寫入性能,通過 mmap 內存映射的方式利用操作系統的 Page Cache 異步寫入 。 也可以稱爲 os cache,意思就是操作系統自己管理的緩存。那麼在寫磁盤文件的時候,就可以先直接寫入 os cache 中,接下來由操作系統自己決定什麼時候把 os cache 裏的數據真正刷入到磁盤中, 這樣大大提高寫入效率和性能。 如下圖所示:

03 零拷貝技術

kafka 爲了解決內核態和用戶態數據不必要 Copy 這個問題, 在讀取數據的時候就引入了「零拷貝技術」。即讓操作系統的 os cache 中的數據直接發送到網卡後傳出給下游的消費者,中間跳過了兩次拷貝數據的步驟,從而減少拷貝的 CPU 開銷, 減少用戶態內核態的上下文切換次數,  從而優化數據傳輸的性能, 而 Socket 緩存中僅僅會拷貝一個描述符過去,不會拷貝數據到 Socket 緩存,如下圖所示:

在 Kafka 中主要有以下兩個地方使用到了「零拷貝技術:

1)基於 mmap 機制實現的索引文件: 首先索引文件都是基於 MappedByBuffer 實現,即讓用戶態和內核態來共享內核態的數據緩衝區,此時數據不需要 Copy 到用戶態空間。雖然 mmap 避免了不必要的 Copy,但是在不同操作系統下, 其創建和銷燬成功是不一樣的,不一定都能保證高性能。所以在 Kafka 中只有索引文件使用了 mmap。

2)基於 sendfile 機制實現的日誌文件讀寫: 在 Kafka 傳輸層接口中有個 TransportLayer 接口,它的實現類中有使用了 Java FileChannel 中 transferTo 方法。該方法底層就是使用 sendfile 實現的零拷貝機制, 目前只是在 I/O 通道是普通的 PLAINTEXT 的時候纔會使用到零拷貝機制。

04 消息批量發送

Kafka 在發送消息的時候並不是一條條的發送的,而是會把多條消息合併成一個批次 Batch進行處理發送,消費消息也是同樣,一次拉取一批次的消息進行消費。如下圖所示:

05 數據壓縮

在 Kafka 中三個端都使用了優化後的壓縮算法,壓縮有助於提高吞吐量, 降低延遲並提高磁盤利用率。Kafka 底層支持多種壓縮算法: ** lz4,  snappy,  gzip**,  從 Kafka 2.1.0 開始新增了 **ZStandard** 算法, 該算法是 Facebook 開源的壓縮算法,  能提供超高的壓縮比。

在 Kafka 中, 壓縮可能會發生在兩個地方: 生產者端和 Broker 端一句話總結下壓縮和解壓縮, 即 Producer 端壓縮, Broker 端保持, Consumer 端解壓縮,這樣可以節省大量的網絡和磁盤開銷

Kafka 消費模型有哪些?

一般情況下消息消費總共有兩種模式:「推模型」和 「拉模型」。在 Kafka 中的消費模型是屬於「拉模型」,此模式的消息消費方式實現有兩種:「點對點方式」和 「發佈訂閱方式」。

01 點對點方式

**點對點方式:  ** 假如所有消費者都同屬於一個消費組的話,此時所有的消息都會被分配給每一個消費者,但是消息只會被其中一個消費者進行消費

02 發佈訂閱方式

**發佈訂閱: ** 假如所有消費者屬於不同的消費組,此時所有的消息都會被分配給每一個消費者,每個消費者都會收到該消息

什麼是消費者組, 有什麼作用?

首先我來看看什麼是「消費者組」:

消費者組 Consumer Group,顧名思義就是由多個 Consumer 組成,且擁有一個公共且唯一的 Group ID。組內每個消費者負責消費不同分區的數據,**但一個分區只能由一個組內消費者消費,**消費者組之間互不影響。

爲什麼 Kafka 要設計 Consumer Group,  只有 Consumer 不可以嗎? 

我們知道 Kafka 是一款高吞吐量,低延遲,高併發,  高可擴展的消息隊列產品, 那麼如果某個 Topic 擁有數百萬到數千萬的數據量, 僅僅依靠 Consumer 進程消費, 消費速度可想而知, 所以需要一個擴展性較好的機制來保障消費進度, 這個時候 Consumer Group 應運而生, Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制

Kafka Consumer Group 特點如下:

1)每個 Consumer Group 有一個或者多個 Consumer。

2)每個 Consumer Group 擁有一個公共且唯一的 Group ID。

3)  Consumer Group 在消費 Topic 的時候,Topic 的每個 Partition 只能分配給組內的某個 Consumer,只要被任何 Consumer 消費一次, 那麼這條數據就可以認爲被當前 Consumer Group 消費成功。

Kafka 中 Offset 的作用是什麼, 如何進行維護?

在 Kafka 中每個 Topic 分區下面的每條消息都被賦予了一個唯一的 ID 值,用來標識它在分區中的位置。這個 ID 值就被稱爲位移「Offset」或者叫偏移量,一旦消息被寫入到日誌分區中,它的位移值將不能被修改。

01 位移 Offset 管理方式

Kafka 舊版本(0.9 版本之前)是把位移保存在 ZooKeeper 中,減少 Broker 端狀態存儲開銷。

鑑於 Zookeeper 不適合頻繁寫更新,而 Consumer Group 的位移提交又是高頻寫操作,這樣會拖慢 ZooKeeper 集羣的性能, 於是在新版 Kafka 中, 社區採用了將位移保存在 Kafka 內部「Kafka Topic 天然支持高頻寫且持久化」,這就是所謂大名鼎鼎的__consumer_offsets。

__consumer_offsets: 用來保存 Kafka Consumer 提交的位移信息,另外它是由 Kafka 自動創建的,和普通的 Topic 相同,它的消息格式也是 Kafka 自己定義的,我們無法進行修改。

__consumer_offsets 有 3 種消息格式:

1)用來保存 Consumer Group 信息的消息。

2)用來刪除 Group 過期位移甚至是刪除 Group 的消息,也可以稱爲 tombstone 消息,即墓碑消息,它的主要特點是空消息體,一旦某個 Consumer Group 下的所有 Consumer 位移數據都已被刪除時,Kafka 會向 __consumer_offsets 主題的對應分區寫入 tombstone 消息,表明要徹底刪除這個 Group 的信息。

3)  用來保存位移值。

__consumer_offsets 消息格式分析揭祕:

  1. 消息格式我們可以簡單理解爲是一個 KV 對。Key 和 Value 分別表示消息的鍵值和消息體。

  2. 那麼 Key 存什麼呢?既然是存儲 Consumer 的位移信息,在 Kafka 中,Consumer 數量會很多,必須有字段來標識位移數據是屬於哪個 Consumer 的,怎麼來標識 Consumer 字段呢?我們知道 Consumer Group 會共享一個公共且唯一的 Group ID,那麼只保存它就可以了嗎?我們知道 Consumer 提交位移是在分區的維度進行的,很顯然,key 中還應該保存 Consumer 要提交位移的分區。

  3. 總結:位移主題的 Key 中應該保存 3 部分內容:<Group ID,主題名,分區號 >

  4. value 可以簡單認爲存儲的是 offset 值,當然底層還存儲其他一些元數據,幫助 Kafka 來完成一些其他操作,比如刪除過期位移數據等。

__consumer_offsets 消息格式示意圖:

02 __consumer_offsets 創建

__consumer_offsets 是怎麼被創建出來的呢? 當 Kafka 集羣中的第一個 Consumer 啓動時,Kafka 會自動創建__consumer_offsets

它就是普通的 Topic,也有對應的分區數,如果由 Kafka 自動創建的,那麼分區數又是怎麼設置的呢?

這個依賴 Broker 端參數主題分區位移個數即「offsets.topic.num.partitions」 默認值是 50,因此 Kafka 會自動創建一個有 50 個分區的 __consumer_offsets 。既然有分區數,必然就會有分區對應的副本個數,這個是依賴 Broker 端另外一個參數來完成的,即 「offsets.topic.replication.factor」默認值爲 3。

總結一下, __consumer_offsets 由 Kafka 自動創建的,那麼該 Topic 的分區數是 50,副本數是 3,而具體 Consumer Group 的消費情況要存儲到哪個 Partition ,根據 abs(GroupId.hashCode()) % NumPartitions來計算的,這樣就可以保證 Consumer Offset 信息與 Consumer Group 對應的 Coordinator 處於同一個 Broker 節點上。如下圖所示:

華仔聊技術 聊聊後端技術架構以及中間件源碼

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Cl6K8Yh9rZp3cNCyqg27ew