圖解 Kafka Producer 消息緩存模型

在閱讀本文之前, 希望你可以思考一下下面幾個問題, 帶着問題去閱讀文章會獲得更好的效果。

  1. 發送消息的時候, 當 Broker 掛掉了, 消息體還能寫入到消息緩存中嗎?

  2. 當消息還存儲在緩存中的時候, 假如 Producer 客戶端掛掉了, 消息是不是就丟失了?

  3. 當最新的 ProducerBatch 還有空餘的內存, 但是接下來的一條消息很大, 不足以加上上一個 Batch 中, 會怎麼辦呢?

  4. 那麼創建 ProducerBatch 的時候, 應該分配多少的內存呢?

什麼是消息累加器 RecordAccumulator

kafka 爲了提高 Producer 客戶端的發送吞吐量和提高性能, 選擇了將消息暫時緩存起來, 等到滿足一定的條件, 再進行批量發送, 這樣可以減少網絡請求, 提高吞吐量。

而緩存這個消息的就是 RecordAccumulator 類.

上圖就是整個消息存放的緩存模型, 我們接下來一個個來講解。

消息緩存模型

上圖表示的就是 消息緩存的模型, 生產的消息就是暫時存放在這個裏面。

  1. 每條消息, 我們按照 TopicPartition 維度, 把他們放在不同的Deque<ProducerBatch> 隊列裏面。TopicPartition 相同, 會在相同Deque<ProducerBatch> 的裏面。

  2. ProducerBatch : 表示同一個批次的消息, 消息真正發送到 Broker 端的時候都是按照批次來發送的, 這個批次可能包含一條或者多條消息。

  3. 如果沒有找到消息對應的 ProducerBatch 隊列, 則創建一個隊列。

  4. 找到 ProducerBatch 隊列隊尾的 Batch, 發現 Batch 還可以塞下這條消息, 則將消息直接塞到這個 Batch 中

  5. 找到 ProducerBatch 隊列隊尾的 Batch, 發現 Batch 中剩餘內存, 不夠塞下這條消息, 則會創建新的 Batch

  6. 當消息發送成功之後, Batch 會被釋放掉。

ProducerBatch 的內存大小

那麼創建 ProducerBatch 的時候, 應該分配多少的內存呢?

先說結論: 當消息預估內存大於batch.size的時候, 則按照消息預估內存創建, 否則按照batch.size的大小創建 (默認 16k).

我們來看一段代碼,這段代碼就是在創建 ProducerBatch 的時候預估內存的大小

RecordAccumulator#append

    /**
     * 公衆號: 石臻臻的雜貨鋪
     * 微信:szzdzhp001
     **/
       // 找到 batch.size 和 這條消息在batch中的總內存大小的 最大值
       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       // 申請內存
       buffer = free.allocate(size, maxTimeToBlock);
  1. 假設當前生產了一條消息爲 M, 剛好消息 M 找不到可以存放消息的 ProducerBatch(不存在或者滿了),那麼這個時候就需要創建一個新的 ProducerBatch 了

  2. 預估消息的大小 跟batch.size 默認大小 16384(16kb). 對比, 取最大值用於申請的內存大小的值。

那麼, 這個消息的預估是如何預估的?純粹的是消息體的大小嗎?

DefaultRecordBatch#estimateBatchSizeUpperBound

預估需要的 Batch 大小, 是一個預估值, 因爲沒有考慮壓縮算法從額外開銷

    /**
    * 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。
    * 這只是一個估計,因爲它沒有考慮使用的壓縮算法的額外開銷。
    **/
    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
        return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
    }
  1. 預估這個消息 M 的大小 + 一個 RECORD_BATCH_OVERHEAD 的大小

  2. RECORD_BATCH_OVERHEAD 是一個 Batch 裏面的一些基本元信息, 總共佔用了 61B

  3. 消息 M 的大小也並不是單單的只有消息體的大小, 總大小 =(key,value,headers) 的大小 +MAX_RECORD_OVERHEAD

  4. MAX_RECORD_OVERHEAD :一條消息頭最大佔用空間, 最大值爲 21B

也就是說創建一個 ProducerBatch, 最少就要 83B .

比如我發送一條消息 "1" , 預估得到的大小是 86B, 跟batch.size(默認16384) 相比取最大值。那麼申請內存的時候取最大值 16384 。

關於 Batch 的結構和消息的結構, 我們回頭單獨用一篇文章來講解

內存分配

我們都知道 RecordAccumulator 裏面的緩存大小是一開始定義好的, 由buffer.memory控制, 默認 33554432 (32M)

