圖解 Kafka 緩存架構

大家好,我是 蘇三, 又跟大家見面了。

上篇主要帶大家深度剖析了「Kafka 網絡層收發總流程」,今天主要聊聊 「Kafka 客戶端消息緩存架構設計」,深度剖析下消息是如何進行緩存的。

認真讀完這篇文章,我相信你會對 Kafka 客戶端緩存架構的源碼有更加深刻的理解。

這篇文章乾貨很多,希望你可以耐心讀完。

01 總體概述

通過場景驅動的方式,當被髮送消息通過網絡請求封裝、NIO 多路複用器監聽網絡讀寫事件並進行消息網絡收發後,回頭來看看消息是如何在客戶端緩存的?

大家都知道 Kafka 是一款超高吞吐量的消息系統,主要體現在「異步發送」、「批量發送」、「消息壓縮」。

跟本篇相關的是「批量發送」即生產者會將消息緩存起來,等滿足一定條件後,Sender 子線程再把消息批量發送給 Kafka Broker。

這樣好處就是「儘量減少網絡請求次數,提升網絡吞吐量」。

爲了方便大家理解,所有的源碼只保留骨幹。

02 消息如何在客戶端緩存的

既然是批量發送,那麼消息肯定要進行緩存的,那消息被緩存在哪裏呢?又是如何管理的?

通過下面簡化流程圖可以看出,待發送消息主要被緩存在 RecordAccumulator 裏。

我以一個真實生活場景類比解說一下會更好理解。

既然說 RecordAccumulator 像一個累積消息的倉庫,就拿快遞倉庫類比。

上圖是一個快遞倉庫,堆滿了貨物。可以看到分揀員不同目的地的包裹放入對應目的地的貨箱,每裝滿一箱就放置在對應的區域。

那麼分揀員就是指 RecordAccumulator,而貨箱以及各自所屬的堆放區域,就是 RecordAccumulator 中緩存消息的地方。所有封箱的都會等待 sender 來取貨發送出去。

如果你看懂了上圖,就大概理解了 RecordAccumulator 的架構設計和運行邏輯。

總結下倉庫裏有什麼:

  1. 分揀員

  2. 貨物

  3. 目的地

  4. 貨箱

  5. 堆放區域

記住這些概念,都會體現在源碼裏,流程如下圖所示:

從上面圖中可以看出:

  1. 至少有一個業務主線程和一個 sender 線程同時操作 RecordAccumulator,所以它必須是線程安全的。

  2. 在它裏面有一個 ConcurrentMap 集合「Kafka 自定義的 CopyOnWriteMap」。key:TopicPartiton, value:Deque,即以主題分區爲單元,把消息以 ProducerBatch 爲單位累積緩存,多個 ProducerBatch 保存在 Deque 隊列中。當 Deque 中最新的 batch 不能容納消息時,就會創建新的 batch 來繼續緩存,並將其加入 Deque。

  3. 通過 ProducerBatch 進行緩存數據,爲了減少頻繁申請銷燬內存造成 Full GC 問題,Kafka 設計了經典的「緩存池 BufferPool 機制」。

綜上可以得出 RecordAccumulator 類中有三個重要的組件:「消息批次 ProducerBatch」、「自定義 CopyOnWriteMap」、「緩存池 BufferPool 機制」。

由於篇幅原因,RecordAccumulator 類放到下篇來講解

先來看看 ProducerBatch,它是消息緩存及發送消息的最小單位

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java

通過調用關係可以看出,ProducerBatch 依賴 MemoryRecordsBuilder,而 MemoryRecordsBuilder 依賴 MemoryRecords 構建,所以 「MemoryRecords 纔是真正用來保存消息的地方」。

02.1 MemoryRecords

import java.nio.ByteBuffer;
public class MemoryRecords extends AbstractRecords {
  public static MemoryRecordsBuilder builder(..){
        // 重載builder 
        return builder(...);
  }
    
  public static MemoryRecordsBuilder builder(
    ByteBuffer buffer,
    // 消息版本
    byte magic,
    // 消息壓縮類型
    CompressionType compressionType,
    // 時間戳
    TimestampType timestampType,
    // 基本位移
    long baseOffset,
    // 日誌追加時間
    long logAppendTime,
    // 生產者id
    long producerId,
    // 生產者版本
    short producerEpoch,
    // 批次序列號
    int baseSequence,
    boolean isTransactional,
    // 是否是控制類的批次
    boolean isControlBatch,
    // 分區leader的版本
    int partitionLeaderEpoch) {
        // 初始化MemoryRecordsBuilder類
        return new MemoryRecordsBuilder(...);
  }
}

該類比較簡單,通過 builder 方法可以看出依賴 ByteBuffer 來存儲消息。MemoryRecordsBuilder 類的構建是通過 MemoryRecords.builder() 來初始化的。

