Kafka 原理以及分區分配策略剖析

一、簡介

 Apache Kafka 是一個分佈式的流處理平臺(分佈式的基於發佈 / 訂閱模式的消息隊列【Message Queue】)。

流處理平臺有以下 3 個特性:

1.1 消息隊列的兩種模式

1.1.1 點對點模式

生產者將消息發送到 queue 中,然後消費者從 queue 中取出並且消費消息。消息被消費以後,queue 中不再存儲,所以消費者不可能消費到已經被消費的消息。Queue 支持存在多個消費者,但是對一個消息而言,只能被一個消費者消費。

圖片

1.1.2 發佈 / 訂閱模式

生產者將消息發佈到 topic 中,同時可以有多個消費者訂閱該消息。和點對點方式不同,發佈到 topic 的消息會被所有訂閱者消費。

圖片

1.2 Kafka 適合什麼樣的場景

它可以用於兩大類別的應用:

爲了理解 Kafka 是如何做到以上所說的功能,從下面開始,我們將深入探索 Kafka 的特性。

首先是一些概念:

1.3 主題和分區

 Kafka 的消息通過主題(Topic)進行分類,就好比是數據庫的表,或者是文件系統裏的文件夾。主題可以被分爲若干個分區(Partition),一個分區就是一個提交日誌。消息以追加的方式寫入分區,然後以先進先出的順序讀取。**注意,由於一個主題一般包含幾個分區,因此無法在整個主題範圍內保證消息的順序,但可以保證消息在單個分區內的順序。**主題是邏輯上的概念,在物理上,一個主題是橫跨多個服務器的。

圖片

**Kafka 集羣保留所有發佈的記錄(無論他們是否已被消費),並通過一個可配置的參數——保留期限來控制(可以同時配置時間和消息大小,以較小的那個爲準)。**舉個例子, 如果保留策略設置爲 2 天,一條記錄發佈後兩天內,可以隨時被消費,兩天過後這條記錄會被拋棄並釋放磁盤空間。

有時候我們需要增加分區的數量,比如爲了擴展主題的容量、降低單個分區的吞吐量或者要在單個消費者組內運行更多的消費者(因爲一個分區只能由消費者組裏的一個消費者讀取)。從消費者的角度來看,基於鍵的主題添加分區是很困難的,因爲分區數量改變,鍵到分區的映射也會變化,所以對於基於鍵的主題來說,建議在一開始就設置好分區,避免以後對其進行調整。

(注意:不能減少分區的數量,因爲如果刪除了分區,分區裏面的數據也一併刪除了,導致數據不一致。如果一定要減少分區的數量,只能刪除 topic 重建)

1.4 生產者和消費者

**生產者(發佈者)**創建消息,一般情況下,一個消息會被髮布到一個特定的主題上。生產者在默認情況下把消息均衡的分佈到主題的所有分區上,而並不關心特定消息會被寫入哪個分區。不過,生產者也可以把消息直接寫到指定的分區。這通常通過消息鍵和分區器來實現,分區器爲鍵生成一個散列值,並將其映射到指定的分區上。生產者也可以自定義分區器,根據不同的業務規則將消息映射到分區。

**消費者(訂閱者)**讀取消息,消費者可以訂閱一個或者多個主題,並按照消息生成的順序讀取它們。消費者通過檢查消息的偏移量來區分已經讀取過的消息。偏移量是一種元數據,它是一個不斷遞增的整數值,在創建消息時,kafka 會把它添加到消息裏。在給定的分區裏,每個消息的偏移量都是唯一的。消費者把每個分區最後讀取的消息偏移量保存在 zookeeper 或者 kafka 上,如果消費者關閉或者重啓,它的讀取狀態不會丟失。

消費者是消費者組的一部分,也就是說,會有一個或者多個消費共同讀取一個主題。消費者組保證每個分區只能被同一個組內的一個消費者使用。如果一個消費者失效,羣組裏的其他消費者可以接管失效消費者的工作。

圖片

1.5 broker 和集羣

broker:一個獨立的 kafka 服務器被稱爲 broker。broker 接收來自生產者的消息,爲消息設置偏移量,並提交消息到磁盤保存。broker 爲消費者提供服務,對讀取分區的請求作出相應,返回已經提交到磁盤上的消息。

