Kafka Producer 內存緩衝器原理 1

第 6 節

內存緩衝器原理 1

**  Kafka Producer 源碼原理**

之前我們分析了 Producer 的配置解析、組件分析、拉取元數據、消息的初步序列化方式、消息的路由策略。如下圖:

這一節我們繼續分析發送消息的內存緩衝器原理—RecordAccumulator.append()。

如何將消息放入內存緩衝器的?

在 doSend 中的,拉取元數據、消息的初步序列化方式、消息的路由策略之後就是 accumulator.append()。

如下代碼所示:(去除了多餘的日誌和異常處理,截取了核心代碼)

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
       TopicPartition tp = null;
       try {
           //拉取元數據、消息的初步序列化方式、消息的路由策略
           long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
           long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
           byte[] serializedKey = keySerializer.serialize(record.topic(), record.key());
           byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value());
           int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
           ensureValidRecordSize(serializedSize);
           tp = new TopicPartition(record.topic(), partition);
           long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
           Callback interceptCallback = this.interceptors == null ?
               callback : new InterceptorCallback<>(callback, this.interceptors, tp);
          // 將路由結果、初步序列化的消息放入到消息內存緩衝器中
           RecordAccumulator.RecordAppendResult result =
               accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
           if (result.batchIsFull || result.newBatchCreated) {
               this.sender.wakeup();
          }
           return result.future;
      } catch (Exception e) {
           throw e;
      }
       //省略其他各種異常捕獲
  }

accumulator.append() 它主要是將路由結果、初步序列化的消息放入到消息內存緩衝器中。

分析如何將消息放入內存緩衝器之前,需要回顧下它內部的基本結構。之前組件分析的時候,我們初步分析過 RecordAccumulator 的大體結構,如下圖:

1)設置了一些參數 batchSize、totalSize、retryBackoffMs、lingerMs、compression 等

2)初始化了一些數據結構,比如 batches 是一個 new CopyOnWriteMap<>()

3)初始化了 BufferPool 和 IncompleteRecordBatches

回顧了 RecordAccumulator 這個組件之後,我們就來看看到底如何將消息放入內存緩衝器的數據結構中的。

public RecordAppendResult append(TopicPartition tp,
                                long timestamp,
                                byte[] key,
                                byte[] value,
                                Callback callback,
                                long maxTimeToBlock) throws InterruptedException {
   // We keep track of the number of appending thread to make sure we do not miss batches in
   // abortIncompleteBatches().
   appendsInProgress.incrementAndGet();
   try {
       // check if we have an in-progress batch
       Deque<RecordBatch> dq = getOrCreateDeque(tp);
       synchronized (dq) {
           if (closed)
               throw new IllegalStateException("Cannot send after the producer is closed.");
           RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
           if (appendResult != null)
               return appendResult;
      }

       // we don't have an in-progress record batch try to allocate a new batch
       int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
       log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
       ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
       synchronized (dq) {
           // Need to check if producer is closed again after grabbing the dequeue lock.
           if (closed)
               throw new IllegalStateException("Cannot send after the producer is closed.");

           RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
           if (appendResult != null) {
               // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
               free.deallocate(buffer);
               return appendResult;
          }
           MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
           RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
           FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

           dq.addLast(batch);
           incomplete.add(batch);
           return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
      }
  } finally {
       appendsInProgress.decrementAndGet();
  }
}

整個方法的脈絡,看着邏輯比較多,涉及了很多數據結構,我們一步一步來分析下。第一次看的話,大體你可以梳理如下脈絡:

1)getOrCreateDeque 這個方法應該是才創建一個雙端隊列,隊列放的每一個元素不是單條消息 Record,而是消息的集合 RecordBatch。

2)free.allocate 應該是在分配內存緩衝器中的內存

3)tryAppend 應該是將消息放入內存中

創建存放消息集合的隊列

在將消息放入內存緩衝器之前,首先通過 getOrCreateDeque 創建的是一個存放消息集合的隊列。代碼如下:

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
public RecordAccumulator(int batchSize,
                        long totalSize,
                        CompressionType compression,
                        long lingerMs,
                        long retryBackoffMs,
                        Metrics metrics,
                        Time time) {
    //省略...
   this.batches = new CopyOnWriteMap<>();
    //省略...
}
/**
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
   Deque<RecordBatch> d = this.batches.get(tp);
   if (d != null)
       return d;
   d = new ArrayDeque<>();
   Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
   if (previous == null)
       return d;
   else
       return previous;
}

這個創建的內存結構可以看到,是**一個變量 batches,它是一個 CopyOnWriteMap****。**這個數據結構之前我們組件圖初步分析過。再結合這段代碼,不難理解它的脈絡:

這個 map 主要根據 Topic 分區信息作爲 key,value 是一個隊列核心數據結構是 RecordBatch,由於是第一次給某個 topic 分區發送的消息,value 爲空,需要初始化隊列,否則說明曾經給這個 topic 的分區發送給數據,value 非空,直接返回之前的隊列。

由於我們這裏是第一次向 test-topic 發送消息,所以可以得到下圖的數據結構:

之後執行了一段加鎖邏輯,之前提到,tryAppend 應該是將消息放入內存中。但是由於隊列是剛創建的,deque.peekLast(); 肯定是空,所以這段加鎖的代碼不會執行。

  synchronized (dq) {
      if (closed)
      throw new IllegalStateException("Cannot send after the producer is closed.");
      RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
      if (appendResult != null)
      return appendResult;
  }
   private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque){
       RecordBatch last = deque.peekLast();
       if (last != null) {
           FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
           if (future == null)
               last.records.close();
           else
               return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
      }
       return null;
  }

但是到這裏你會發現代碼一個明顯的特點:

在這些代碼紅,使用了的 synchronized 加鎖和線程安全的內存結構 CopyOnWriteMap,這些都是明顯線程安全的控制。

爲什麼呢?因爲同一個 Producer 可以使用多線程進行發送消息,必然要考慮線程安全的很多東西。

爲什麼選用 CopyOnWriteMap, 而不用 ConcurrentHashMap 呢?你可以思考下。(這裏給個提示,JDK 成長記提到過,CopyOnWriteMap 它的底層是寫時複製,適合讀多寫少的場景)

synchronized 加鎖代碼塊使用了,分段加鎖,並沒有暴力的在方法上加 synchronized。這也是一個使用亮點。

寫在結尾的話

到這裏,你會發現在中間件會大量的見到併發包下的組件的使用,工作中你用到可能都是鳳毛麟角,這些組件的使用是我們研究中間件源碼值得學習的一點。

你一定要多思考爲什麼,不要停留在是什麼,怎麼用上,這個思想需要刻意訓練,希望你可以慢慢養成。

好了,今天的內容就到這裏,之前有同學反饋,每一節的只是太過於幹了,實實在在的乾貨!看起來有時候會比較費勁,所以之後的章節儘量會避免上萬字的大章節,會控制在 6000 字左右。

另外,等端午過後,每週除了 2 更的 java 或者大數據技術的乾貨成長記外,我會分享我自己的故事和行業中遇見的事情,希望大家從我的經歷中可以有另一番成長和收穫,比如我是如何學習和提升技術的?我是如何畫圖的?我如何做技術分享的等等。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/E2X-gxMF9otxQFipThEIGA