來看看 MemoryRecordsBuilder 類的實現。

02.2 MemoryRecordBuilder

public class MemoryRecordsBuilder implements AutoCloseable {
    // 寫操作關閉的輸出流
    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
        // 當向某個ByteBuffer關閉輸出流寫數據時拋異常
        public void write(int b) {
            throw new ...;
        }
    });
    // 日誌時間
    private final TimestampType timestampType;
    // 消息壓縮類型
    private final CompressionType compressionType;
    // kafka對OutputStream接口的實現類,對ByteBuffer實現了自動擴容功能
    private final ByteBufferOutputStream bufferStream;
    // 消息的版本
    private final byte magic;
    // ByteBuffer的最初始位置
    private final int initialPosition;
    // 基本位移
    private final long baseOffset;
    // 消息追加的時間
    private final long logAppendTime;
    // 是否是控制類的批次
    private final boolean isControlBatch;
    // 分區leader的版本
    private final int partitionLeaderEpoch;
    // 寫入上限
    private final int writeLimit;
    // batch頭大小字節數
    private final int batchHeaderSizeInBytes;
    // 評估壓縮率
    private float estimatedCompressionRatio = 1.0F;
    // 對bufferStream添加壓縮功能
    private DataOutputStream appendStream;
    // 是否是事務批次
    private boolean isTransactional;
    // 生產者id
    private long producerId;
    // 生產者版本
    private short producerEpoch;
    // 批次序列號
    private int baseSequence;
    // 壓縮前要寫入的消息體大小字節數
    private int uncompressedRecordsSizeInBytes = 0; 
    // 壓縮前寫入的記錄數(不包括頭)
    private int numRecords = 0;
    // 實際壓縮率
    private float actualCompressionRatio = 1;
    // 最大時間戳
    private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
    // 最大時間戳偏移量
    private long offsetOfMaxTimestamp = -1;
    // 最後的偏移量
    private Long lastOffset = null;
    // 第一次追加消息的時間戳
    private Long firstTimestamp = null;
    // 真正保存消息的地方
    private MemoryRecords builtRecords;

從該類屬性字段來看比較多,這裏只講 2 個關於字節流的字段。

  1. CLOSED_STREAM:當關閉某個 ByteBuffer 也會把它對應的寫操作輸出流設置爲 CLOSED_STREAM,目的就是防止再向該 ByteBuffer 寫數據,否則就拋異常。

  2. bufferStream:首先 MemoryRecordsBuilder 依賴 ByteBuffer 來完成消息存儲。它會將 ByteBuffer 封裝成 ByteBufferOutputStream 並實現了 Java NIO 的 OutputStream,這樣就可以按照流的方式寫數據了。同時 ByteBufferOutputStream 提供了自動擴容 ByteBuffer 能力

來看看它的初始化構造方法。

public MemoryRecordsBuilder(ByteBuffer buffer,...) {
    // 將MemoryRecordsBuilder關聯的ByteBuffer封裝成ByteBufferOutputStream流
    this(new ByteBufferOutputStream(buffer), ...);
}

// 構造方法
public MemoryRecordsBuilder(
    ByteBufferOutputStream bufferStream,
    ...
    int writeLimit) {
        ....
        // 初始位置
        this.initialPosition = bufferStream.position();
        // 1. 根據不同消息版本計算批次Batch頭的長度
        this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
        // 2. 調整對應的position
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;
        // 3. 在bufferStream流外層套一層壓縮流,再套一層DataOutputStream流
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }
}

從構造函數可以看出,除了基本字段的賦值之外,會做以下 3 件事情

  1. 根據消息版本、壓縮類型來計算批次 Batch 頭的大小長度

  2. 通過調整 bufferStream 的 position,使其跳過 Batch 頭部位置,就可以直接寫入消息了。

  3. 對 bufferStream 增加壓縮功能

看到這裏,挺有意思的,不知讀者是否意識到這裏涉及到 「ByteBuffer」、「bufferStream」 、「appendStream」。

三者的關係是通過「裝飾器模式」實現的,即 bufferStream 對 ByteBuffer 裝飾實現擴容功能,而 appendStream 又對 bufferStream 裝飾實現壓縮功能。

來看看它的核心方法。

02.2.1 appendWithOffset()

// 追加新記錄
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
   return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}

// 計算下一個連續偏移量
private long nextSequentialOffset() {
  // lastOffset用來記錄當前寫入Record的offset,每次當有新Record寫入時,都會遞增它。
  return lastOffset == null ? baseOffset : lastOffset + 1;
}