集羣:交給同一個 zookeeper 集羣來管理的 broker 節點就組成了 kafka 的集羣。

broker 是集羣的組成部分,每個集羣都有一個 broker 同時充當集羣控制器的角色。控制器負責管理工作,包括將分區分配給 broker 和監控 broker。在 broker 中,一個分區從屬於一個 broker,該 broker 被稱爲分區的首領。一個分區可以分配給多個 broker(Topic 設置了多個副本的時候),這時會發生分區複製。如下圖:

圖片

**broker 如何處理請求:**broker 會在它所監聽的每個端口上運行一個 Acceptor 線程,這個線程會創建一個連接並把它交給 Processor 線程去處理。Processor 線程(也叫網絡線程)的數量是可配的,Processor 線程負責從客戶端獲取請求信息,把它們放進請求隊列,然後從響應隊列獲取響應信息,併發送給客戶端。如下圖所示:

圖片

**生產請求和獲取請求都必須發送給分區的首領副本(分區 Leader)。**如果 broker 收到一個針對特定分區的請求,而該分區的首領在另外一個 broker 上,那麼發送請求的客戶端會收到一個 “非分區首領” 的錯誤響應。Kafka 客戶端要自己負責把生產請求和獲取請求發送到正確的 broker 上。

客戶端如何知道該往哪裏發送請求呢?客戶端使用了另外一種請求類型——元數據請求。這種請求包含了客戶端感興趣的主題列表,服務器的響應消息裏指明瞭這些主題所包含的分區、每個分區都有哪些副本,以及哪個副本是首領。元數據請求可以發給任意一個 broker,因爲所有的 broker 都緩存了這些信息。客戶端緩存這些元數據,並且會定時從 broker 請求刷新這些信息。此外如果客戶端收到 “非首領” 錯誤,它會在嘗試重新發送請求之前,先刷新元數據。

圖片

1.6 Kafka 基礎架構

圖片

 二、Kafka 架構深入

2.1 Kafka 工作流程及文件存儲機制

2.1.1 工作流程

圖片

Kafka 中消息是以 topic 進行分類的,生產者生產消息,消費者消費消息,都是面向 topic 的。

 Topic 是邏輯上的概念,而 partition(分區)是物理上的概念,每個 partition 對應於一個 log 文件,該 log 文件中存儲的就是 producer 生產的數據。Producer 生產的數據會被不斷追加到該 log 文件末端,且每條數據都有自己的 offset。消費者組中的每個消費者,都會實時記錄自己消費到哪個 offset,以便出錯恢復時,從上次的位置繼續消費。

2.1.2 文件存儲機制

圖片

由於生產者生產的消息會不斷追加到 log 文件末尾,爲防止 log 文件過大導致數據定位效率低下,Kafka 採取了分片和索引的機制,將每個 partition 分爲多個 segment。(由 log.segment.bytes 決定,控制每個 segment 的大小,也可通過 log.segment.ms 控制,指定多長時間後日志片段會被關閉)每個 segment 對應兩個文件——“.index”文件和 “.log” 文件。這些文件位於一個文件夾下,該文件夾的命名規則爲:topic 名稱 + 分區序號。例如:bing 這個 topic 有 3 個分區,則其對應的文件夾爲:bing-0、bing-1 和 bing-2。

 索引文件和日誌文件命名規則:每個 LogSegment 都有一個基準偏移量,用來表示當前 LogSegment 中第一條消息的 offset。偏移量是一個 64 位的長整形數,固定是 20 位數字,長度未達到,用 0 進行填補。如下圖所示:

圖片

index 和 log 文件以當前 segment 的第一條消息的 offset 命名。index 文件記錄的是數據文件的 offset 和對應的物理位置,正是有了這個 index 文件,才能對任一數據寫入和查看擁有 O(1) 的複雜度,index 文件的粒度可以通過參數 log.index.interval.bytes 來控制,默認是是每過 4096 字節記錄一條 index。下圖爲 index 文件和 log 文件的結構示意圖:

圖片

