Kafka Producer 發送消息的內存緩衝器原理 2

**  Kafka Producer 源碼原理**

第 1 節

內存緩衝的創建

上一節我們分析到如何將消息放入內存緩衝器主要分三步,如下圖所示:

我們重點分析了 getOrCreateDeque() 方法,它主要創建瞭如下數據結構,如下所示:

這一節我們繼續向下分析,看看如何通過 BufferPool 申請內存空間 NIO 的直接內存 ByteBuffer 的。

BufferPool 的創建

內存緩衝區,分配內存的邏輯代碼主要如下所示:

private final BufferPool free;

public RecordAppendResult append(TopicPartition tp,
                                    long timestamp,
                                    byte[] key,
                                    byte[] value,
                                    Callback callback,
                                    long maxTimeToBlock) throws InterruptedException {

       //getOrCreateDeque()相關邏輯 省略...

       //free.allocate()相關邏輯
       // we don't have an in-progress record batch try to allocate a new batch
       int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
       ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

       //tryAppend相關邏輯 省略...  
  }

可以看到這個邏輯非常簡單,只是計算了一個空間大小,之後根據 free.allocate() 創建內存空間 ByteBuffer。

熟悉 NIO 的同學,一定知道 ByteBuffer 這個組件,是 NIO 核心 3 大組件之一。它是一塊直接內存,什麼意思呢?就是不受 JVM 垃圾回收器管理和回收,由單獨 HotSpot 虛擬機中 C++ 的線程負責回收。這樣的好處就是避免創建的內存空間,頻繁的被 GC,而且可以達到很好的重用性。這一點是不錯的思考。而且由於 Kafka 底層使用 NIO 進行通信,使用 ByteBuffer 存放的數據,可以更好、更簡單的被髮送出去。

好了回到正題,這個 ByteBuffer 可以明顯的看到是被 BufferPool 的 allocate 方法創建的。但是在研究 allocate 方法之前,我們先來看看 ByteBuffer 是如何創建的。

在之前第二節組件分析時,初步看過 BufferPool 這個類的結構,可以看到之前初始化 RecordAccumulator 時候,創建的 BufferPool。它的基本核心是一個 ReentrantLock 和 Deque free 隊列。如下圖所示:

有了之前初步的瞭解,現在我們再仔細看下它的創建細節:

public final class BufferPool {

   private final long totalMemory;
   private final int poolableSize;
   private final ReentrantLock lock;
   private final Deque<ByteBuffer> free;
   private final Deque<Condition> waiters;
   private long availableMemory;
   private final Metrics metrics;
   private final Time time;
   private final Sensor waitTime;

   /**
    * Create a new buffer pool
    *
    * @param memory The maximum amount of memory that this buffer pool can allocate
    * @param poolableSize The buffer size to cache in the free list rather than deallocating
    * @param metrics instance of Metrics
    * @param time time instance
    * @param metricGrpName logical group name for metrics
    */
   public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
       this.poolableSize = poolableSize;
       this.lock = new ReentrantLock();
       this.free = new ArrayDeque<ByteBuffer>();
       this.waiters = new ArrayDeque<Condition>();
       this.totalMemory = memory;
       this.availableMemory = memory;
       this.metrics = metrics;
       this.time = time;
       this.waitTime = this.metrics.sensor("bufferpool-wait-time");
       MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                  metricGrpName,
                                                  "The fraction of time an appender waits for space allocation.");
       this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
  }
}

這個構造函數主要脈絡如下:

**1)根據入參,設置核心的參數。**主要有兩個,long memory, int poolableSize, 其餘的入參都是時間或者統計相關的,可以先忽略。你可以向上查找構造函數傳遞入參的入口,最終會找到 ConfigDef 中默認初始化的值。如下:

memory 默認對應的配置 buffer.memory=33554432 ,也就是總緩衝區的大小,默認是 32MB。poolableSize 對應的配置 batch.size=16384, 默認是 16KB,也就是說消息可以打包的 batch 默認一批是 16KB。這裏要注意如果消息比較大,這個兩個參數需要適當調整。

**2)初始化核心內存結構和一把鎖。**new ArrayDeque()、new ArrayDeque()、new ReentrantLock()。(Condition 和 ReentrantLock 都是 JDK 併發包下的常用類。不熟悉的同學可以回顧下 JDK 成長記)

構造函數的邏輯整體如下圖所示:

你可以連蒙帶猜下,free 這個隊列,應該是存放內存塊 ByteBuffer 的。由於是 ArrayDeque,所以需要 ReentrantLock 進行併發控制。waiters 的 Condition 隊列暫時不知道是做什麼的,可能是線程排隊等待獲取內存塊使用的。