// 根據偏移量追加消息
private Long appendWithOffset(
  long offset,
  boolean isControlRecord, 
  long timestamp, 
  ByteBuffer key,
  ByteBuffer value, 
  Header[] headers) {
    try {
        // 檢查isControl標誌是否一致
        if (isControlRecord != isControlBatch)
            throw new ...;
        // 保證offset是遞增的
        if (lastOffset != null && offset <= lastOffset)
            throw new ...;
        // 檢查時間戳      
        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
           throw new ...;
        // 只有V2版本纔有header
        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
           throw new ...;
        // 更新firstTimestamp
        if (firstTimestamp == null)
           firstTimestamp = timestamp;
        // V2版本消息寫入
        if (magic > RecordBatch.MAGIC_VALUE_V1)         {
            appendDefaultRecord(offset, timestamp, key, value, headers);
            return null;
        } else {
            //V0、V1 版本消息寫入(此處不進行剖析)
            return appendLegacyRecord(offset, timestamp, key, value, magic);
        }
    } catch (IOException e) {
        // 拋異常
    }
}

該方法主要用來根據偏移量追加寫消息,會根據消息版本來寫對應消息,但需要明確的是 ProducerBatch 對標 V2 版本

來看看 V2 版本消息寫入邏輯。

private void appendDefaultRecord(
  long offset, 
  long timestamp, 
  ByteBuffer key, 
  ByteBuffer value,
  Header[] headers) throws IOException {
    // 1. 檢查appendStream狀態是否可以寫
    ensureOpenForRecordAppend();
    // 2. 計算寫入多少偏移量
    int offsetDelta = (int) (offset - baseOffset);
    // 3.計算本次寫與第一次寫之間時間差
    long timestampDelta = timestamp - firstTimestamp;
    // 4.使用DefaultRecord.writeTo()方法會按照V2 版本格式寫入appendStream流中,並返回壓縮前的消息大小
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    // 5. 消息寫入成功後更新RecordBatch的元信息
    recordWritten(offset, timestamp, sizeInBytes);
}

// 判斷appendStream狀態是否爲CLOSED_STREAM 
private void ensureOpenForRecordAppend() {
    if (appendStream == CLOSED_STREAM)
        throw new ...;
}

// 消息寫入成功後更新RecordBatch的元信息
private void recordWritten(long offset, long timestamp, int size) {
  ....
  // 壓縮前寫入的記錄數 + 1
  numRecords += 1;
  // 壓縮前要寫入的消息體大小字節數 + size
  uncompressedRecordsSizeInBytes += size;
  // 最後的偏移量 + offset
  lastOffset = offset;
  if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
      // 賦值最大時間戳
      maxTimestamp = timestamp;
      // 賦值最大時間戳偏移量
      offsetOfMaxTimestamp = offset;
  }
}

該方法主要用來寫入 V2 版本消息的,主要做以下 5 件事情

  1. 檢查是否可寫:判斷 appendStream 狀態是否爲 CLOSED_STREAM,如果不是就可寫,否則拋異常。

  2. 計算本次要寫入多少偏移量。

  3. 計算本次寫入和第一次寫的時間差。

  4. 按照 V2 版本格式寫入 appendStream 流中,並返回壓縮前的消息大小。

  5. 成功後更新 RecordBatch 的元信息

02.2.2 hasRoomFor()

public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
    // 檢查兩個狀態
    // (1)appendStream流狀態
    // (2)當前已經寫入的預估字節數是否超過了writeLimit寫入上限
    if (isFull())
        return false;
    // 每個RecordBatch至少可以寫入一個Record,此時如果一個Record都沒有,則可以繼續寫入
    if (numRecords == 0)
        return true;
    final int recordSize;
    if (magic < RecordBatch.MAGIC_VALUE_V2) {
        // 預估V0、V1舊版本的Record大小
        recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
    } else {
        // 預估V2版本寫入的Record大小
        int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
        ...
        recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
    }

    // 已寫入字節數 + 本次寫入Record的預估字節數不能超過writeLimit寫入上限
    return this.writeLimit >= estimatedBytesWritten() + recordSize;
}