查找 message 的流程(比如要查找 offset 爲 170417 的 message):

  1. 首先用二分查找確定它是在哪個 Segment 文件中,其中 0000000000000000000.index 爲最開始的文件,第二個文件爲 0000000000000170410.index(起始偏移爲 170410+1 = 170411),而第三個文件爲 0000000000000239430.index(起始偏移爲 239430+1 = 239431)。所以這個 offset = 170417 就落在第二個文件中。其他後續文件可以依此類推,以起始偏移量命名並排列這些文件,然後根據二分查找法就可以快速定位到具體文件位置。

  2. 用該 offset 減去索引文件的編號,即 170417 - 170410 = 7,也用二分查找法找到索引文件中等於或者小於 7 的最大的那個編號。可以看出我們能夠找到 [4,476] 這組數據,476 即 offset=170410 + 4 = 170414 的消息在 log 文件中的偏移量。

  3. 打開數據文件(0000000000000170410.log),從位置爲 476 的那個地方開始順序掃描直到找到 offset 爲 170417 的那條 Message。

2.1.3 數據過期機制

當日志片段大小達到 log.segment.bytes 指定的上限(默認是 1GB)或者日誌片段打開時長達到 log.segment.ms 時,當前日誌片段就會被關閉,一個新的日誌片段被打開。如果一個日誌片段被關閉,就開始等待過期。當前正在寫入的片段叫做活躍片段**,活躍片段永遠不會被刪除,**所以如果你要保留數據 1 天,但是片段包含 5 天的數據,那麼這些數據就會被保留 5 天,因爲片段被關閉之前,這些數據無法被刪除。

2.2 Kafka 生產者

2.2.1 分區策略

爲什麼要分區

  1. 多 Partition 分佈式存儲,利於集羣數據的均衡。

  2. 併發讀寫,加快讀寫速度。

  3. 加快數據恢復的速率:當某臺機器掛了,每個 Topic 僅需恢復一部分的數據,多機器併發。

