Kafka Producer 核心組件分析

**  Kafka Producer 源碼原理**

第 2 節

Producer 核心組件分析

(長章節,這章我寫了一個週末也沒有寫完, 真的比較長,還不太好拆,希望大家耐心看到最後,對後面分析源碼會很有幫助的)

上一節我們主要從 HelloWorld 開始,分析了 Kafka Producer 的創建,重點分析瞭如何解析生產者配置的源碼原理。

   public KafkaProducer(Properties properties) {
       this(new ProducerConfig(properties), null, null);
  }

Kafka Producer 的創建除了配置解析,還有關鍵的一步就是調用了一個重載的構造函數。這一節我們就來看下它主要做了什麼。

KafkaProducer 初始化的哪些組件?

既然是一個關鍵組件創建,分析的構造函數,我們首要做的就是分析它的代碼脈絡,看看核心的組件有哪些,畫一個組件圖先。

讓我們來看下構造函數的代碼:

 private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
       try {
           log.trace("Starting the Kafka producer");
           Map<String, Object> userProvidedConfigs = config.originals();
           this.producerConfig = config;
           this.time = new SystemTime();

           clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
           if (clientId.length() <= 0)
               clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
           Map<String, String> metricTags = new LinkedHashMap<String, String>();
           metricTags.put("client-id", clientId);
           MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                  .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                  .tags(metricTags);
           List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                   MetricsReporter.class);
           reporters.add(new JmxReporter(JMX_PREFIX));
           this.metrics = new Metrics(metricConfig, reporters, time);
           this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
           long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
           this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
           this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
           this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
           this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
           /* check for user defined settings.
            * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
            * This should be removed with release 0.9 when the deprecated configs are removed.
            */
           if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
               log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                       "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
               boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
               if (blockOnBufferFull) {
                   this.maxBlockTimeMs = Long.MAX_VALUE;
              } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                   log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                           "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                   this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
              } else {
                   this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
              }
          } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
               log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                       "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
               this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
          } else {
               this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
          }

           /* check for user defined settings.
            * If the TIME_OUT config is set use that for request timeout.
            * This should be removed with release 0.9
            */
           if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
               log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                       ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
               this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
          } else {
               this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
          }

           this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                   this.totalMemorySize,
                   this.compressionType,
                   config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                   retryBackoffMs,
                   metrics,
                   time);
           List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
           this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
           ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
           NetworkClient client = new NetworkClient(
                   new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                   this.metadata,
                   clientId,
                   config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                   config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                   config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                   config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                   this.requestTimeoutMs, time);
           this.sender = new Sender(client,
                   this.metadata,
                   this.accumulator,
                   config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                   config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                  (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                   config.getInt(ProducerConfig.RETRIES_CONFIG),
                   this.metrics,
                   new SystemTime(),
                   clientId,
                   this.requestTimeoutMs);
           String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
           this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
           this.ioThread.start();

           this.errors = this.metrics.sensor("errors");

           if (keySerializer == null) {
               this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                       Serializer.class);
               this.keySerializer.configure(config.originals(), true);
          } else {
               config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
               this.keySerializer = keySerializer;
          }
           if (valueSerializer == null) {
               this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                       Serializer.class);
               this.valueSerializer.configure(config.originals(), false);
          } else {
               config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
               this.valueSerializer = valueSerializer;
          }

           // load interceptors and make sure they get clientId
           userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
           List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                   ProducerInterceptor.class);
           this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

           config.logUnused();
           AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
           log.debug("Kafka producer started");
      } catch (Throwable t) {
           // call close methods if internal objects are already constructed
           // this is to prevent resource leak. see KAFKA-2121
           close(0, TimeUnit.MILLISECONDS, true);
           // now propagate the exception
           throw new KafkaException("Failed to construct kafka producer", t);
      }
  }

這個構造函數的代碼還是比較都多的,不過沒關係,先掃一下它的脈絡:

1)主要是根據之前解析好的 ProducerConfig 對象,設置了一堆 Producer 的參數

2)new Metadata(),它應該算一個組件,從名字上猜測,應該是負責元數據相關的

3)new RecordAccumulator() 應該也是一個組件,暫時不知道是啥意思, 名字是翻譯下是記錄累加器

4)new NetworkClient()一看就是網絡通信相關的組件