public boolean isFull() {
      return appendStream == CLOSED_STREAM || 
      (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}

該方法主要用來估計當前 MemoryRecordsBuilder 是否還有空間來容納要寫入的 Record,會在下面 ProducerBatch.tryAppend() 裏面調用。

最後來看看小節開始提到的自動擴容功能

02.2.3 expandBuffer()

public class ByteBufferOutputStream extends OutputStream {
   // 擴容因子1.1倍
   private static final float REALLOCATION_FACTOR = 1.1f;
   // 初始容量
   private final int initialCapacity;
   // 初始位置
   private final int initialPosition;
   // 計算是否需要擴容
   public void ensureRemaining(int remainingBytesRequired) {
     // 當寫入字節數大於buffer當前剩餘字節數就開啓擴容
     if (remainingBytesRequired > buffer.remaining())
     expandBuffer(remainingBytesRequired);
  }
  
  // 擴容
  private void expandBuffer(int remainingRequired) {
    // 1. 評估需要多少空間
    int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
    // 2. 申請新的ByteBuffer
    ByteBuffer temp = ByteBuffer.allocate(expandSize);
    // 3. 獲取寫入上限
    int limit = limit();
    // 4. 寫狀態轉換爲讀狀態
    buffer.flip();
    // 5. 將buffer讀到新申請的temp裏
    temp.put(buffer);
    // 6. 修改寫模式的limit上限
    buffer.limit(limit);
    // 7. 更新原來的buffer的position,防止被重複消費
    buffer.position(initialPosition);
    // 8. 將引用指向新申請的ByteBuffer
    buffer = temp;
  }
}

該方法主要用來判斷是否需要擴容 ByteBuffer 的,即當寫入字節數大於 buffer 當前剩餘字節數就開啓擴容,擴容需要做以下 3 件事情

  1. 評估需要多少空間: 在「擴容空間」、「真正需要多少字節」之間取最大值,此處通過「擴容因子」來計算主要是因爲擴容是需要消耗系統資源的,如果每次都按實際數據大小來進行分配空間,會浪費不必要的系統資源。

  2. 申請新的空間:根據擴容多少申請新的 ByteBuffer,然後將原來的 ByteBuffer 數據拷貝進去,對應源碼步驟:「3 - 7」。

  3. 最後將引用指向新申請的 ByteBuffer。

接下來看看 ProducerBatch 的實現。

02.3 ProducerBatch

public final class ProducerBatch {
    // 批次最終狀態
    private enum FinalState { ABORTED, FAILED, SUCCEEDED }
    // 批次創建時間  
    final long createdMs;
    // 批次對應的主題分區
    final TopicPartition topicPartition;
    // 請求結果的future
    final ProduceRequestResult produceFuture;
    // 用來存儲消息的callback和響應數據
    private final List<Thunk> thunks = new ArrayList<>();
    // 封裝MemoryRecords對象,用來存儲消息的ByteBuffer
    private final MemoryRecordsBuilder recordsBuilder;
    // batch的失敗重試次數
    private final AtomicInteger attempts = new AtomicInteger(0);
    // 是否是被分裂的批次
    private final boolean isSplitBatch;
    // ProducerBatch的最終狀態
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
    // Record個數
    int recordCount;
    // 最大Record字節數
    int maxRecordSize;
    // 最後一次失敗重試發送的時間戳
    private long lastAttemptMs;
    // 最後一次向該ProducerBatch追加Record的時間戳
    private long lastAppendTime;
    // Sender子線程拉取批次的時間
    private long drainedMs;
    // 是否正在重試過,如果ProducerBatch中的數據發送失敗,則會重新嘗試發送
    private boolean retry;
}

// 構造函數
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
    ...
    // 請求結果的future
    this.produceFuture = new ProduceRequestResult(topicPartition);
    ...
}

一個 ProducerBatch 會存放一條或多條消息,通常把它稱爲「批次消息」。

先來看看幾個重要字段:

  1. topicPartition:批次對應的主題分區,當前 ProducerBatch 中緩存的 Record 都會發送給該 TopicPartition。

  2. produceFuture:請求結果的 Future,通過 ProduceRequestResult 類實現。

  3. thunks:Thunk 對象集合,用來存儲消息的 callback 和每個 Record 關聯的 Feture 響應數據。

  4. recordsBuilder:封裝 MemoryRecords 對象,用來存儲消息的 ByteBuffer。

  5. attemps:batch 的失敗重試次數,通過 AtomicInteger 提供原子操作來進行 Integer 的使用,適合高併發情況下的使用

  6. isSplitBatch:是否是被分裂的批次,因單個消息過大導致一個 ProducerBatch 存不下,被分裂成多個 ProducerBatch 來存儲的情況。

  7. drainedMs:Sender 子線程拉取批次的時間。

  8. retry:如果 ProducerBatch 中的數據發送失敗,則會重新嘗試發送。

在構造函數中,有個重要的依賴組件就是 「ProduceRequestResult」,而它是「異步獲取消息生產結果的類」,簡單剖析下。

02.3.1 ProduceRequestResult 類

public class ProduceRequestResult {
    // 通過一個count爲1的CountDownLatch對象間接地實現了Future的功能。
    private final CountDownLatch latch = new CountDownLatch(1);
    private final TopicPartition topicPartition;
    // 用來記錄broker端關聯ProducerBatch中第一條Record分配的offset值
    // 這樣每個Record的真實offset就可以根據自身在ProducerBatch的位置計算出來了(baseOffset + relativeOffset)
    private volatile Long baseOffset = null;
    
