Kafka Producer 內存緩衝器原理 1
第 6 節
內存緩衝器原理 1
** Kafka Producer 源碼原理**
之前我們分析了 Producer 的配置解析、組件分析、拉取元數據、消息的初步序列化方式、消息的路由策略。如下圖:
這一節我們繼續分析發送消息的內存緩衝器原理—RecordAccumulator.append()。
如何將消息放入內存緩衝器的?
在 doSend 中的,拉取元數據、消息的初步序列化方式、消息的路由策略之後就是 accumulator.append()。
如下代碼所示:(去除了多餘的日誌和異常處理,截取了核心代碼)
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
//拉取元數據、消息的初步序列化方式、消息的路由策略
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
Callback interceptCallback = this.interceptors == null ?
callback : new InterceptorCallback<>(callback, this.interceptors, tp);
// 將路由結果、初步序列化的消息放入到消息內存緩衝器中
RecordAccumulator.RecordAppendResult result =
accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
} catch (Exception e) {
throw e;
}
//省略其他各種異常捕獲
}
accumulator.append() 它主要是將路由結果、初步序列化的消息放入到消息內存緩衝器中。
分析如何將消息放入內存緩衝器之前,需要回顧下它內部的基本結構。之前組件分析的時候,我們初步分析過 RecordAccumulator 的大體結構,如下圖:
1)設置了一些參數 batchSize、totalSize、retryBackoffMs、lingerMs、compression 等
2)初始化了一些數據結構,比如 batches 是一個 new CopyOnWriteMap<>()
3)初始化了 BufferPool 和 IncompleteRecordBatches
回顧了 RecordAccumulator 這個組件之後,我們就來看看到底如何將消息放入內存緩衝器的數據結構中的。
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
try {
// check if we have an in-progress batch
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
// we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return appendResult;
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
整個方法的脈絡,看着邏輯比較多,涉及了很多數據結構,我們一步一步來分析下。第一次看的話,大體你可以梳理如下脈絡:
1)getOrCreateDeque 這個方法應該是才創建一個雙端隊列,隊列放的每一個元素不是單條消息 Record,而是消息的集合 RecordBatch。
2)free.allocate 應該是在分配內存緩衝器中的內存
3)tryAppend 應該是將消息放入內存中
創建存放消息集合的隊列
在將消息放入內存緩衝器之前,首先通過 getOrCreateDeque 創建的是一個存放消息集合的隊列。代碼如下:
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
public RecordAccumulator(int batchSize,
long totalSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
Metrics metrics,
Time time) {
//省略...
this.batches = new CopyOnWriteMap<>();
//省略...
}
/**
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
Deque<RecordBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
這個創建的內存結構可以看到,是**一個變量 batches,它是一個 CopyOnWriteMap****。**這個數據結構之前我們組件圖初步分析過。再結合這段代碼,不難理解它的脈絡:
這個 map 主要根據 Topic 分區信息作爲 key,value 是一個隊列核心數據結構是 RecordBatch,由於是第一次給某個 topic 分區發送的消息,value 爲空,需要初始化隊列,否則說明曾經給這個 topic 的分區發送給數據,value 非空,直接返回之前的隊列。
由於我們這裏是第一次向 test-topic 發送消息,所以可以得到下圖的數據結構:
之後執行了一段加鎖邏輯,之前提到,tryAppend 應該是將消息放入內存中。但是由於隊列是剛創建的,deque.peekLast(); 肯定是空,所以這段加鎖的代碼不會執行。
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque){
RecordBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null)
last.records.close();
else
return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
}
return null;
}
但是到這裏你會發現代碼一個明顯的特點:
在這些代碼紅,使用了的 synchronized 加鎖和線程安全的內存結構 CopyOnWriteMap,這些都是明顯線程安全的控制。
爲什麼呢?因爲同一個 Producer 可以使用多線程進行發送消息,必然要考慮線程安全的很多東西。
爲什麼選用 CopyOnWriteMap, 而不用 ConcurrentHashMap 呢?你可以思考下。(這裏給個提示,JDK 成長記提到過,CopyOnWriteMap 它的底層是寫時複製,適合讀多寫少的場景)
synchronized 加鎖代碼塊使用了,分段加鎖,並沒有暴力的在方法上加 synchronized。這也是一個使用亮點。
寫在結尾的話
到這裏,你會發現在中間件會大量的見到併發包下的組件的使用,工作中你用到可能都是鳳毛麟角,這些組件的使用是我們研究中間件源碼值得學習的一點。
你一定要多思考爲什麼,不要停留在是什麼,怎麼用上,這個思想需要刻意訓練,希望你可以慢慢養成。
好了,今天的內容就到這裏,之前有同學反饋,每一節的只是太過於幹了,實實在在的乾貨!看起來有時候會比較費勁,所以之後的章節儘量會避免上萬字的大章節,會控制在 6000 字左右。
另外,等端午過後,每週除了 2 更的 java 或者大數據技術的乾貨成長記外,我會分享我自己的故事和行業中遇見的事情,希望大家從我的經歷中可以有另一番成長和收穫,比如我是如何學習和提升技術的?我是如何畫圖的?我如何做技術分享的等等。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/E2X-gxMF9otxQFipThEIGA