5)new Sender()和 new new KafkaThread() 應該是創建了 Runnable, 並且使用 1 個線程啓動。看着像是發送消息的線程

6)new ProducerInterceptors() 貌似是攔截器相關的東西

你可以看到這個構造函數,基本核心脈絡就是上面 6 點了。我們可以畫一個組件圖小結下:

RecordAccumulator 到底是什麼?

知道了上面主要的組件主要有啥。RecordAccumulator 這個類沒看出來是啥意思,怎麼辦?看看有沒有類註釋。

/**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
* instances to be sent to the server.
* 這個類可以使用隊列記錄Records,準備待發送的數據給Server(也就是Broker)
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
* 當沒有被禁用時,累加器由於使用了有限的內存,達到上限會阻塞。
*/
public final class RecordAccumulator {
   
}

看過註釋後,大體知道 RecordAccumulator, 它是個記錄累加器,這個記錄 Record 其實可以看做是一條消息的抽象封裝,也就是它是消息累加器,通過一個內存隊列緩存,做了一個緩衝,準備將這個數據發送給 Broker。所以我們就可以稱他爲發送消息的內存緩衝器

Metadata 元數據到底是什麼?

還有一個 Metadata 元數據這組件,有些人可能也不太清楚,元數據是指什麼,元數據就是指描述數據,比如我 mac 或 windows 文件的元數據,就是它的大小,位置,創建時間,修改時間等。

那 KafkaProducer 生產者的元數據是指什麼呢?這裏就要給大家回顧一個知識了:

Kafka 知識回顧 Tips:Topic、Partition、Record,Leader Partition、Follower Partition、Replica 是什麼?

這幾個是 kafka 管理消息涉及的基本概念。

Topic:Kafka 管理消息的邏輯結構,Topic 下可以有多個 Partition,用作分佈式存儲,用來支持海量數據。

Partition:多條消息存儲結構封裝,對應到磁盤上的一個個 log 文件。kafka 把消息存儲到磁盤的文件通常稱作 log,實際就是多條消息而已。

Record: 指每一條消息的抽象封裝。

Broker 通常有兩種角色,leader 和 follwer,爲了高可用。follower 是 leader 的副本。

Replica:副本,leader 和 follower 的都可以算是存放消息的一個副本,互爲備份。所以 replica 可以指 leader,也可以指 follower。

回顧了這幾個基本知識,來理解元數據就好多了。

要想發送消息給 Broker,起碼得知道發送到哪裏去。所以就需要描述信息,這些描述信息就是發送消息需要的元數據。

Producer 一般都需要從 broker 集羣去拉取元數據,包括了 Topic 中的 Partitions 信息,後面如果發送消息到 Topic,才知道這個 Topic 有哪些 Partitions,哪些是 Leader Partition 所在的 Broker。

組件圖最終如下所示:

Producer 核心組件—元數據 Metadata 剖析

既然我們知道了 Producer 主要初始化了上面的一些組件,那麼只要搞懂上面每個組件做了什麼,基本 Producer 的很多原理就能理解透徹了。

我們先來看下 Metadata 這個元數據組件做了什麼。

首先 Metadata 的創建很簡單,如下:

   /**
    * Create a new Metadata instance
    * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
    *       polling
    * 元數據刷新之間必須終止的最短時間,以避免繁忙的輪詢
    * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
    * 不刷新即可保留元數據的最長時間
    */
   public Metadata(long refreshBackoffMs, long metadataExpireMs) {
       this.refreshBackoffMs = refreshBackoffMs;
       this.metadataExpireMs = metadataExpireMs;
       this.lastRefreshMs = 0L;
       this.lastSuccessfulRefreshMs = 0L;
       this.version = 0;
       this.cluster = Cluster.empty();
       this.needUpdate = false;
       this.topics = new HashSet<String>();
       this.listeners = new ArrayList<>();
       this.needMetadataForAllTopics = false;
  }

這個構造函數,從註釋就說明了,這個元數據對象 Metadata 會被定時刷新,也就是說,它應該會定時的從 Broker 拉取核心的元數據到 Producer。

而它的脈絡就是

1)初始化了一些配置 ,根據名字和註釋基本都能從猜測出來含義