    // 構造函數
    public ProduceRequestResult(TopicPartition topicPartition) {
        this.topicPartition = topicPartition;
    }
    // 當等到響應會會調該函數喚醒阻塞的主線程
    public void done() {
        if (baseOffset == null)
            throw new ...;
        this.latch.countDown();
    }
    // 調用await()方法的線程會被掛起,它會等待直到count值爲0才繼續執行
    public void await() throws InterruptedException {
        latch.await();
    }
    // 和await()類似,只不過等待一定的時間後count值還沒變爲0的話就會繼續執行
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

該類通過 CountDownLatch(1) 間接地實現了 Future 功能,並讓其他所有線程都在這個鎖上等待,此時只需要調用一次 countDown() 方法就可以讓其他所有等待的線程同時恢復執行。

當 Producer 發送消息時會間接調用「ProduceRequestResult.await」,此時線程就會等待服務端的響應。當服務端響應時調用「ProduceRequestResult.done」,該方法調用了「CountDownLatch.countDown」喚醒了阻塞在「CountDownLatch.await」上的主線程。這些線程後續可以通過 ProduceRequestResult 的 error 字段來判斷本次請求成功還是失敗。

接下來看看 ProducerBatch 類的重要方法。

02.3.2 tryAppend()

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // 1.檢查MemoryRecordsBuilder是否還有空間寫入
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // 2.調用append()方法寫入Record
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        // 3. 更新最大Record字節數
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),recordsBuilder.compressionType(), key, value, headers));
        ...
        // 4.構建FutureRecordMetadata對象
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length, Time.SYSTEM);
        // 5. 將Callback和FutureRecordMetadata記錄到thunks集合中
        thunks.add(new Thunk(callback, future));
        // 6. 更新Record記錄數
        this.recordCount++;
        // 7. 返回FutureRecordMetadata
        return future;
    }
}

該方法主要用來嘗試追加寫消息的,主要做以下 6 件事情

  1. 通過 MemoryRecordsBuilder 的 hasRoomFor() 檢查當前 ProducerBatch 是否還有足夠的空間來存儲此次寫入的 Record。

  2. 調用 MemoryRecordsBuilder.append() 方法將 Record 追加到 ByteBuffer 中

  3. 創建 FutureRecordMetadata 對象,底層繼承了 Future 接口,對應此次 Record 的發送。

  4. 將 Future 和消息的 callback 回調封裝成 Thunk 對象,放入 thunks 集合中

  5. 更新 Record 記錄數。

  6. 返回 FutureRecordMetadata。

可以看出該方法只是讓 Producer 主線程完成了消息的緩存,並沒有實現真正的網絡發送

接下來簡單看看 FutureRecordMetadata,它實現了 JDK 中 concurrent 的 Future 接口。除了維護 ProduceRequestResult 對象外還維護了 relativeOffset 等字段,其中 relativeOffset 用來記錄對應 Record 在 ProducerBatch 中的偏移量

該類有 2 個值得注意的方法,get() 和 value()。

public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    ...
    // 依賴ProduceRequestResult的CountDown來實現阻塞等待
    boolean occurred = this.result.await(timeout, unit);
    ...
    // 調用value()方法返回RecordMetadata對象
    return valueOrError();
}
    
RecordMetadata valueOrError() throws ExecutionException {
    ...
    return value();
}

該方法主要依賴 ProduceRequestResult 的 CountDown 來實現阻塞等待,最後調用 value() 返回 RecordMetadata 對象。

RecordMetadata value() {
    ...
     // 將 partition、baseOffset、relativeOffset、時間戳(LogAppendTime | CreateTimeStamp)等信息封裝成 RecordMetadata 對象返回
    return new RecordMetadata(
      result.topicPartition(), 
      ...);
}

private long timestamp() {
    return result.hasLogAppendTime() ? result.logAppendTime() : createTimestamp;
}

該方法主要通過各種參數封裝成 RecordMetadata 對象返回。

瞭解了 ProducerBatch 是如何寫入數據的,我們再來看看 done() 方法。當 Producer 收到 Broker 端「正常」|「超時」|「異常」|「關閉生產者」等響應都會調用 ProducerBatch 的 done() 方法。

02.3.3 done()

public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
    // 1.根據exception決定本次ProducerBatch發送的最終狀態
    final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
    ....
    // 2.通過CAS操作更新finalState狀態,只有第一次更新的時候,纔會觸發completeFutureAndFireCallbacks()方法
    if (this.finalState.compareAndSet(null, tryFinalState)) {
        // 3.執行回調
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
    ....
    return false;
}

該方法主要用來是否可以執行回調操作,即當收到該批次響應後,判斷批次 Batch 最終狀態是否可以執行回調操作。

