圖解 Kafka Producer 消息緩存模型
在閱讀本文之前, 希望你可以思考一下下面幾個問題, 帶着問題去閱讀文章會獲得更好的效果。
-
發送消息的時候, 當 Broker 掛掉了, 消息體還能寫入到消息緩存中嗎?
-
當消息還存儲在緩存中的時候, 假如 Producer 客戶端掛掉了, 消息是不是就丟失了?
-
當最新的 ProducerBatch 還有空餘的內存, 但是接下來的一條消息很大, 不足以加上上一個 Batch 中, 會怎麼辦呢?
-
那麼創建 ProducerBatch 的時候, 應該分配多少的內存呢?
什麼是消息累加器 RecordAccumulator
kafka 爲了提高 Producer 客戶端的發送吞吐量和提高性能, 選擇了將消息暫時緩存起來, 等到滿足一定的條件, 再進行批量發送, 這樣可以減少網絡請求, 提高吞吐量。
而緩存這個消息的就是 RecordAccumulator 類.
上圖就是整個消息存放的緩存模型, 我們接下來一個個來講解。
消息緩存模型
上圖表示的就是 消息緩存的模型, 生產的消息就是暫時存放在這個裏面。
-
每條消息, 我們按照 TopicPartition 維度, 把他們放在不同的
Deque<ProducerBatch>
隊列裏面。TopicPartition 相同, 會在相同Deque<ProducerBatch>
的裏面。 -
ProducerBatch
: 表示同一個批次的消息, 消息真正發送到 Broker 端的時候都是按照批次來發送的, 這個批次可能包含一條或者多條消息。 -
如果沒有找到消息對應的 ProducerBatch 隊列, 則創建一個隊列。
-
找到 ProducerBatch 隊列隊尾的 Batch, 發現 Batch 還可以塞下這條消息, 則將消息直接塞到這個 Batch 中
-
找到 ProducerBatch 隊列隊尾的 Batch, 發現 Batch 中剩餘內存, 不夠塞下這條消息, 則會創建新的 Batch
-
當消息發送成功之後, Batch 會被釋放掉。
ProducerBatch 的內存大小
那麼創建 ProducerBatch 的時候, 應該分配多少的內存呢?
先說結論: 當消息預估內存大於batch.size
的時候, 則按照消息預估內存創建, 否則按照batch.size
的大小創建 (默認 16k).
我們來看一段代碼,這段代碼就是在創建 ProducerBatch 的時候預估內存的大小
RecordAccumulator#append
/**
* 公衆號: 石臻臻的雜貨鋪
* 微信:szzdzhp001
**/
// 找到 batch.size 和 這條消息在batch中的總內存大小的 最大值
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
// 申請內存
buffer = free.allocate(size, maxTimeToBlock);
-
假設當前生產了一條消息爲 M, 剛好消息 M 找不到可以存放消息的 ProducerBatch(不存在或者滿了),那麼這個時候就需要創建一個新的 ProducerBatch 了
-
預估消息的大小 跟
batch.size
默認大小 16384(16kb). 對比, 取最大值用於申請的內存大小的值。
那麼, 這個消息的預估是如何預估的?純粹的是消息體的大小嗎?
DefaultRecordBatch#estimateBatchSizeUpperBound
預估需要的 Batch 大小, 是一個預估值, 因爲沒有考慮壓縮算法從額外開銷
/**
* 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。
* 這只是一個估計,因爲它沒有考慮使用的壓縮算法的額外開銷。
**/
static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers);
}
-
預估這個消息 M 的大小 + 一個 RECORD_BATCH_OVERHEAD 的大小
-
RECORD_BATCH_OVERHEAD 是一個 Batch 裏面的一些基本元信息, 總共佔用了 61B
-
消息 M 的大小也並不是單單的只有消息體的大小, 總大小 =(key,value,headers) 的大小 +MAX_RECORD_OVERHEAD
-
MAX_RECORD_OVERHEAD :一條消息頭最大佔用空間, 最大值爲 21B
也就是說創建一個 ProducerBatch, 最少就要 83B .
比如我發送一條消息 "1" , 預估得到的大小是 86B, 跟batch.size(默認16384)
相比取最大值。那麼申請內存的時候取最大值 16384 。
關於 Batch 的結構和消息的結構, 我們回頭單獨用一篇文章來講解。
內存分配
我們都知道 RecordAccumulator 裏面的緩存大小是一開始定義好的, 由buffer.memory
控制, 默認 33554432 (32M)
當生產的速度大於發送速度的時候, 就可能出現 Producer 寫入阻塞。
而且頻繁的創建和釋放 ProducerBatch, 會導致頻繁 GC, 所有 kafka 中有個緩存池的概念,這個緩存池會被重複使用, 但是隻有固定 (batch.size) 的大小才能夠使用緩存池。
PS:以下 16k 指得是 batch.size 的默認值.
Batch 的創建和釋放
1. 內存 16K 緩存池中有可用內存
①. 創建 Batch 的時候, 會去緩存池中, 獲取隊首的一塊內存 ByteBuffer 使用。
②. 消息發送完成, 釋放 Batch, 則會把這個 ByteBuffer, 放到緩存池的隊尾中, 並且調用ByteBuffer.clear
清空數據。以便下次重複使用
2. 內存 16K 緩存池中無可用內存
①. 創建 Batch 的時候, 去非緩存池中的內存獲取一部分內存用於創建 Batch. 注意:這裏說的獲取內存給 Batch, 其實就是讓 非緩存池 nonPooledAvailableMemory 減少 16K 的內存, 然後 Batch 正常創建就行了, 不要誤以爲好像真的發生了內存的轉移。
②. 消息發送完成, 釋放 Batch, 則會把這個 ByteBuffer, 放到緩存池的隊尾中, 並且調用ByteBuffer.clear
清空數據, 以便下次重複使用
3. 內存非 16K 非緩存池中內存夠用
①. 創建 Batch 的時候, 去非緩存池 (nonPooledAvailableMemory) 內存獲取一部分內存用於創建 Batch. 注意:這裏說的獲取內存給 Batch, 其實就是讓 非緩存池(nonPooledAvailableMemory) 減少對應的內存, 然後 Batch 正常創建就行了, 不要誤以爲好像真的發生了內存的轉移。
②. 消息發送完成, 釋放 Batch, 純粹的是在非緩存池 (nonPooledAvailableMemory) 中加上剛剛釋放的 Batch 內存大小。當然這個 Batch 會被 GC 掉
4. 內存非 16K 非緩存池內存不夠用
①. 先嚐試將 緩存池中的內存一個一個釋放到 非緩存池中, 直到非緩存池中的內存夠用與創建 Batch 了
②. 創建 Batch 的時候, 去非緩存池 (nonPooledAvailableMemory) 內存獲取一部分內存用於創建 Batch. 注意:這裏說的獲取內存給 Batch, 其實就是讓 非緩存池(nonPooledAvailableMemory) 減少對應的內存, 然後 Batch 正常創建就行了, 不要誤以爲好像真的發生了內存的轉移。
③. 消息發送完成, 釋放 Batch, 純粹的是在非緩存池 (nonPooledAvailableMemory) 中加上剛剛釋放的 Batch 內存大小。當然這個 Batch 會被 GC 掉
例如: 下面我們需要創建 48k 的 batch, 因爲超過了 16k, 所以需要在非緩存池中分配內存, 但是非緩存池中當前可用內存爲 0 , 分配不了, 這個時候就會嘗試去 緩存池裏面釋放一部分內存到 非緩存池。
釋放第一個 ByteBuffer(16k) 不夠,則繼續釋放第二個, 直到釋放了 3 個之後總共 48k,發現內存這時候夠了, 再去創建 Batch。
注意:這裏我們涉及到的 非緩存池中的內存分配, 僅僅指的的內存數字的增加和減少。
問題和答案
- 發送消息的時候, 當 Broker 掛掉了, 消息體還能寫入到消息緩存中嗎?
當 Broker 掛掉了, Producer 會提示下面的警告⚠️, 但是發送消息過程中
這個消息體還是可以寫入到 消息緩存中的, 也僅僅是寫到到緩存中而已。
WARN [Producer clientId=console-producer] Connection to node 0 (/172.23.164.192:9090) could not be established. Broker may not be available
- 當最新的 ProducerBatch 還有空餘的內存, 但是接下來的一條消息很大, 不足以加上上一個 Batch 中, 會怎麼辦呢?
那麼會創建新的 ProducerBatch。
- 那麼創建 ProducerBatch 的時候, 應該分配多少的內存呢?
觸發創建 ProducerBatch 的那條消息預估大小大於 batch.size ,則以預估內存創建。否則, 以 batch.size 創建。
還有一個問題供大家思考:
當消息還存儲在緩存中的時候, 假如 Producer 客戶端掛掉了, 消息是不是就丟失了?
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/gWJ0-FafnMtLQwCjFfWVJQ