默認值就是在之前 ConfigDef 靜態變量初始化可以看到。

refreshBackoffMs 元數據刷新之間必須終止的最短時間,以避免繁忙的輪詢,默認 100ms

metadataExpireMs ,默認是每隔 5 分鐘拉取一次元數據。

lastRefreshMs 最近拉取元數據的時間戳

lastSuccessfulRefreshMs 最近拉取元數據成功的時間戳

version 元數據拉取的版本

Cluster 這個比較關鍵,是元數據信息的對象封裝

needUpdate 是否需要拉取標識

topics 記錄 topic 信息的集合

listeners 元數據變更的監聽回調

needMetadataForAllTopics 默認是一個 false,暫時不知道是做什麼的

2)初始化 Cluster 元數據對象

上面變量中,元數據最終封裝存放在了 Cluster 對象中。可以看下它會放了什麼數據:

/**
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/
public final class Cluster {

   private final boolean isBootstrapConfigured;
   //Kafka Broker節點
   private final List<Node> nodes;
   //沒有被授權訪問的Topic的列表
   private final Set<String> unauthorizedTopics;
   //TopicPartition:Topic和Partition基本關係信息
   //PartitionInfo:Partition的詳細信息,比如數據同步進度ISR列表、Leader、Follower節點信息等
   private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
   //每個topic有哪些分區
   private final Map<String, List<PartitionInfo>> partitionsByTopic;
   //每個topic有哪些當前可用的分區,如果某個分區沒有leader是存活的,此時那個分區就不可用了
   private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
   //每個broker上放了哪些分區
   private final Map<Integer, List<PartitionInfo>> partitionsByNode;
   //broker.id -> Node
   private final Map<Integer, Node> nodesById;
   
   //省略初始化方法
}

主要就是組成了整個 Kafka 集羣信息,比如

Node: 記錄了 Kafka Broker 的 ip,端口等

TopicPartition:Topic 和 Partition 基本關係信息

PartitionInfo:Partition 的詳細信息,比如數據同步進度 ISR 列表、Leader、Follower 節點信息等

其他的上面我也用註釋基本都標註了他們的大致意思了。大家大體有一個印象就行,其實只要知道都是 topic 的元數據就行了。

上面的信息你如果問我是怎麼知道的,很簡單,我 debug 了下,當後面拉取到元數據後,你可以看下數據,就明白了。debug 看源碼的方法在這個場景就比較適合,我們目前也沒有下載源碼,導入源碼,只需要寫一個 helloWorld,通過 maven 自動下載 jar 包的源碼,進行 debug 就可以分析客戶端的源碼。

之前我提到的源碼閱讀方法和思想,大家一定要活學活用。

所以元數據對象主要就是如下所示:

KafkaProducer 創建 Metadata 其實並沒有多麼複雜, 創建了之後做了什麼呢?KafkaProducer 的構造函數,執行了一個 metadata.update 方法。

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // 一些參數設置,省略...
    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));

    // RecordAccumulator、NetworkClient、Sender等組件的初始化,省略...
    this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

    //省略...
}

這個難道就在進行元數據拉取麼?我們來看下這個 update 方法:

/**
* Update the cluster metadata
*/
public synchronized void update(Cluster cluster, long now) {
   this.needUpdate = false;
   this.lastRefreshMs = now;
   this.lastSuccessfulRefreshMs = now;
   this.version += 1;

   for (Listener listener: listeners)
       listener.onMetadataUpdate(cluster);

   // Do this after notifying listeners as subscribed topics' list can be changed by listeners
   this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

   notifyAll();
   log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}

由於 listeners 之前初始化是空的,這個 needMetadataForAllTopics 參數也是 false,之後直接調用了 Metadata.notifyAll(),其實什麼都沒幹。沒有什麼元數據拉取或者更新的操作。

最終發現,這個方法說明其實幾乎什麼都沒有做,也就是說 KafkaProducer 創建的時候,沒有進行元數據拉取。只是初始化了一個 Metadata 對象,其中元數據對象 Cluster 的信息默認是空的

Metadata 的整個過程的關鍵,如下圖所示:

到這裏,你會發現閱讀源碼的時候,不是什麼時候都是一帆風順的,會被各種分支和代碼搞得暈頭轉向。像上面的 update() 方法,就會迷惑你。