03.3.4 completeFutureAndFireCallbacks()

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
  // 1.更新ProduceRequestResult中的相關字段
  produceFuture.set(baseOffset, logAppendTime, exception);

  // 2.遍歷thunks集合,觸發每個Record的Callback回調
  for (Thunk thunk : thunks) {
      try {
          if (exception == null) {
           // 3.獲取消息元數據
           RecordMetadata metadata = thunk.future.value();
           if (thunk.callback != null)
             //4.調用回調方法
             thunk.callback.onCompletion(metadata, null);
          } else {
              if (thunk.callback != null)
                  // 4.調用回調方法
                  thunk.callback.onCompletion(null, exception);
          }
      } 
      ....
  }
  // 4.調用底層 CountDownLatch.countDown()方法,阻塞在其上的主線程。
  produceFuture.done();
}

該方法主要用來調用回調方法和完成 future,主要做以下 3 件事情

  1. 更新 ProduceRequestResult 中的相關字段,包括基本位移、消息追加的時間、異常。

  2. 遍歷 thunks 集合,觸發每個 Record 的 Callback 回調。

  3. 調用底層 CountDownLatch.countDown() 方法,阻塞在其上的主線程。

至此我們已經講解了 ProducerBatch 「如何緩存消息」、「如何處理響應」、「如何處理回調」三個最重要方法。

通過一張圖來描述下緩存消息的存儲結構:

接下來看看 Kafka 生產端最經典的 「緩衝池架構」。

03 客戶端緩存池架構設計

爲什麼客戶端需要緩存池這個經典架構設計呢?

主要原因就是頻繁的創建和釋放 ProducerBatch 會導致 Full GC 問題,所以 Kafka 針對這個問題實現了一個非常優秀的機制,就是「緩存池 BufferPool 機制」。即每個 Batch 底層都對應一塊內存空間,這個內存空間就是專門用來存放消息,用完歸還就行。

接下來看看緩存池的源碼設計。

03.1 BufferPool

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java

public class BufferPool {
  // 整個BufferPool總內存大小 默認32M
  private final long totalMemory;
  // 當前BufferPool管理的單個ByteBuffer大小,16k 
  private final int poolableSize;
  // 因爲有多線程併發分配和回收ByteBuffer,用鎖控制併發,保證線程安全。
  private final ReentrantLock lock;
  // 對應一個ArrayDeque<ByteBuffer> 隊列,其中緩存了固定大小的 ByteBuffer 對象
  private final Deque<ByteBuffer> free;
  // 此隊列記錄因申請不到足夠空間而阻塞的線程對應的Condition 對象
  private final Deque<Condition> waiters;
  // 非池化可用的內存即totalMemory減去free列表中的全部ByteBuffer的大小
  private long nonPooledAvailableMemory;
  // 構造函數
  public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
    ...
    // 總的內存
    this.totalMemory = memory;
    // 默認的池外內存,就是總的內存
    this.nonPooledAvailableMemory = memory;
  }
}

先來看看上面幾個重要字段:

  1. totalMemory:整個 BufferPool 內存大小「buffer.memory」,默認是 32M。

  2. poolableSize:池化緩存池一塊內存塊的大小「batch.size」,默認是 16k。

  3. lock:當有多線程併發分配和回收 ByteBuffer 時,爲了保證線程的安全,使用鎖來控制併發。

  4. free:池化的 free 隊列,其中緩存了指定大小的 ByteBuffer 對象。

  5. waiters:阻塞線程對應的 Condition 隊列,當有申請不到足夠內存的線程時,爲了等待其他線程釋放內存而阻塞等待,對應的 Condition 對象會進入該隊列。

  6. nonPooledAvailableMemory:非池化可用內存。

可以看出它只會針對固定大小「poolableSize 16k」的 ByteBuffer 進行管理,ArrayDeque 的初始化大小是 16,此時 BufferPool 的狀態如下圖:

接下來看看 BufferPool 的重要方法。

03.1.1 allocate()

