How the Kafka Producer's Memory Buffer Works When Sending Messages (Part 2)
**Kafka Producer Source Code Internals**
Section 1
Creating the Memory Buffer
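In the previous section we saw that putting a message into the memory buffer takes three main steps, as shown in the figure below:
We focused on the getOrCreateDeque() method, which mainly creates the following data structure:
In this section we continue and look at how memory, in the form of an NIO ByteBuffer, is requested from the BufferPool.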
Creating the BufferPool
The code in the memory buffer that allocates memory is essentially the following:
```java
private final BufferPool free;

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // getOrCreateDeque() logic omitted...

    // free.allocate() logic
    // we don't have an in-progress record batch try to allocate a new batch
    int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
    ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

    // tryAppend logic omitted...
}
```
As you can see, the logic is simple: it computes a size and then calls free.allocate() to create a ByteBuffer of that size.
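Anyone familiar with NIO knows ByteBuffer, one of NIO's three core components (alongside Channel and Selector). Note, however, that the buffers created here are not direct memory. ByteBuffer.allocate(), which BufferPool calls, returns a heap buffer backed by a byte[], and the JVM garbage collector manages it like any other object. Only ByteBuffer.allocateDirect() reserves off-heap (direct) memory. What reduces GC pressure in Kafka is the pool itself: buffers of the standard batch size are kept and reused instead of being recreated for every batch, which is a sensible design. In addition, because Kafka's network layer is built on NIO, data held in a ByteBuffer can be written to a channel directly, which keeps sending simple.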
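To make the heap-versus-direct distinction concrete, the small sketch below compares the two ByteBuffer factory methods using only java.nio. It is not Kafka code; the class and method names are illustrative.

```java
import java.nio.ByteBuffer;

// Compares the two ByteBuffer factory methods. BufferPool.allocate() in the
// excerpt calls ByteBuffer.allocate(), i.e. the heap variant.
class BufferKinds {
    static boolean heapIsDirect() {
        return ByteBuffer.allocate(16 * 1024).isDirect();       // heap-backed, GC-managed
    }

    static boolean directIsDirect() {
        return ByteBuffer.allocateDirect(16 * 1024).isDirect(); // off-heap (direct) memory
    }

    static boolean heapHasArray() {
        return ByteBuffer.allocate(16).hasArray();              // heap buffers expose a backing byte[]
    }

    public static void main(String[] args) {
        System.out.println("allocate() direct? " + heapIsDirect());         // false
        System.out.println("allocateDirect() direct? " + directIsDirect()); // true
    }
}
```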
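Back to the main topic. This ByteBuffer is clearly created by BufferPool's allocate method. Before studying allocate, though, let's look at how the BufferPool itself is created.
In the component analysis in Section 2 we took a first look at the structure of BufferPool and saw that it is created when RecordAccumulator is initialized. Its core is a ReentrantLock and a Deque<ByteBuffer> free queue, as shown in the figure below:
With that background, let's look at how it is created in more detail: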
```java
public final class BufferPool {

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    private long availableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    /**
     * Create a new buffer pool
     *
     * @param memory The maximum amount of memory that this buffer pool can allocate
     * @param poolableSize The buffer size to cache in the free list rather than deallocating
     * @param metrics instance of Metrics
     * @param time time instance
     * @param metricGrpName logical group name for metrics
     */
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<ByteBuffer>();
        this.waiters = new ArrayDeque<Condition>();
        this.totalMemory = memory;
        this.availableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor("bufferpool-wait-time");
        MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
    }
}
```
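The constructor does two main things:
**1) It sets the core parameters from its arguments.** There are two of them, long memory and int poolableSize; the remaining arguments relate to time or metrics and can be ignored for now. If you trace the callers of the constructor upward, you end up at the defaults initialized in ConfigDef:
memory corresponds to buffer.memory=33554432, the total buffer size, which defaults to 32MB. poolableSize corresponds to batch.size=16384, which defaults to 16KB, meaning that messages are packed into batches of 16KB by default. If your messages are large, both parameters need to be adjusted accordingly.
**2) It initializes the core data structures and a lock:** new ArrayDeque<ByteBuffer>(), new ArrayDeque<Condition>() and new ReentrantLock(). (Condition and ReentrantLock are commonly used classes from the JDK's concurrency package; if they are unfamiliar, review the earlier JDK Growth Diary series.)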
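The two settings above are ordinary producer configuration keys. The sketch below writes them as a plain java.util.Properties object (the kind of object you would pass to a KafkaProducer), so it runs without the Kafka client on the classpath. It then computes how many batch.size buffers fit into buffer.memory. BufferConfig and maxPooledBatches are hypothetical names for illustration.

```java
import java.util.Properties;

// Producer buffer settings discussed above, as plain Properties.
class BufferConfig {
    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("buffer.memory", "33554432"); // total buffer size (32MB), BufferPool's "memory"
        props.setProperty("batch.size", "16384");       // batch size (16KB), BufferPool's "poolableSize"
        return props;
    }

    // Upper bound on how many batch-sized buffers the pool can hand out at once.
    static long maxPooledBatches(Properties props) {
        long memory = Long.parseLong(props.getProperty("buffer.memory"));
        int batchSize = Integer.parseInt(props.getProperty("batch.size"));
        return memory / batchSize;
    }

    public static void main(String[] args) {
        System.out.println(maxPooledBatches(defaults())); // 2048
    }
}
```

With the defaults this gives 33554432 / 16384 = 2048 batch-sized buffers. Doubling batch.size halves that number unless buffer.memory is raised as well.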
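The overall constructor logic is shown in the figure below:
With a bit of educated guessing, the free queue probably holds ByteBuffer memory blocks. Because ArrayDeque is not thread-safe, a ReentrantLock is needed for concurrency control. It is not yet clear what the waiters queue of Conditions is for. Most likely it lets threads queue up while they wait for a memory block.
How BufferPool allocates memory
Once the BufferPool has been created, how does it allocate memory through allocate()?
Before allocating, the size of the request must be determined: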
```java
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

public interface Records extends Iterable<LogEntry> {
    int SIZE_LENGTH = 4;
    int OFFSET_LENGTH = 8;
    int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
}
```
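The size is the larger of two values:
batchSize, the same parameter that BufferPool uses, 16KB by default.
LOG_OVERHEAD plus the record size: 12 bytes plus Record.recordSize(key, value), which counts the key and value bytes together with the record's own header fields.
Put simply, if the message is larger than batchSize, the allocation is sized to the message; otherwise it is the default batchSize of 16KB.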
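The size rule fits in a one-line function. In this sketch the record size is passed in as a plain int that stands in for whatever Record.recordSize(key, value) returns in your Kafka version. AllocationSize is a hypothetical class name.

```java
// Mirrors: Math.max(batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value))
class AllocationSize {
    static final int SIZE_LENGTH = 4;                            // from Records
    static final int OFFSET_LENGTH = 8;                          // from Records
    static final int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH; // 12 bytes

    // recordSize stands in for the value of Record.recordSize(key, value)
    static int bufferSize(int batchSize, int recordSize) {
        return Math.max(batchSize, LOG_OVERHEAD + recordSize);
    }

    public static void main(String[] args) {
        System.out.println(bufferSize(16384, 100));   // 16384: small record, a full batch buffer
        System.out.println(bufferSize(16384, 20000)); // 20012: large record, sized to the record
    }
}
```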
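PS: batchSize should almost always be tuned to the messages you send. Batches are packed into ByteBuffers of batchSize bytes, so if your messages exceed the default 16KB, each batch ends up holding a single message. Every message then costs its own network request, and batching brings little benefit.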
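One rough way to see the batching effect is to estimate how many records share one batch buffer. This sketch assumes that each record costs LOG_OVERHEAD plus its record size. It ignores compression and any batch-level headers, so treat the numbers as estimates rather than exactly what the producer will pack. BatchFill and recordsPerBatch are hypothetical.

```java
// Rough estimate of how many records of a given size share one batch buffer.
class BatchFill {
    static final int LOG_OVERHEAD = 12; // Records.SIZE_LENGTH + Records.OFFSET_LENGTH

    static int recordsPerBatch(int batchSize, int recordSize) {
        int perRecord = LOG_OVERHEAD + recordSize;
        // A record larger than batch.size still gets its own buffer, so at least 1.
        return Math.max(1, batchSize / perRecord);
    }

    public static void main(String[] args) {
        System.out.println(recordsPerBatch(16384, 1012));  // 16
        System.out.println(recordsPerBatch(16384, 20000)); // 1
        System.out.println(recordsPerBatch(65536, 20000)); // 3
    }
}
```

With batch.size at 16384, 1012-byte records fit about 16 to a batch, while 20000-byte records get one buffer each. Raising batch.size to 65536 lets about three of them share a batch again.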
The logic above is illustrated in the figure below:
Once the size is known, the following code allocates the memory:
```java
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    this.lock.lock();
    try {
        // check if we have a free buffer of the right size pooled
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // now check if the request is immediately satisfiable with the
        // memory on hand or if we need to block
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // we have enough unallocated or pooled memory to immediately
            // satisfy the request
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            // we are out of memory and will have to block
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory);
            // loop over and over until we have a buffer or have reserved
            // enough memory to allocate one
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }

                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // check if we can satisfy this request from the free list,
                // otherwise allocate memory
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    // we'll need to allocate memory, but we may only get
                    // part of what we need on this iteration
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            // remove the condition for this thread to let the next thread
            // in line start getting memory
            Condition removed = this.waiters.removeFirst();
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // signal any additional waiters if there is more memory left
            // over for them
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            }

            // unlock and return the buffer
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
```
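The method is long, but its logic is clear. It is essentially one large if-else:
1) The if branch: if the request is exactly poolableSize and the free queue holds an idle buffer, that buffer is returned directly. Otherwise, if unallocated memory plus the memory held in the free queue covers the request, a new ByteBuffer of size bytes is created and availableMemory is reduced by size.
2) The else branch: if what remains of the 32MB total buffer cannot cover the request, the thread queues on a Condition and waits until enough memory is freed to obtain a ByteBuffer.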
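Branch selection in allocate() depends on only a few pieces of pool state, so it can be isolated as a pure function. This is an illustrative restatement of the conditions in the excerpt, not a method that exists in BufferPool. AllocatePath and its enum are hypothetical.

```java
// Which branch of allocate() a request takes, given the pool's current state.
class AllocatePath {
    enum Path { POOLED_BUFFER, ALLOCATE_NOW, BLOCK }

    static Path choose(int size, int poolableSize, int freeCount, long availableMemory) {
        // 1) exactly the poolable size and a cached buffer exists: reuse it
        if (size == poolableSize && freeCount > 0)
            return Path.POOLED_BUFFER;
        // 2) unallocated memory plus memory held by cached buffers covers the request
        long freeListSize = (long) freeCount * poolableSize;
        if (availableMemory + freeListSize >= size)
            return Path.ALLOCATE_NOW;
        // 3) otherwise the caller waits on its own Condition in the waiters queue
        return Path.BLOCK;
    }
}
```

For example, with poolableSize 16384, a 20000-byte request against two cached 16KB buffers and no unallocated memory takes ALLOCATE_NOW, because the cached buffers can be released to cover it.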
The overall flow is shown in the figure below:
Let's look at the details of each part, starting with the first:
```java
// Reuse an idle buffer from the free queue if the request is exactly poolableSize
if (size == poolableSize && !this.free.isEmpty())
    return this.free.pollFirst();

// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = this.free.size() * this.poolableSize;
if (this.availableMemory + freeListSize >= size) {
    // Create a ByteBuffer of the requested size and deduct it from available memory
    // we have enough unallocated or pooled memory to immediately
    // satisfy the request
    freeUp(size);
    this.availableMemory -= size;
    lock.unlock();
    return ByteBuffer.allocate(size);
}
```
This part is simple: the ByteBuffer either comes from the free queue or is newly created.
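The excerpt calls freeUp(size) but does not show it. To my understanding of the Kafka source of this era, freeUp drops cached buffers from the free queue and credits their capacity back to availableMemory until the request fits. The sketch below models that behavior as an assumption, so check it against the version you are reading. FreeUpSketch and allocateNow are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Models the immediate (non-blocking) allocation path shown above.
class FreeUpSketch {
    final Deque<ByteBuffer> free = new ArrayDeque<>(); // cached poolableSize buffers
    long availableMemory;                              // memory not held by any buffer

    FreeUpSketch(long availableMemory) {
        this.availableMemory = availableMemory;
    }

    // Assumed freeUp behavior: drop cached buffers (their capacity becomes
    // unallocated memory again) until the request fits or the cache is empty.
    void freeUp(int size) {
        while (!free.isEmpty() && availableMemory < size)
            availableMemory += free.pollLast().capacity();
    }

    // Caller has already checked that availableMemory + cached bytes >= size.
    ByteBuffer allocateNow(int size) {
        freeUp(size);
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }
}
```

For example, with no unallocated memory and two cached 16KB buffers, a 20000-byte request releases both cached buffers (32768 bytes), leaves 12768 bytes in availableMemory, and returns a fresh 20000-byte buffer.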
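But this raises a question: when does the free queue get populated?
As you might guess, once the messages in a buffer have been sent, the ByteBuffer is cleared and its memory becomes idle, so it is added back to the free queue. You can search for references to the free queue to get a rough picture yourself. We will see this in detail when we analyze how the messages in the buffer are sent.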
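The recycling side can be sketched as follows. Kafka's BufferPool has a deallocate method for this purpose. The version below is a simplified model of what it does in the code analyzed here, to the best of my understanding, and it leaves out waking blocked allocators. RecyclingPool and its non-blocking allocate are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

// Buffers of exactly poolableSize are recycled through the free queue;
// any other size only returns its bytes to availableMemory.
class RecyclingPool {
    private final ReentrantLock lock = new ReentrantLock();
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final int poolableSize;
    long availableMemory;

    RecyclingPool(long memory, int poolableSize) {
        this.availableMemory = memory;
        this.poolableSize = poolableSize;
    }

    // Non-blocking simplification of allocate(): no freeUp and no waiting.
    ByteBuffer allocate(int size) {
        lock.lock();
        try {
            if (size == poolableSize && !free.isEmpty())
                return free.pollFirst();          // reuse a recycled batch buffer
            if (availableMemory < size)
                throw new IllegalStateException("not enough memory; waiting is not modeled here");
            availableMemory -= size;
            return ByteBuffer.allocate(size);
        } finally {
            lock.unlock();
        }
    }

    // Called once a batch has been sent and its buffer is no longer needed.
    void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == poolableSize && size == buffer.capacity()) {
                buffer.clear();                   // reset position/limit for the next batch
                free.add(buffer);                 // keep the object for reuse
            } else {
                availableMemory += size;          // odd-sized buffer: return its bytes only
            }
        } finally {
            lock.unlock();
        }
    }
}
```

The design point is that a standard-size buffer survives as an object and is handed to the next batch, while an oversized one is simply dropped and only its byte count is credited back.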
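The remaining second part handles threads that queue up when total memory is insufficient and are woken up later. It deals with many edge cases, so it looks complicated.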
```java
// we are out of memory and will have to block
int accumulated = 0;
ByteBuffer buffer = null;
Condition moreMemory = this.lock.newCondition();
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
    long startWaitNs = time.nanoseconds();
    long timeNs;
    boolean waitingTimeElapsed;
    try {
        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        this.waiters.remove(moreMemory);
        throw e;
    } finally {
        long endWaitNs = time.nanoseconds();
        timeNs = Math.max(0L, endWaitNs - startWaitNs);
        this.waitTime.record(timeNs, time.milliseconds());
    }

    if (waitingTimeElapsed) {
        this.waiters.remove(moreMemory);
        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
    }

    remainingTimeToBlockNs -= timeNs;
    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
        // just grab a buffer from the free list
        buffer = this.free.pollFirst();
        accumulated = size;
    } else {
        // we'll need to allocate memory, but we may only get
        // part of what we need on this iteration
        freeUp(size - accumulated);
        int got = (int) Math.min(size - accumulated, this.availableMemory);
        this.availableMemory -= got;
        accumulated += got;
    }
}

// remove the condition for this thread to let the next thread
// in line start getting memory
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

// signal any additional waiters if there is more memory left
// over for them
if (this.availableMemory > 0 || !this.free.isEmpty()) {
    if (!this.waiters.isEmpty())
        this.waiters.peekFirst().signal();
}

// unlock and return the buffer
lock.unlock();
if (buffer == null)
    return ByteBuffer.allocate(size);
else
    return buffer;
}
```
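Once you untangle it, however, you will find that **it is essentially just Condition's await and signal. There is also a maximum blocking time, and a TimeoutException is thrown once it elapses.** I won't walk through it step by step; in practice this is a situation we try hard to avoid anyway. The general logic is summarized in the figure below: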
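The await/signal pattern with a deadline can be reduced to a small byte-accounting model. This sketch keeps four elements of the excerpt: the FIFO queue of per-thread Conditions, the timed await, partial accumulation across wake-ups, and the hand-off signal to the next waiter. It drops ByteBuffers and the free queue. It also departs from the excerpt on purpose in several ways:
- It uses System.nanoTime() instead of Kafka's Time abstraction.
- It throws java.util.concurrent.TimeoutException in place of Kafka's own TimeoutException.
- On timeout or interrupt, it gives back any memory it had already reserved.

BlockingBytePool and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Byte-accounting model of the blocking branch of allocate().
class BlockingBytePool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>(); // FIFO of blocked allocators
    private long availableMemory;

    BlockingBytePool(long memory) {
        this.availableMemory = memory;
    }

    void allocate(int size, long maxTimeToBlockMs) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            if (availableMemory >= size) {              // enough memory right now: no waiting
                availableMemory -= size;
                return;
            }
            Condition moreMemory = lock.newCondition(); // one Condition per waiting thread
            waiters.addLast(moreMemory);
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            int accumulated = 0;
            try {
                while (accumulated < size) {
                    long startNs = System.nanoTime();
                    boolean signalled = moreMemory.await(remainingNs, TimeUnit.NANOSECONDS);
                    remainingNs -= System.nanoTime() - startNs;
                    if (!signalled)
                        throw new TimeoutException("Failed to allocate " + size
                                + " bytes within " + maxTimeToBlockMs + " ms");
                    // take what is free now; the rest may arrive with a later signal
                    int got = (int) Math.min(size - accumulated, availableMemory);
                    availableMemory -= got;
                    accumulated += got;
                }
            } catch (InterruptedException | TimeoutException e) {
                availableMemory += accumulated;         // give back a partial reservation
                throw e;
            } finally {
                waiters.remove(moreMemory);
                if (availableMemory > 0 && !waiters.isEmpty())
                    waiters.peekFirst().signal();       // hand leftover memory to the next waiter
            }
        } finally {
            lock.unlock();
        }
    }

    void release(int size) {
        lock.lock();
        try {
            availableMemory += size;
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();                         // wake the longest-waiting allocator
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return availableMemory;
        } finally {
            lock.unlock();
        }
    }

    int waiting() {
        lock.lock();
        try {
            return waiters.size();
        } finally {
            lock.unlock();
        }
    }
}
```

For example, a thread that asks for 60 bytes from an exhausted pool blocks until another thread calls release(100). It then proceeds and leaves 40 bytes for the next waiter. If nothing is released within maxTimeToBlockMs, it gets a TimeoutException instead, much like the producer's allocate().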
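How are the Conditions in the waiters queue woken up? It works much like the way the free queue gains buffers: once messages have been sent and their memory is released, the newly available memory wakes up a waiting thread. We will see this when we analyze how the messages in the buffer are sent. It is shown in the figure below: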
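Summary
That essentially completes our look at how the memory buffer RecordAccumulator allocates memory through BufferPool. You should now know:
How BufferPool is created, and why it pools ByteBuffers
The two core parameters: batch.size=16KB and buffer.memory=32MB
The core data structures Deque<Condition> waiters and Deque<ByteBuffer> free
How the size of each ByteBuffer is calculated
How ByteBuffers are allocated and reused
In the next section we continue with the memory buffer and examine the tryAppend logic. After that we cover how messages are packed into batches and sent, the final serialization format of messages, and NIO's message framing (split and sticky packet) problem. Stay tuned!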
This article was AMP-transcoded by Readfog; copyright belongs to the original author.
Source: https://mp.weixin.qq.com/s/2gyb1xhACwFElkyv6Go3Ow