但此時你不要灰心,一定要縷清核心脈絡思路,多畫圖,先記錄關鍵邏輯,把這裏放一放,可以嘗試繼續分析其他的場景和邏輯。當分析的邏輯和場景足夠多的時候,多重複分析幾次。你就會慢慢悟到之前不懂的邏輯,會串起來所有的邏輯的。

Producer 核心組件—RecordAccumulator 剖析

仔細分析過了元數據組件的創建之後,我們接着看下一個組件 RecordAccumulator 消息內存緩衝器。

之前通過註釋我們大體知道 RecordAccumulator, 它是個記錄累加器,這個記錄 Record 其實可以看做是一條消息的抽象封裝,也就是它是消息累加器,通過一個內存隊列緩存,做了一個緩衝,準備將這個數據發送給 Broker。所以我們就可以稱他爲發送消息的內存緩衝器

創建它的代碼主要如下:

 /**
    * Create a new record accumulator
    *
    * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
    * @param totalSize The maximum memory the record accumulator can use.
    * @param compression The compression codec for the records
    * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
    *       sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
    *       latency for potentially better throughput due to more batching (and hence fewer, larger requests).
    * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
    *       exhausting all retries in a short period of time.
    * @param metrics The metrics
    * @param time The time instance to use
    */
   public RecordAccumulator(int batchSize,
                            long totalSize,
                            CompressionType compression,
                            long lingerMs,
                            long retryBackoffMs,
                            Metrics metrics,
                            Time time) {
       this.drainIndex = 0;
       this.closed = false;
       this.flushesInProgress = new AtomicInteger(0);
       this.appendsInProgress = new AtomicInteger(0);
       this.batchSize = batchSize;
       this.compression = compression;
       this.lingerMs = lingerMs;
       this.retryBackoffMs = retryBackoffMs;
       this.batches = new CopyOnWriteMap<>();
       String metricGrpName = "producer-metrics";
       this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
       this.incomplete = new IncompleteRecordBatches();
       this.muted = new HashSet<>();
       this.time = time;
       registerMetrics(metrics, metricGrpName);
  }

這個方法的脈絡其實註釋已經告訴我們了,主要就是:

1)設置了一些參數 batchSize、totalSize、retryBackoffMs、lingerMs、compression 等

2)初始化了一些數據結構,比如 batches 是一個 new CopyOnWriteMap<>()

3)初始化了 BufferPool 和 IncompleteRecordBatches

我們來仔細看下細節:

1)設置了一些參數 batchSize、totalSize、retryBackoffMs、lingerMs、compression 等

首先是設置了一些參數 ,從上一節 ConfigDef 初始化可以看到默認值和基本作用

batchSize  默認是 16kb,批量打包消息發送給 Broker 的大小控制

totalSize 默認是 32MB,表示消息內存緩衝區的大小

retryBackoffMs  默認每隔 100ms 重試一次

lingerMs 10ms 內還沒有湊成 1 個 batch 發送出去,必須立即發送出去

compression 壓縮請求方式,默認 none

2)初始化了一些數據結構,比如 batches 是一個 new CopyOnWriteMap<>()

應該是存放 Record 消息的一個集合,看着像是按照某個 topic 某個分區下,存放一些消息,用到了一個雙端隊列

batches = new ConcurrentMap<TopicPartition, Deque<RecordBatch>>()

3)初始化了 BufferPool 和 IncompleteRecordBatches

IncompleteRecordBatches 的創建比較簡單。如下:

 /*
    * A threadsafe helper class to hold RecordBatches that haven't been ack'd yet
    */
   private final static class IncompleteRecordBatches {
       private final Set<RecordBatch> incomplete;

       public IncompleteRecordBatches() {
           this.incomplete = new HashSet<RecordBatch>();
      }
       
       public void add(RecordBatch batch) {
           synchronized (incomplete) {
               this.incomplete.add(batch);
          }
      }
       
       public void remove(RecordBatch batch) {
           synchronized (incomplete) {
               boolean removed = this.incomplete.remove(batch);
               if (!removed)
                   throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
          }
      }
       
       public Iterable<RecordBatch> all() {
           synchronized (incomplete) {
               return new ArrayList<>(this.incomplete);
          }
      }
  }