當生產的速度大於發送速度的時候, 就可能出現 Producer 寫入阻塞。

而且頻繁的創建和釋放 ProducerBatch, 會導致頻繁 GC, 所有 kafka 中有個緩存池的概念,這個緩存池會被重複使用, 但是隻有固定 (batch.size) 的大小才能夠使用緩存池。

PS:以下 16k 指得是 batch.size 的默認值.

Batch 的創建和釋放

1. 內存 16K 緩存池中有可用內存

①. 創建 Batch 的時候, 會去緩存池中, 獲取隊首的一塊內存 ByteBuffer 使用。

②. 消息發送完成, 釋放 Batch, 則會把這個 ByteBuffer, 放到緩存池的隊尾中, 並且調用ByteBuffer.clear 清空數據。以便下次重複使用

2. 內存 16K 緩存池中無可用內存

①. 創建 Batch 的時候, 去非緩存池中的內存獲取一部分內存用於創建 Batch. 注意:這裏說的獲取內存給 Batch, 其實就是讓 非緩存池 nonPooledAvailableMemory 減少 16K 的內存, 然後 Batch 正常創建就行了, 不要誤以爲好像真的發生了內存的轉移。

②. 消息發送完成, 釋放 Batch, 則會把這個 ByteBuffer, 放到緩存池的隊尾中, 並且調用ByteBuffer.clear 清空數據, 以便下次重複使用

3. 內存非 16K 非緩存池中內存夠用

①. 創建 Batch 的時候, 去非緩存池 (nonPooledAvailableMemory) 內存獲取一部分內存用於創建 Batch. 注意:這裏說的獲取內存給 Batch, 其實就是讓 非緩存池(nonPooledAvailableMemory) 減少對應的內存, 然後 Batch 正常創建就行了, 不要誤以爲好像真的發生了內存的轉移。

②. 消息發送完成, 釋放 Batch, 純粹的是在非緩存池 (nonPooledAvailableMemory) 中加上剛剛釋放的 Batch 內存大小。當然這個 Batch 會被 GC 掉

4. 內存非 16K  非緩存池內存不夠用

①. 先嚐試將 緩存池中的內存一個一個釋放到 非緩存池中, 直到非緩存池中的內存夠用與創建 Batch 了

②. 創建 Batch 的時候, 去非緩存池 (nonPooledAvailableMemory) 內存獲取一部分內存用於創建 Batch. 注意:這裏說的獲取內存給 Batch, 其實就是讓 非緩存池(nonPooledAvailableMemory) 減少對應的內存, 然後 Batch 正常創建就行了, 不要誤以爲好像真的發生了內存的轉移。

③. 消息發送完成, 釋放 Batch, 純粹的是在非緩存池 (nonPooledAvailableMemory) 中加上剛剛釋放的 Batch 內存大小。當然這個 Batch 會被 GC 掉

例如: 下面我們需要創建 48k 的 batch, 因爲超過了 16k, 所以需要在非緩存池中分配內存, 但是非緩存池中當前可用內存爲 0 , 分配不了, 這個時候就會嘗試去 緩存池裏面釋放一部分內存到 非緩存池。

釋放第一個 ByteBuffer(16k) 不夠,則繼續釋放第二個, 直到釋放了 3 個之後總共 48k,發現內存這時候夠了, 再去創建 Batch。

注意:這裏我們涉及到的 非緩存池中的內存分配, 僅僅指的的內存數字的增加和減少。

問題和答案

  1. 發送消息的時候, 當 Broker 掛掉了, 消息體還能寫入到消息緩存中嗎?

當 Broker 掛掉了, Producer 會提示下面的警告⚠️,  但是發送消息過程中

這個消息體還是可以寫入到 消息緩存中的, 也僅僅是寫到到緩存中而已。

 WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available

  1. 當最新的 ProducerBatch 還有空餘的內存, 但是接下來的一條消息很大, 不足以加上上一個 Batch 中, 會怎麼辦呢?

那麼會創建新的 ProducerBatch。

  1. 那麼創建 ProducerBatch 的時候, 應該分配多少的內存呢?

觸發創建 ProducerBatch 的那條消息預估大小大於 batch.size ,則以預估內存創建。否則, 以 batch.size 創建。

還有一個問題供大家思考:

當消息還存儲在緩存中的時候, 假如 Producer 客戶端掛掉了, 消息是不是就丟失了?

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/gWJ0-FafnMtLQwCjFfWVJQ