分區的原則

  1. 指明 partition 的情況下,使用指定的 partition;

  2. 沒有指明 partition,但是有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取餘得到 partition 值;

  3. 既沒有指定 partition,也沒有 key 的情況下,第一次調用時隨機生成一個整數(後面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 數取餘得到 partition 值,也就是常說的 round-robin 算法。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        //key爲空時,獲取一個自增的計數,然後對分區做取模得到分區編號
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        // key不爲空時,通過key的hash對分區取模(疑問:爲什麼這裏不像上面那樣,使用availablePartitions呢?)
        // 根據《Kafka權威指南》Page45理解:爲了保證相同的鍵,總是能路由到固定的分區,如果使用可用分區,那麼因爲分區數變化,會導致相同的key,路由到不同分區
        // 所以如果要使用key來映射分區,最好在創建主題的時候就把分區規劃好
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
private int nextValue(String topic) {
    //爲每個topic維護了一個AtomicInteger對象,每次獲取時+1
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

2.2.2 數據可靠性保證

kafka 提供了哪些方面的保證

複製

Kafka 的複製機制和分區的多副本架構是 kafka 可靠性保證的核心。把消息寫入多個副本可以使 kafka 在發生奔潰時仍能保證消息的持久性。

kafka 的 topic 被分成多個分區,分區是基本的數據塊。每個分區可以有多個副本,其中一個是首領。所有事件都是發給首領副本,或者直接從首領副本讀取事件。其他副本只需要與首領副本保持同步,並及時複製最新的事件。

Leader 維護了一個動態的 in-sync replica set(ISR),意爲和 leader 保持同步的 follower 集合。當 ISR 中的 follower 完成數據同步後,leader 就會發送 ack。如果 follower 長時間未向 leader 同步數據,則該 follower 將被踢出 ISR,該時間閾值由 replica.lag.time.max.ms 參數設定。Leader 不可用時,將會從 ISR 中選舉新的 leader。滿足以下條件才能被認爲是同步的:

影響 Kafka 消息存儲可靠性的配置

圖片

ack 應答機制

對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒有必要等 ISR 中的 follower 全部接收成功。所以 Kafka 提供了三種可靠性級別,用戶可以根據對可靠性和延遲的要求進行權衡。acks:

消費一致性保證

圖片

(1)follower 故障

 follower 發生故障後會被臨時踢出 ISR,待該 follower 恢復後,follower 會讀取本地磁盤記錄的上次的 HW,並將 log 文件高於 HW 的部分截取掉,從 HW 開始向 leader 進行同步。

等該 follower 的 LEO 大於等於該 Partition 的 HW,即 follower 追上 leader 之後,就可以重新加入 ISR 了。

(2)leader 故障

 leader 發生故障後,會從 ISR 中選出一個新的 leader,之後爲了保證多個副本之間的數據一致性,其餘的 follower 會先將各自的 log 文件高於 HW 的部分截掉,然後從新的 leader 同步數據。

 注意:這隻能保證副本之間的數據一致性,並不能保證數據不丟失或者不重複。

2.2.3 消息發送流程

**Kafka 的 producer 發送消息採用的是異步發送的方式。**在消息發送過程中,涉及到了兩個線程——main 線程和 sender 線程,以及一個線程共享變量——RecordAccumulator。main 線程將消息發送給 RecordAccumulator,sender 線程不斷從 RecordAccumulator 中拉取消息發送到 Kafka broker。

圖片

爲了提高效率,消息被分批次寫入 kafka。批次就是一組消息,這些消息屬於同一個主題和分區。(如果每一個消息都單獨穿行於網絡,會導致大量的網絡開銷,把消息分成批次傳輸可以減少網絡開銷。不過要在時間延遲和吞吐量之間做出權衡:批次越大,單位時間內處理的消息就越多,單個消息的傳輸時間就越長)。批次數據會被壓縮,這樣可以提升數據的傳輸和存儲能力,但要做更多的計算處理。

相關參數:

2.3 Kafka 消費者

2.3.1 消費方式

 consumer 採用 pull(拉)的模式從 broker 中讀取數據。

 push(推)模式很難適應消費速率不同的消費者,因爲消息發送速率是由 broker 決定的。它的目標是儘可能以最快的速度傳遞消息,但是這樣容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式可以根據 consumer 的消費能力以適當的速率消費消息。

 pull 模式的不足之處是,如果 kafka 沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,kafka 的消費者在消費數據時會傳入一個時長參數 timeout,如果當前沒有數據可消費,consumer 會等待一段時間後再返回。

2.3.2 分區分配策略

 一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定哪個 partition 由哪個 consumer 來消費。Kafka 提供了 3 種消費者分區分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。

 PartitionAssignor 接口用於用戶定義實現分區分配算法,以實現 Consumer 之間的分區分配。消費組的成員訂閱它們感興趣的 Topic 並將這種訂閱關係傳遞給作爲訂閱組協調者的 Broker。協調者選擇其中的一個消費者來執行這個消費組的分區分配並將分配結果轉發給消費組內所有的消費者。Kafka 默認採用 RangeAssignor 的分配算法。

2.3.2.1 RangeAssignor

 RangeAssignor 對每個 Topic 進行獨立的分區分配。對於每一個 Topic,首先對分區按照分區 ID 進行排序,然後訂閱這個 Topic 的消費組的消費者再進行排序,之後儘量均衡的將分區分配給消費者。這裏只能是儘量均衡,因爲分區數可能無法被消費者數量整除,那麼有一些消費者就會多分配到一些分區。分配示意圖如下:

圖片

分區分配的算法如下:

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());
    //for循環對訂閱的多個topic分別進行處理
    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;
        //對消費者進行排序
        Collections.sort(consumersForTopic);
        //計算平均每個消費者分配的分區數
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //計算平均分配後多出的分區數
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            //計算第i個消費者,分配分區的起始位置
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            //計算第i個消費者,分配到的分區數量
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

這種分配方式明顯的一個問題是隨着消費者訂閱的 Topic 的數量的增加,不均衡的問題會越來越嚴重,比如上圖中 4 個分區 3 個消費者的場景,C0 會多分配一個分區。如果此時再訂閱一個分區數爲 4 的 Topic,那麼 C0 又會比 C1、C2 多分配一個分區,這樣 C0 總共就比 C1、C2 多分配兩個分區了,而且隨着 Topic 的增加,這個情況會越來越嚴重。分配結果:

圖片

訂閱 2 個 Topic,每個 Topic4 個分區,共 3 個 Consumer

2.3.2.2 RoundRobinAssignor

RoundRobinAssignor 的分配策略是將消費組內訂閱的所有 Topic 的分區及所有消費者進行排序後儘量均衡的分配(RangeAssignor 是針對單個 Topic 的分區進行排序分配的)。如果消費組內,消費者訂閱的 Topic 列表是相同的(每個消費者都訂閱了相同的 Topic),那麼分配結果是儘量均衡的(消費者之間分配到的分區數的差值不會超過 1)。如果訂閱的 Topic 列表是不同的,那麼分配結果是不保證 “儘量均衡” 的,因爲某些消費者不參與一些 Topic 的分配。

圖片

以上兩個 topic 的情況,相比於之前 RangeAssignor 的分配策略,可以使分區分配的更均衡。不過考慮這種情況,假設有三個消費者分別爲 C0、C1、C2,有 3 個 Topic T0、T1、T2,分別擁有 1、2、3 個分區,並且 C0 訂閱 T0,C1 訂閱 T0 和 T1,C2 訂閱 T0、T1、T2,那麼 RoundRobinAssignor 的分配結果如下:

圖片

看上去分配已經儘量的保證均衡了,不過可以發現 C2 承擔了 4 個分區的消費而 C1 訂閱了 T1,是不是把 T1P1 交給 C1 消費能更加的均衡呢?

2.3.2.3 StickyAssignor

StickyAssignor 分區分配算法,目的是在執行一次新的分配時,能在上一次分配的結果的基礎上,儘量少的調整分區分配的變動,節省因分區分配變化帶來的開銷。Sticky 是 “粘性的”,可以理解爲分配結果是帶 “粘性的”——每一次分配變更相對上一次分配做最少的變動。其目標有兩點:

當這兩個目標發生衝突時,優先保證第一個目標。第一個目標是每個分配算法都儘量嘗試去完成的,而第二個目標才真正體現出 StickyAssignor 特性的。

StickyAssignor 算法比較複雜,下面舉例來說明分配的效果(對比 RoundRobinAssignor),前提條件:

圖片

上面紅色的箭頭代表的是有變動的分區分配,可以看出,StickyAssignor 的分配策略,變動較小。

2.3.3 offset 的維護

由於 Consumer 在消費過程中可能會出現斷電宕機等故障,Consumer 恢復後,需要從故障前的位置繼續消費,所以 Consumer 需要實時記錄自己消費到哪個位置,以便故障恢復後繼續消費。Kafka0.9 版本之前,Consumer 默認將 offset 保存在 zookeeper 中,從 0.9 版本開始,Consumer 默認將 offset 保存在 Kafka 一個內置的名字叫_consumeroffsets 的 topic 中。默認是無法讀取的,可以通過設置 consumer.properties 中的 exclude.internal.topics=false 來讀取。

2.3.4 kafka 高效讀寫數據(瞭解)

順序寫磁盤

Kafka 的 producer 生產數據,要寫入到 log 文件中,寫的過程是一直追加到文件末端,爲順序寫。數據表明,同樣的磁盤,順序寫能到 600M/s,而隨機寫只有 100K/s。這與磁盤的機械結構有關,順序寫之所以快,是因爲其省去了大量磁頭尋址的時間。

零拷貝技術

零拷貝主要的任務就是避免 CPU 將數據從一塊存儲拷貝到另外一塊存儲,主要就是利用各種零拷貝技術,避免讓 CPU 做大量的數據拷貝任務,減少不必要的拷貝,或者讓別的組件來做這一類簡單的數據傳輸任務,讓 CPU 解脫出來專注於別的任務。這樣就可以讓系統資源的利用更加有效。

參考文獻

  1. Kafka 中文文檔

  2. [Kafka 系列] 之指定了一個 offset, 怎麼查找到對應的消息?

  3. 尚硅谷 Kafka 教程 (Kafka 框架快速入門)

  4. Kafka 分區分配策略分析——重點:StickyAssignor

  5. Kafka 日誌存儲

  6. 淺析 Linux 中的零拷貝技術

  7. 《Kafka 權威指南》

作者:Li Xiaobing

來源: vivo 互聯網技術

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OQ-YLB6-WPj5nPB_cK55Pg