註釋可以看出來,它是一個線程安全的輔助類,通過 synchronized 操作 HashSet 保證用於保存 Broker 尚未確認(ack)的 RecordBatches。

而 new BufferPool 初始化緩衝區,代碼如下:

public final class BufferPool {

   private final long totalMemory;
   private final int poolableSize;
   private final ReentrantLock lock;
   private final Deque<ByteBuffer> free;
   private final Deque<Condition> waiters;
   private long availableMemory;
   private final Metrics metrics;
   private final Time time;
   private final Sensor waitTime;

   /**
    * Create a new buffer pool
    *
    * @param memory The maximum amount of memory that this buffer pool can allocate
    * @param poolableSize The buffer size to cache in the free list rather than deallocating
    * @param metrics instance of Metrics
    * @param time time instance
    * @param metricGrpName logical group name for metrics
    */
   public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
       this.poolableSize = poolableSize;
       this.lock = new ReentrantLock();
       this.free = new ArrayDeque<ByteBuffer>();
       this.waiters = new ArrayDeque<Condition>();
       this.totalMemory = memory;
       this.availableMemory = memory;
       this.metrics = metrics;
       this.time = time;
       this.waitTime = this.metrics.sensor("bufferpool-wait-time");
       MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
                                                  metricGrpName,
                                                  "The fraction of time an appender waits for space allocation.");
       this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
  }

主要是有一把鎖和有兩個隊列,應該是存放消息的真正的內存緩存區域。

整個過程如下所示:

你看過這些的組件的內部結構,其實可能並不知道它們到底是幹嘛的,沒關係,這裏我們主要的目的本來就是初步就是對這些組件有個印象就可以了,之後分析某個組件的行爲和作用的時候,才能更好的理解。

Producer 核心組件—NetworkClient 剖析

如果要拉去元數據或者發送消息,首先肯定要和 Broker 建立連接。之前分析 KafkaProducer 的源碼脈絡時,有一個網絡通信組件 NetworkClient,我們可以分析下這個組件怎麼創建,做了哪些事情。看看元數據拉取會不會在這裏呢?

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
   // 一些參數設置,省略...

   // RecordAccumulator、Metadata、Sender等組件的初始化,省略...

   NetworkClient client = new NetworkClient(
       // Kafka將原生的Selector略微包裝了下,包裝成Kafka自已的一個Selector網絡通信組件
       new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
       this.metadata,
       clientId,
       config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
       config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
       config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
       config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
       this.requestTimeoutMs, time);
   
   //省略...
  }

private NetworkClient(MetadataUpdater metadataUpdater,
                         Metadata metadata,
                         Selectable selector,
                         String clientId,
                         int maxInFlightRequestsPerConnection,
                         long reconnectBackoffMs,
                         int socketSendBuffer,
                         int socketReceiveBuffer,
                         int requestTimeoutMs,
                         Time time) {

       /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
        * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
        * super constructor is invoked.
        */
       if (metadataUpdater == null) {
           if (metadata == null)
               throw new IllegalArgumentException("`metadata` must not be null");
           //更新元數據的一個組件?
           this.metadataUpdater = new DefaultMetadataUpdater(metadata);
      } else {
           this.metadataUpdater = metadataUpdater;
      }
       this.selector = selector;
       this.clientId = clientId;
       // 已發送或正在發送但尚未收到響應的請求集
       this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
       // Producer與集羣中每個節點的連接狀態
       this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
       this.socketSendBuffer = socketSendBuffer;
       this.socketReceiveBuffer = socketReceiveBuffer;
       this.correlation = 0;
       this.randOffset = new Random();
       this.requestTimeoutMs = requestTimeoutMs;
       this.time = time;
  }

上面的 NetworkClient 創建,主要是

1)創建了一個 Selector,Selector 這個名稱,如果你熟悉 Java NIO 的 API 的話,應該不會陌生,它是 NIO 三大組件之一 Selector、Buffer、Channel。Kafka 將原生的 Selector 略微包裝了下,包裝成 Kafka 自已的一個 Selector 網絡通信組件。

這裏我不展開將 NIO 的原理,Selector 這個組件,你可以簡單的理解爲是用來監聽網絡連接是否有建立和讀寫請求的。

2)設置了一堆配置參數。