// 分配指定空間的緩存,如果緩衝區中沒有足夠的空閒空間,那麼會阻塞線程,直到超時或得到足夠空間
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
  // 1.判斷申請的內存是否大於總內存
  if (size > this.totalMemory)
      throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "+ this.totalMemory + " on memory allocations.");
  // 初始化buffer
  ByteBuffer buffer = null;
  // 2.加鎖,保證線程安全。
  this.lock.lock();
  // 如果當前BufferPool處於關閉狀態,則直接拋出異常
  if (this.closed) {
      this.lock.unlock();
      throw new KafkaException("Producer closed while allocating memory");
  }
  ....
  try {
      // 3.申請內存大小恰好爲16k 且free緩存池不爲空
      if (size == poolableSize && !this.free.isEmpty())
      // 從free隊列取出一個ByteBuffer
      return this.free.pollFirst();
      
      // 對於申請內存大小非16k情況
      // 先計算free緩存池總空間大小,判斷是否足夠
      int freeListSize = freeSize() * this.poolableSize;
      // 4.當前BufferPool能夠釋放出申請內存大小的空間
      if (this.nonPooledAvailableMemory + freeListSize >= size) {
          // 5.如果size大於非池化可用內存大小,就循環從free緩存池裏釋放出來空閒Bytebuffer補充到nonPooledAvailableMemory中,直到滿足size大小爲止。
          freeUp(size);
          // 釋放非池化可用內存大小
          this.nonPooledAvailableMemory -= size;
      } else {
          // 如果當前BufferPool不夠提供申請內存大小,則需要阻塞當前線程
          // 累計已經釋放的內存
          int accumulated = 0;
          // 創建對應的Condition,阻塞自己等待別的線程釋放內存
          Condition moreMemory = this.lock.newCondition();
          try {
              // 計算當前線程最大阻塞時長
              long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
              // 把自己添加到等待隊列中末尾,保持公平性,先來的先獲取內存,防止飢餓
              this.waiters.addLast(moreMemory);
              // 循環等待直到分配成功或超時
              while (accumulated < size) {
                  ....
                  try {
                    // 當前線程阻塞等待,返回結果爲false則表示阻塞超時
                   waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                  } finally {
                      ....
                  }
                  ....   
                  // 申請內存大小是16k,且free緩存池有了空閒的ByteBuffer
                  if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // 從free隊列取出一個ByteBuffer
                    buffer = this.free.pollFirst();
                    // 計算累加器
                    accumulated = size;
                  } else {
                      // 釋放空間給非池化可用內存,並繼續等待空閒空間,如果分配多了只取夠size的空間
                      freeUp(size - accumulated);
                      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                      // 釋放非池化可用內存大小
                      this.nonPooledAvailableMemory -= got;
                      // 累計分配了多少空間
                      accumulated += got;
                  }
              }
              accumulated = 0;
          } finally {
              // 如果循環有異常,將已釋放的空間歸還給非池化可用內存
              this.nonPooledAvailableMemory += accumulated;
              //把自己從等待隊列中移除並結束
              this.waiters.remove(moreMemory);
          }
      }
  } finally {
     // 當非池化可用內存有內存或free緩存池有空閒ByteBufer且等待隊列裏有線程正在等待
      try {
          if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
           // 喚醒隊列里正在等待的線程
           this.waiters.peekFirst().signal();
      } finally {
          // 解鎖
          lock.unlock();
      }
  }
  // 說明空間足夠,並且有足夠空閒的了。可以執行真正的分配空間了。
  if (buffer == null)
      // 沒有正好的buffer,從緩衝區外(JVM Heap)中直接分配內存
      return safeAllocateByteBuffer(size);
  else
      // 直接複用free緩存池的ByteBuffer
      return buffer;
}

private ByteBuffer safeAllocateByteBuffer(int size) {
  boolean error = true;
  try {
      //分配空間
      ByteBuffer buffer = allocateByteBuffer(size);
      error = false;
      //返回buffer
      return buffer;
  } finally {
    if (error) {
        //分配失敗了, 加鎖,操作內存pool
        this.lock.lock();
        try {
            //歸還空間給非池化可用內存
            this.nonPooledAvailableMemory += size;
            if (!this.waiters.isEmpty())
                //有其他在等待的線程的話,喚醒其他線程
                this.waiters.peekFirst().signal();
        } finally {
            // 加鎖不忘解鎖
            this.lock.unlock();
        }
    }
  }
}

protected ByteBuffer allocateByteBuffer(int size) {
    // 從JVM Heap中分配空間
    return ByteBuffer.allocate(size);
}

// 不斷從free隊列中釋放空閒的ByteBuffer來補充非池化可用內存
private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

該方法主要用來嘗試分配 ByteBuffer,這裏分 4 種情況說明下:

情況 1:申請 16k 且 free 緩存池有可用內存

此時會直接從 free 緩存池中獲取隊首的 ByteBuffer 分配使用,用完後直接將 ByteBuffer 放到 free 緩存池的隊尾中,並調用 clear() 清空數據,以便下次重複使用。

情況 2:申請 16k 且 free 緩存池無可用內存

此時 free 緩存池無可用內存,只能從非池化可用內存中獲取 16k 內存來分配,用完後直接將 ByteBuffer 放到 free 緩存池的隊尾中,並調用 clear() 清空數據,以便下次重複使用。

情況 3:申請非 16k 且 free 緩存池無可用內存

此時 free 緩存池無可用內存,且申請的是非 16k,只能從非池化可用內存 (空間夠分配) 中獲取一部分內存來分配,用完後直接將申請到的內存空間釋放到非池化可用內存中,後續會被 GC 掉