BufferPool 如何申請內存

創建好了 BufferPool,它是如何通過 allocate() 申請內存的呢?

首先申請內存前需要明確申請內存的大小 size,如下:

int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

public interface Records extends Iterable<LogEntry> {

   int SIZE_LENGTH = 4;
   int OFFSET_LENGTH = 8;
   int LOG_OVERHEAD = SIZE_LENGTH + OFFSET_LENGTH;
}

size 的計算涉及到了幾個值取 Max 的邏輯。

batchSize 就是之前 BufferPool 使用的參數,默認是 16KB。

LOG_OVERHEAD + 消息大小:12+keyBytes.size()+valueBytes.size();

簡單的說意思就是,如果消息的大小大於默認的 batchSize,申請的內存以消息大小爲主,否則就是默認 batchSize 的大小 16KB。

PS:batchSize 一般根據我們發送的消息肯定會調整的,如果你消息大於 16KB,之後打包發送的時候是基於 batchSize 大小的 ByteBuffer 內存塊的,結果由於你的消息大小超過默認 batchSize,每次打包發送其實就是一條消息,這樣每一條消息一次網絡傳輸,批量打包發送的意義就不大了。

上面的邏輯如下圖所示:

確認了申請內存空間的大小後,就會執行如下代碼申請內存了:

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
   if (size > this.totalMemory)
       throw new IllegalArgumentException("Attempt to allocate " + size
                                          + " bytes, but there is a hard limit of "
                                          + this.totalMemory
                                          + " on memory allocations.");

   this.lock.lock();
   try {
       // check if we have a free buffer of the right size pooled
       if (size == poolableSize && !this.free.isEmpty())
           return this.free.pollFirst();

       // now check if the request is immediately satisfiable with the
       // memory on hand or if we need to block
       int freeListSize = this.free.size() * this.poolableSize;
       if (this.availableMemory + freeListSize >= size) {
           // we have enough unallocated or pooled memory to immediately
           // satisfy the request
           freeUp(size);
           this.availableMemory -= size;
           lock.unlock();
           return ByteBuffer.allocate(size);
      } else {
           // we are out of memory and will have to block
           int accumulated = 0;
           ByteBuffer buffer = null;
           Condition moreMemory = this.lock.newCondition();
           long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
           this.waiters.addLast(moreMemory);
           // loop over and over until we have a buffer or have reserved
           // enough memory to allocate one
           while (accumulated < size) {
               long startWaitNs = time.nanoseconds();
               long timeNs;
               boolean waitingTimeElapsed;
               try {
                   waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
              } catch (InterruptedException e) {
                   this.waiters.remove(moreMemory);
                   throw e;
              } finally {
                   long endWaitNs = time.nanoseconds();
                   timeNs = Math.max(0L, endWaitNs - startWaitNs);
                   this.waitTime.record(timeNs, time.milliseconds());
              }

               if (waitingTimeElapsed) {
                   this.waiters.remove(moreMemory);
                   throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
              }

               remainingTimeToBlockNs -= timeNs;
               // check if we can satisfy this request from the free list,
               // otherwise allocate memory
               if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                   // just grab a buffer from the free list
                   buffer = this.free.pollFirst();
                   accumulated = size;
              } else {
                   // we'll need to allocate memory, but we may only get
                   // part of what we need on this iteration
                   freeUp(size - accumulated);
                   int got = (int) Math.min(size - accumulated, this.availableMemory);
                   this.availableMemory -= got;
                   accumulated += got;
              }
          }

           // remove the condition for this thread to let the next thread
           // in line start getting memory
           Condition removed = this.waiters.removeFirst();
           if (removed != moreMemory)
               throw new IllegalStateException("Wrong condition: this shouldn't happen.");

           // signal any additional waiters if there is more memory left
           // over for them
           if (this.availableMemory > 0 || !this.free.isEmpty()) {
               if (!this.waiters.isEmpty())
                   this.waiters.peekFirst().signal();
          }

           // unlock and return the buffer
           lock.unlock();
           if (buffer == null)
               return ByteBuffer.allocate(size);
           else
               return buffer;
      }
  } finally {
       if (lock.isHeldByCurrentThread())
           lock.unlock();
  }
}

這個方法比較長,但是邏輯比較清晰,整體分爲一個大的 if-else 主要脈絡如下:

1)最外層的 if 主要邏輯是:如果 free 隊列存在空閒內存,直接使用,否則創建一塊大小爲 size 的 ByteBuffer,可用內存會扣減相應值

2)else 主要邏輯是:如果總緩衝區的內存 32MB 都使用完了,線程需要通過 Condition 隊列進行排隊等待,獲取 ByteBuffer

整體如下圖所示:

我們分別來看下細節,首先是第一段邏輯:

    //如果free隊列存在空閒內存,直接使用
    if (size == poolableSize && !this.free.isEmpty())
           return this.free.pollFirst();
           
   // now check if the request is immediately satisfiable with the
   // memory on hand or if we need to block
   int freeListSize = this.free.size() * this.poolableSize;
   if (this.availableMemory + freeListSize >= size) {
      //創建一塊大小爲size的ByteBuffer,可用內存會扣減相應值
       // we have enough unallocated or pooled memory to immediately
       // satisfy the request
       freeUp(size);
       this.availableMemory -= size;
       lock.unlock();
       return ByteBuffer.allocate(size);
  }

這塊邏輯很簡單。獲取 ByteBuffer 的方式不是從 free 隊列就是新創建。

但是這裏有一個問題,free 隊列什麼時候有值的?

其實可以猜到,當從緩衝區發送出去消息後,會清空 ByteBuffer,之後就會空閒這塊內存,自然也就會加入 free 這個隊列中了。你可以搜索下這個 free 隊列的引用自己大體看下。之後分析如何發送緩衝器中的消息時會帶大家看到的。

剩下的第二段邏輯是總內存不夠用的時候線程排隊等待,之後喚醒的邏輯。這塊邏輯考慮很多特殊邏輯,看上去比較複雜。

// we are out of memory and will have to block
           int accumulated = 0;
           ByteBuffer buffer = null;
           Condition moreMemory = this.lock.newCondition();
           long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
           this.waiters.addLast(moreMemory);
           // loop over and over until we have a buffer or have reserved
           // enough memory to allocate one
           while (accumulated < size) {
               long startWaitNs = time.nanoseconds();
               long timeNs;
               boolean waitingTimeElapsed;
               try {
                   waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
              } catch (InterruptedException e) {
                   this.waiters.remove(moreMemory);
                   throw e;
              } finally {
                   long endWaitNs = time.nanoseconds();
                   timeNs = Math.max(0L, endWaitNs - startWaitNs);
                   this.waitTime.record(timeNs, time.milliseconds());
              }

               if (waitingTimeElapsed) {
                   this.waiters.remove(moreMemory);
                   throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
              }

               remainingTimeToBlockNs -= timeNs;
               // check if we can satisfy this request from the free list,
               // otherwise allocate memory
               if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                   // just grab a buffer from the free list
                   buffer = this.free.pollFirst();
                   accumulated = size;
              } else {
                   // we'll need to allocate memory, but we may only get
                   // part of what we need on this iteration
                   freeUp(size - accumulated);
                   int got = (int) Math.min(size - accumulated, this.availableMemory);
                   this.availableMemory -= got;
                   accumulated += got;
              }
          }

           // remove the condition for this thread to let the next thread
           // in line start getting memory
           Condition removed = this.waiters.removeFirst();
           if (removed != moreMemory)
               throw new IllegalStateException("Wrong condition: this shouldn't happen.");

           // signal any additional waiters if there is more memory left
           // over for them
           if (this.availableMemory > 0 || !this.free.isEmpty()) {
               if (!this.waiters.isEmpty())
                   this.waiters.peekFirst().signal();
          }

           // unlock and return the buffer
           lock.unlock();
           if (buffer == null)
               return ByteBuffer.allocate(size);
           else
               return buffer;
      }

但是當你梳理清楚後,發現**其實本質就是 Condition 的 await 和 signal 而已。而且這裏有一個最大的等待超時時間,超時後會拋出異常。**具體就不一步一步帶大家分析了,我們肯定是儘量避免這種情況的。大體邏輯總結如下圖:

Condition 這個 waiter 隊列如何被喚醒的呢?其實和 free 內存增加是一樣的,當發送消息之後,內存使用完成,有可用內存之後,自然會被喚醒,之後分析如何發送緩衝器中的消息時會帶大家看到的。如下所示:

小結

好了, 到這裏,內存緩衝器 RecordAccumulator 通過 BufferPool 申請內存的源碼原理基本就分析完了。你主要知道了:

BufferPool 的創建直接內存 ByteBuffer 的原因

兩個核心的參數 batchSize=16kb,bufferMemory=32MB

核心數據結構 Deque waiters 和 Dequefree。

每一塊 ByteBuffer 的大小計算邏輯

如何申請和重用內存 ByteBuffer 的邏輯

下一節我們繼續來分析發送消息的內存緩衝器原理—tryAppend 的邏輯。之後如何打包消息,並將打包好的消息發送出去的。消息的最終序列化格式和 NIO 的拆包粘包問題。大家敬請期待!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2gyb1xhACwFElkyv6Go3Ow