3)創建了一個 DefaultMetadataUpdater 組件,將 metadata 傳遞給了它。從名字連蒙帶猜下,好像是更新元數據的一個組件。難道找到元數據拉取的邏輯了?一會可以重點關注下這個類的使用

4)創建了 InFlightRequests 和 ClusterConnectionStates   從這兩個類的註釋我們可以看出來,InFlightRequests 是已發送或正在發送但尚未收到響應的請求集,ClusterConnectionStates   是 Producer 與集羣中每個節點的連接狀態。

上面的 NetworkClient 的初始化,整個過程可以總結如下圖:

看過了創建的脈絡,下面我們看下細節(先脈絡後細節的思想),上面的信息如果你不是一下在就能看出來的話,你就需要看下每個類的細節,確認下了。

細節 1:首先是創建 Selector 代碼如下:

public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
  this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
}

public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
       try {
           //本質還是創建了一個NIO的Selector
           this.nioSelector = java.nio.channels.Selector.open();
      } catch (IOException e) {
           throw new KafkaException(e);
      }
       this.maxReceiveSize = maxReceiveSize;
       this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000 * 1000;
       this.time = time;
       this.metricGrpPrefix = metricGrpPrefix;
       this.metricTags = metricTags;
       this.channels = new HashMap<>();
       this.completedSends = new ArrayList<>();
       this.completedReceives = new ArrayList<>();
       this.stagedReceives = new HashMap<>();
       this.immediatelyConnectedKeys = new HashSet<>();
       this.connected = new ArrayList<>();
       this.disconnected = new ArrayList<>();
       this.failedSends = new ArrayList<>();
       this.sensors = new SelectorMetrics(metrics);
       this.channelBuilder = channelBuilder;
       // initial capacity and load factor are default, we set them explicitly because we want to set accessOrder = true
       this.lruConnections = new LinkedHashMap<>(16, .75F, true);
       currentTimeNanos = time.nanoseconds();
       nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
       this.metricsPerConnection = metricsPerConnection;
  }

可以看到,創建 Kafka 的 Selector 本質還是創建了一個 NIO 的 Selector:java.nio.channels.Selector.open();