情況 4:申請非 16k 且 free 緩存池有可用內存,但非池化可用內存不夠

此時 free 緩存池有可用內存,但申請的是非 16k,先嚐試從 free 緩存池中將 ByteBuffer 釋放到非池化可用內存中,直到滿足申請內存大小 (size),然後從非池化可用內存獲取對應內存大小來分配,用完後直接將申請到的內存空間釋放到到非池化可用內存中,後續會被 GC 掉

03.1.2 deallocate()

public void deallocate(ByteBuffer buffer, int size) {
    // 1.加鎖,保證線程安全。
    lock.lock();
    try {
    // 2.如果待釋放的size大小爲16k,則直接放入free隊列中
        if (size == this.poolableSize && size == buffer.capacity()) {
            // 清空buffer
            buffer.clear();
            // 釋放buffer到free隊列裏
            this.free.add(buffer);
        } else {
            //如果非16k,則由JVM GC來回收ByteBuffer並增加非池化可用內存
            this.nonPooledAvailableMemory += size;
        }
        // 3.喚醒waiters中的第一個阻塞線程
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}

該方法主要用來嘗試釋放 ByteBuffer 空間,主要做以下幾件事情:

  1. 先加鎖,保證線程安全。

  2. 如果待釋放的 size 大小爲 16k,則直接放入 free 隊列中。

  3. 否則由 JVM GC 來回收 ByteBuffer 並增加 nonPooledAvailableMemory。

  4. 當有 ByteBuffer 回收了,喚醒 waiters 中的第一個阻塞線程。

最後來看看 kafka 自定義的支持「讀寫分離場景」CopyOnWriteMap 的實現。

03.2 CopyOnWriteMap

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java

通過 RecordAccumulator 類的屬性字段中可以看到,CopyOnWriteMap 中 key 爲主題分區,value 爲向這個分區發送的 Deque 隊列集合

我們知道生產消息時,要發送的分區是很少變動的,所以寫操作會很少。大部分情況都是先獲取分區對應的隊列,然後將 ProducerBatch 放入隊尾,所以讀操作是很頻繁的,這就是個典型的「讀多寫少」的場景。

所謂 「CopyOnWrite」 就是當寫的時候會拷貝一份來進行寫操作,寫完了再替換原來的集合。

來看看它的源碼實現。

  public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    // volatile Map
    private volatile Map<K, V> map;
    // 構造函數
    public CopyOnWriteMap() {
        this.map = Collections.emptyMap();
    }

該類只有一個重要的字段 Map,是通過「volatile」來修飾的,目的就是在多線程的場景下,當 Map 發生變化的時候其他的線程都是可見的

接下來看幾個重要方法,都比較簡單,但是實現非常經典

03.2.1 get()

// 獲取集合中隊列
public V get(Object k) {
    return map.get(k);
}

該方法主要用來讀取集合中的隊列,可以看到讀操作並沒有加鎖,多線程併發讀取的場景並不會阻塞,可以實現高併發讀取。如果隊列已經存在了就直接返回即可。

03.2.2 putIfAbsent()

public synchronized V putIfAbsent(K k, V v) {
    if (!containsKey(k))
        return put(k, v);
    else
        return get(k);
}

// 判斷隊列是否存在
public boolean containsKey(Object k) {
    return map.containsKey(k);
}

該方法主要用來獲取或者設置隊列,會被多個線程併發執行,通過「synchronized」來修飾可以保證線程安全的,除非隊列不存在纔會去設置。

03.2.3 put()

public synchronized V put(K k, V v) {
    Map<K, V> copy = new HashMap<K, V>(this.map);
    V prev = copy.put(k, v);
    this.map = Collections.unmodifiableMap(copy);
    return prev;
}

該方法主要用來設置隊列的, put 時也是通過「synchronized」來修飾的,可以保證同一時間只有一個線程會來更新這個值。

那爲什麼說寫操作不會阻塞讀操作呢?

  1. 首先重新創建一個 HashMap 集合副本。

  2. 通過「volatile」寫的方式賦值給對應集合裏。

  3. 把新的集合設置成「不可修改的 map」, 並賦值給字段 map。

這就實現了讀寫分離。對於 Producer 最最核心,會出現多線程併發訪問的就是緩存池。因此這塊的高併發設計相當重要。

04 總結

這裏,我們一起來總結一下這篇文章的重點。

1、帶你先整體的梳理了 Kafka 客戶端消息批量發送的好處。

2、通過一個真實生活場景類比來帶你理解 RecordAccumulator 內部構造,並且深度剖析了消息是如何在客戶端緩存的,以及內部各組件實現原理。

3、帶你深度剖析了 Kafka 客戶端非常重要的 BufferPool 、CopyOnWriteMap 的實現原理。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/4EW-voCLF_oo8wAXtXNypQ