細節 2:DefaultMetadataUpdater 這個類的初始化,什麼都沒做,就是引用了下 Metadata

   class DefaultMetadataUpdater implements MetadataUpdater {

       //引用了下Metadata
       /* the current cluster metadata */
       private final Metadata metadata;

       /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
       private boolean metadataFetchInProgress;

       /* the last timestamp when no broker node is available to connect */
       private long lastNoNodeAvailableMs;

       DefaultMetadataUpdater(Metadata metadata) {
           this.metadata = metadata;
           this.metadataFetchInProgress = false;
           this.lastNoNodeAvailableMs = 0;
      }

細節 3:InFlightRequests 的註釋的確是已發送或正在發送但尚未收到響應的請求集的意思。不理解也沒關係,後面我們會看到它使用的地方的。

/**
* The set of requests which have been sent or are being sent but haven't yet received a response
* 已發送或正在發送但尚未收到響應的請求集的意思
*/
final class InFlightRequests {

   private final int maxInFlightRequestsPerConnection;
   private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
   
   public InFlightRequests(int maxInFlightRequestsPerConnection) {
       this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
  }
}

細節 4:ClusterConnectionStates 這個類註釋也就是 Producer 與集羣中每個節點的連接狀態的意思。連接狀態主要有已連接、連接中、斷開。

/**
* The state of our connection to each node in the cluster.
* Producer與集羣中每個節點的連接狀態的意思
*
*/
final class ClusterConnectionStates {
   private final long reconnectBackoffMs;
   private final Map<String, NodeConnectionState> nodeState;

   public ClusterConnectionStates(long reconnectBackoffMs) {
       this.reconnectBackoffMs = reconnectBackoffMs;
       this.nodeState = new HashMap<String, NodeConnectionState>();
  }
   /**
    * The state of our connection to a node
    */
   private static class NodeConnectionState {

       ConnectionState state;
       long lastConnectAttemptMs;

       public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
           this.state = state;
           this.lastConnectAttemptMs = lastConnectAttempt;
      }

       public String toString() {
           return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
      }
  }
   /**
    * The states of a node connection
    * 連接狀態主要有已連接、連接中、斷開
    */
   public enum ConnectionState {
       DISCONNECTED, CONNECTING, CONNECTED
  }

上面整個 NeworkClient 的初始化,就完成了。至於網絡組件的相關參數這裏先不做解釋,當使用到的時候我再給大家解釋。目前解釋了大家可能也太能理解。

整個細節,我大致整理如下圖:

Producer 核心組件—Sender 線程剖析

網絡組件 NeworkClient 和元數據 Metadata、RecordAccumulator 發送消息的內存緩衝器,我們都剖析了下它們的初始化過程。主要知道它們初始化了那些東西。我們總結了組件圖,記錄了關鍵信息。我們可以繼續往下分析最後一個核心的組件 Send 線程。我們來看看它搞了哪些事情。

Sender 的初始化邏輯如下所示:

 private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    // 一些參數設置,省略...
   // RecordAccumulator、NetworkClient、Metadata等組件的初始化,省略...
   this.sender = new Sender(client,
                   this.metadata,
                   this.accumulator,
                   config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                   config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                  (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                   config.getInt(ProducerConfig.RETRIES_CONFIG),
                   this.metrics,
                   new SystemTime(),
                   clientId,
                   this.requestTimeoutMs);
           String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
           this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
           this.ioThread.start();
   // 省略...
}
public Sender(KafkaClient client,
             Metadata metadata,
             RecordAccumulator accumulator,
             boolean guaranteeMessageOrder,
             int maxRequestSize,
             short acks,
             int retries,
             Metrics metrics,
             Time time,
             String clientId,
             int requestTimeout) {
   this.client = client;
   this.accumulator = accumulator;
   this.metadata = metadata;
   this.guaranteeMessageOrder = guaranteeMessageOrder;
   this.maxRequestSize = maxRequestSize;
   this.running = true;
   this.acks = acks;
   this.retries = retries;
   this.time = time;
   this.clientId = clientId;
   this.sensors = new SenderMetrics(metrics);
   this.requestTimeout = requestTimeout;
}

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
   super(runnable, name);
   setDaemon(daemon);
   setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
       public void uncaughtException(Thread t, Throwable e) {
           log.error("Uncaught exception in " + name + ": ", e);
      }
  });
}

這個初始化核心脈絡很簡單,主要就是將其他組件交給了 Sender 去使用。

1) 設置了 sender 的一些核心參數

retries:重試次數,默認是 0,不重試

acks:"all", "-1", "0", "1" 確認策略 默認是 1,leader broker 寫入成功,就算髮送成功。(可能導致消息丟失)

max.request.size:最大的請求大小 默認 1mb

max.in.flight.requests.per.connection 參數默認值是 5,每個 Broker 最多隻能有 5 個請求是發送出去但是還沒接收到響應的(重試可能導致消息順序錯亂)

2)引用了其他三個關鍵組件:網絡組件 NeworkClient 和元數據 Metadata、RecordAccumulator 發送消息的內存緩衝器

3)之後通過 KafkaThread 包裝了 Runnable 線程,啓動了線程,開始執行 Sender 的 run 方法了

整個過程如下所示:

run 方法的執行,不是這一節我們主要關心的了。我後面幾節會詳細分析的。

小結

最後我們小結下吧,今天我們主要熟悉瞭如下的內容:

KafkaProducer 初始化的哪些組件

Producer 核心組件—RecordAccumulator 剖析

Producer 核心組件—元數據 Metadata 剖析

Producer 核心組件—NetworkClient 剖析

Producer 核心組件—Sender 線程剖析

我們只是基本認識了下,每個組件是什麼,主要幹什麼,內部主要有些什麼東西。我把圖今天熟悉的只是,給大家彙總一了一張大圖:

有了這張圖,後面幾節我們就來重點開始分析 Kafka Prodcuer 核心流程就容易很多了。比如

元數據拉取機制 wait+notifyAll 的靈活使用、發送消息的路由策略

網絡通信機制,基於原生 NIO 發送消息機制 + 粘包拆包問題的巧妙解決

Producer 的高吞吐:內存緩衝區的雙端隊列 + 批量打包 Batch 發送機制

大家敬請期待吧,好了今天就到這裏,我們下節見!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/dI5azPFJuShKgwe7c8ZEhg