真的,關於 Kafka 入門看這一篇就夠了

初識 Kafka

什麼是 Kafka

Kafka 是由 Linkedin 公司開發的,它是一個分佈式的,支持多分區、多副本,基於 Zookeeper 的分佈式消息流平臺,它同時也是一款開源的基於發佈訂閱模式的消息引擎系統

Kafka 的基本術語

消息:Kafka 中的數據單元被稱爲消息,也被稱爲記錄,可以把它看作數據庫表中某一行的記錄。

批次:爲了提高效率, 消息會分批次寫入 Kafka,批次就代指的是一組消息。

主題:消息的種類稱爲 主題(Topic), 可以說一個主題代表了一類消息。相當於是對消息進行分類。主題就像是數據庫中的表。

分區:主題可以被分爲若干個分區(partition),同一個主題中的分區可以不在一個機器上,有可能會部署在多個機器上,由此來實現 kafka 的伸縮性,單一主題中的分區有序,但是無法保證主題中所有的分區有序

生產者:向主題發佈消息的客戶端應用程序稱爲生產者(Producer),生產者用於持續不斷的向某個主題發送消息。

消費者:訂閱主題消息的客戶端程序稱爲消費者(Consumer),消費者用於處理生產者產生的消息。

消費者羣組:生產者與消費者的關係就如同餐廳中的廚師和顧客之間的關係一樣,一個廚師對應多個顧客,也就是一個生產者對應多個消費者,消費者羣組(Consumer Group)指的就是由一個或多個消費者組成的羣體。

副本:Kafka 中消息的備份又叫做 副本(Replica),副本的數量是可以配置的,Kafka 定義了兩類副本:領導者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對外提供服務,後者只是被動跟隨。

重平衡:Rebalance。消費者組內某個消費者實例掛掉後,其他消費者實例自動重新分配訂閱主題分區的過程。Rebalance 是 Kafka 消費者端實現高可用的重要手段。

Kafka 的特性(設計原則)

Kafka 的使用場景

Kafka 的消息隊列

Kafka 的消息隊列一般分爲兩種模式:點對點模式和發佈訂閱模式

Kafka 是支持消費者羣組的,也就是說 Kafka 中會有一個或者多個消費者,如果一個生產者生產的消息由一個消費者進行消費的話,那麼這種模式就是點對點模式

點對點模式的消息隊列

如果一個生產者或者多個生產者產生的消息能夠被多個消費者同時消費的情況,這樣的消息隊列成爲發佈訂閱模式的消息隊列

發佈 - 訂閱模式的消息隊列

Kafka 系統架構

如上圖所示,一個典型的 Kafka 集羣中包含若干 Producer(可以是 web 前端產生的 Page View,或者是服務器日誌,系統 CPU、Memory 等),若干 broker(Kafka 支持水平擴展,一般 broker 數量越多,集羣吞吐率越高),若干 Consumer Group,以及一個 Zookeeper 集羣。Kafka 通過 Zookeeper 管理集羣配置,選舉 leader,以及在 Consumer Group 發生變化時進行 rebalance。Producer 使用 push 模式將消息發佈到 broker,Consumer 使用 pull 模式從 broker 訂閱並消費消息。

核心 API

Kafka 有四個核心 API,它們分別是

Kafka 爲何如此之快

Kafka 實現了零拷貝原理來快速移動數據,避免了內核之間的切換。Kafka 可以將數據記錄分批發送,從生產者到文件系統(Kafka 主題日誌)到消費者,可以端到端的查看這些批次的數據。

批處理能夠進行更有效的數據壓縮並減少 I/O 延遲,Kafka 採取順序寫入磁盤的方式,避免了隨機磁盤尋址的浪費,更多關於磁盤尋址的瞭解,請參閱 程序員需要了解的硬核知識之磁盤 。

總結一下其實就是四個要點

Kafka 安裝和重要配置

Kafka 安裝我在 Kafka 系列第一篇應該比較詳細了,詳情見帶你漲姿勢的認識一下 kafka 這篇文章。

那我們還是主要來說一下 Kafka 中的重要參數配置吧,這些參數對 Kafka 來說是非常重要的。

broker 端配置

每個 kafka broker 都有一個唯一的標識來表示,這個唯一的標識符即是 broker.id,它的默認值是 0。這個值在 kafka 集羣中必須是唯一的,這個值可以任意設定,

如果使用配置樣本來啓動 kafka,它會監聽 9092 端口。修改 port 配置參數可以把它設置成任意的端口。要注意,如果使用 1024 以下的端口,需要使用 root 權限啓動 kakfa。

用於保存 broker 元數據的 Zookeeper 地址是通過 zookeeper.connect 來指定的。比如我可以這麼指定 localhost:2181表示這個 Zookeeper 是運行在本地 2181 端口上的。我們也可以通過 比如我們可以通過 zk1:2181,zk2:2181,zk3:2181來指定 zookeeper.connect 的多個參數值。該配置參數是用冒號分割的一組 hostname:port/path 列表,其含義如下

hostname 是 Zookeeper 服務器的機器名或者 ip 地址。

port 是 Zookeeper 客戶端的端口號

/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了 chroot 環境,如果不指定默認使用跟路徑。

如果你有兩套 Kafka 集羣,假設分別叫它們 kafka1 和 kafka2,那麼兩套集羣的zookeeper.connect參數可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2

Kafka 把所有的消息都保存到磁盤上,存放這些日誌片段的目錄是通過 log.dirs來制定的,它是用一組逗號來分割的本地系統路徑,log.dirs 是沒有默認值的,你必須手動指定他的默認值。其實還有一個參數是 log.dir,如你所知,這個配置是沒有 s 的,默認情況下只用配置 log.dirs 就好了,比如你可以通過 /home/kafka1,/home/kafka2,/home/kafka3這樣來配置這個參數的值。

對於如下 3 種情況,Kafka 會使用可配置的線程池來處理日誌片段。

服務器正常啓動,用於打開每個分區的日誌片段;

服務器崩潰後重啓,用於檢查和截斷每個分區的日誌片段;

服務器正常關閉,用於關閉日誌片段。

默認情況下,每個日誌目錄只使用一個線程。因爲這些線程只是在服務器啓動和關閉時會用到,所以完全可以設置大量的線程來達到井行操作的目的。特別是對於包含大量分區的服務器來說,一旦發生崩憤,在進行恢復時使用井行操作可能會省下數小時的時間。設置此參數時需要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,如果 num.recovery.threads.per.data.dir 被設爲 8,並且 log.dir 指定了 3 個路徑,那麼總共需要 24 個線程。

默認情況下,kafka 會使用三種方式來自動創建主題,下面是三種情況:

當一個生產者開始往主題寫入消息時

當一個消費者開始從主題讀取消息時

當任意一個客戶端向主題發送元數據請求時

auto.create.topics.enable參數我建議最好設置成 false,即不允許自動創建 Topic。在我們的線上環境裏面有很多名字稀奇古怪的 Topic,我想大概都是因爲該參數被設置成了 true 的緣故。

主題默認配置

Kafka 爲新創建的主題提供了很多默認配置參數,下面就來一起認識一下這些參數

num.partitions 參數指定了新創建的主題需要包含多少個分區。如果啓用了主題自動創建功能(該功能是默認啓用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。要注意,我們可以增加主題分區的個數,但不能減少分區的個數。

這個參數比較簡單,它表示 kafka 保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 default.replication.factor 的默認值爲 1,這個參數在你啓用了主題自動創建功能後有效。

Kafka 通常根據時間來決定數據可以保留多久。默認使用 log.retention.hours 參數來配置時間,默認是 168 個小時,也就是一週。除此之外,還有兩個參數 log.retention.minutes 和 log.retentiion.ms 。這三個參數作用是一樣的,都是決定消息多久以後被刪除,推薦使用 log.retention.ms。

另一種保留消息的方式是判斷消息是否過期。它的值通過參數 log.retention.bytes 來指定,作用在每一個分區上。也就是說,如果有一個包含 8 個分區的主題,並且 log.retention.bytes 被設置爲 1GB,那麼這個主題最多可以保留 8GB 數據。所以,當主題的分區個數增加時,整個主題可以保留的數據也隨之增加。

上述的日誌都是作用在日誌片段上,而不是作用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日誌片段上,當日志片段大小到達 log.segment.bytes 指定上限(默認爲 1GB)時,當前日誌片段就會被關閉,一個新的日誌片段被打開。如果一個日誌片段被關閉,就開始等待過期。這個參數的值越小,就越會頻繁的關閉和分配新文件,從而降低磁盤寫入的整體效率。

上面提到日誌片段經關閉後需等待過期,那麼 log.segment.ms 這個參數就是指定日誌多長時間被關閉的參數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片段會在大小或時間到達上限時被關閉,就看哪個條件先得到滿足。

broker 通過設置 message.max.bytes參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,如果生產者嘗試發送的消息超過這個大小,不僅消息不會被接收,還會收到 broker 返回的錯誤消息。跟其他與字節相關的配置參數一樣,該參數指的是壓縮後的消息大小,也就是說,只要壓縮後的消息小於 mesage.max.bytes,那麼消息的實際大小可以大於這個值

這個值對性能有顯著的影響。值越大,那麼負責處理網絡連接和請求的線程就需要花越多的時間來處理這些請求。它還會增加磁盤寫入塊的大小,從而影響 IO 吞吐量。

規定了該主題消息被保存的時常,默認是 7 天,即該主題只能保存 7 天的消息,一旦設置了這個值,它會覆蓋掉 Broker 端的全局參數值。

retention.bytes:規定了要爲該 Topic 預留多大的磁盤空間。和全局參數作用相似,這個值通常在多租戶的 Kafka 集羣中會有用武之地。當前默認值是 -1,表示可以無限使用磁盤空間。

JVM 參數配置

JDK 版本一般推薦直接使用 JDK1.8,這個版本也是現在中國大部分程序員的首選版本。

說到 JVM 端設置,就繞不開這個話題,業界最推崇的一種設置方式就是直接將 JVM 堆大小設置爲 6GB,這樣會避免很多 Bug 出現。

JVM 端配置的另一個重要參數就是垃圾回收器的設置,也就是平時常說的 GC設置。如果你依然在使用 Java 7,那麼可以根據以下法則選擇合適的垃圾回收器:

當然了,如果你已經在使用 Java 8 了,那麼就用默認的 G1 收集器就好了。在沒有任何調優的情況下,G1 表現得要比 CMS 出色,主要體現在更少的 Full GC,需要調整的參數更少等,所以使用 G1 就好了。

一般 G1 的調整隻需要這兩個參數即可

該參數指定每次垃圾回收默認的停頓時間。該值不是固定的,G1 可以根據需要使用更長的時間。它的默認值是 200ms,也就是說,每一輪垃圾回收大概需要 200 ms 的時間。

該參數指定了 G1 啓動新一輪垃圾回收之前可以使用的堆內存百分比,默認值是 45,這就表明 G1 在堆使用率到達 45 之前不會啓用垃圾回收。這個百分比包括新生代和老年代。

Kafka Producer

在 Kafka 中,我們把產生消息的那一方稱爲生產者,比如我們經常回去淘寶購物,你打開淘寶的那一刻,你的登陸信息,登陸次數都會作爲消息傳輸到 Kafka 後臺,當你瀏覽購物的時候,你的瀏覽信息,你的搜索指數,你的購物愛好都會作爲一個個消息傳遞給 Kafka 後臺,然後淘寶會根據你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那麼這些生產者產生的消息是怎麼傳到 Kafka 應用程序的呢?發送過程是怎麼樣的呢?

儘管消息的產生非常簡單,但是消息的發送過程還是比較複雜的,如圖

我們從創建一個ProducerRecord 對象開始,ProducerRecord 是 Kafka 中的一個核心類,它代表了一組 Kafka 需要發送的 key/value 鍵值對,它由記錄要發送到的主題名稱(Topic Name),可選的分區號(Partition Number)以及可選的鍵值對構成。

在發送 ProducerRecord 時,我們需要將鍵值對對象由序列化器轉換爲字節數組,這樣它們才能夠在網絡上傳輸。然後消息到達了分區器。

如果發送過程中指定了有效的分區號,那麼在發送記錄時將使用該分區。如果發送過程中未指定分區,則將使用 key 的 hash 函數映射指定一個分區。如果發送的過程中既沒有分區號也沒有,則將以循環的方式分配一個分區。選好分區後,生產者就知道向哪個主題和分區發送數據了。

ProducerRecord 還有關聯的時間戳,如果用戶沒有提供時間戳,那麼生產者將會在記錄中使用當前的時間作爲時間戳。Kafka 最終使用的時間戳取決於 topic 主題配置的時間戳類型。

然後,這條消息被存放在一個記錄批次裏,這個批次裏的所有消息會被髮送到相同的主題和分區上。由一個獨立的線程負責把它們發到 Kafka Broker 上。

Kafka Broker 在收到消息時會返回一個響應,如果寫入成功,會返回一個 RecordMetaData 對象,它包含了主題和分區信息,以及記錄在分區裏的偏移量,上面兩種的時間戳類型也會返回給用戶。如果寫入失敗,會返回一個錯誤。生產者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還是失敗的話,就返回錯誤消息。

創建 Kafka 生產者

要向 Kafka 寫入消息,首先需要創建一個生產者對象,並設置一些屬性。Kafka 生產者有 3 個必選的屬性

該屬性指定 broker 的地址清單,地址的格式爲 host:port。清單裏不需要包含所有的 broker 地址,生產者會從給定的 broker 裏查找到其他的 broker 信息。不過建議至少要提供兩個 broker 信息,一旦其中一個宕機,生產者仍然能夠連接到集羣上。

broker 需要接收到序列化之後的 key/value值,所以生產者發送的消息需要經過序列化之後才傳遞給 Kafka Broker。生產者需要知道採用何種方式把 Java 對象轉換爲字節數組。key.serializer 必須被設置爲一個實現了org.apache.kafka.common.serialization.Serializer 接口的類,生產者會使用這個類把鍵對象序列化爲字節數組。這裏拓展一下 Serializer 類

Serializer 是一個接口,它表示類將會採用何種方式序列化,它的作用是把對象轉換爲字節,實現了 Serializer 接口的類主要有 ByteArraySerializerStringSerializerIntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認使用的序列化器,其他的序列化器還有很多,你可以通過 這裏 查看其他序列化器。要注意的一點:key.serializer 是必須要設置的,即使你打算只發送值的內容

與 key.serializer 一樣,value.serializer 指定的類會將值序列化。

下面代碼演示瞭如何創建一個 Kafka 生產者,這裏只指定了必要的屬性,其他使用默認的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

來解釋一下這段代碼

Kafka 消息發送

實例化生產者對象後,接下來就可以開始發送消息了,發送消息主要由下面幾種方式

簡單消息發送

Kafka 最簡單的消息發送如下:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");
producer.send(record);

代碼中生產者 (producer) 的 send() 方法需要把 ProducerRecord 的對象作爲參數進行發送,ProducerRecord 有很多構造函數,這個我們下面討論,這裏調用的是

public ProducerRecord(String topic, K key, V value) {}

這個構造函數,需要傳遞的是 topic 主題,key 和 value。

把對應的參數傳遞完成後,生產者調用 send() 方法發送消息(ProducerRecord 對象)。我們可以從生產者的架構圖中看出,消息是先被寫入分區中的緩衝區中,然後分批次發送給 Kafka Broker。

發送成功後,send() 方法會返回一個 Future(java.util.concurrent) 對象,Future 對象的類型是 RecordMetadata類型,我們上面這段代碼沒有考慮返回值,所以沒有生成對應的 Future 對象,所以沒有辦法知道消息是否發送成功。如果不是很重要的信息或者對結果不會產生影響的信息,可以使用這種方式進行發送。

我們可以忽略發送消息時可能發生的錯誤或者在服務器端可能發生的錯誤,但在消息發送之前,生產者還可能發生其他的異常。這些異常有可能是 SerializationException(序列化失敗)BufferedExhaustedException 或 TimeoutException(說明緩衝區已滿),又或是 InterruptedException(說明發送線程被中斷)

同步發送消息

第二種消息發送機制如下所示

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");
try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace()}

這種發送消息的方式較上面的發送方式有了改進,首先調用 send() 方法,然後再調用 get() 方法等待 Kafka 響應。如果服務器返回錯誤,get() 方法會拋出異常,如果沒有發生錯誤,我們會得到 RecordMetadata 對象,可以用它來查看消息記錄。

生產者(KafkaProducer)在發送的過程中會出現兩類錯誤:其中一類是重試錯誤,這類錯誤可以通過重發消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無錯誤則可以通過重新爲分區選舉首領來解決。KafkaProducer 被配置爲自動重試,如果多次重試後仍無法解決問題,則會拋出重試異常。另一類錯誤是無法通過重試來解決的,比如消息過大對於這類錯誤,KafkaProducer 不會進行重試,直接拋出異常。

異步發送消息

同步發送消息都有個問題,那就是同一時間只能有一個消息在發送,這會造成許多消息無法直接發送,造成消息滯後,無法發揮效益最大化。

比如消息在應用程序和 Kafka 集羣之間一個來回需要 10ms。如果發送完每個消息後都等待響應的話,那麼發送 100 個消息需要 1 秒,但是如果是異步方式的話,發送 100 條消息所需要的時間就會少很多很多。大多數時候,雖然 Kafka 會返回 RecordMetadata 消息,但是我們並不需要等待響應。

爲了在異步發送消息的同時能夠對異常情況進行處理,生產者提供了回掉支持。下面是回調的一個例子

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());
class DemoProducerCallBack implements Callback {
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

首先實現回調需要定義一個實現了org.apache.kafka.clients.producer.Callback的類,這個接口只有一個 onCompletion方法。如果 kafka 返回一個錯誤,onCompletion 方法會拋出一個非空 (non null) 異常,這裏我們只是簡單的把它打印出來,如果是生產環境需要更詳細的處理,然後在 send() 方法發送的時候傳遞一個 Callback 回調的對象。

生產者分區機制

Kafka 對於數據的讀寫是以分區爲粒度的,分區可以分佈在多個主機(Broker)中,這樣每個節點能夠實現獨立的數據寫入和讀取,並且能夠通過增加新的節點來增加 Kafka 集羣的吞吐量,通過分區部署在多個 Broker 來實現負載均衡的效果。

上面我們介紹了生產者的發送方式有三種:不管結果如何直接發送發送並返回結果發送並回調。由於消息是存在主題(topic)的分區(partition)中的,所以當 Producer 生產者發送產生一條消息發給 topic 的時候,你如何判斷這條消息會存在哪個分區中呢?

這其實就設計到 Kafka 的分區機制了。

分區策略

Kafka 的分區策略指的就是將生產者發送到哪個分區的算法。Kafka 爲我們提供了默認的分區策略,同時它也支持你自定義分區策略。

如果要自定義分區策略的話,你需要顯示配置生產者端的參數 Partitioner.class,我們可以看一下這個類它位於 org.apache.kafka.clients.producer 包下

public interface Partitioner extends Configurable, Closeable {
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
  public void close();
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner 類有三個方法,分別來解釋一下

其中與分區策略息息相關的就是 partition() 方法了,分區策略有下面這幾種

順序輪詢

順序分配,消息是均勻的分配給每個 partition,即每個分區存儲一次消息。就像下面這樣

上圖表示的就是輪詢策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪詢

隨機輪詢簡而言之就是隨機的向 partition 中保存消息,如下圖所示

實現隨機分配的代碼只需要兩行,如下

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先計算出該主題總的分區數,然後隨機地返回一個小於它的正整數。

本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分佈,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改爲輪詢了。

按照 key 進行消息保存

這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的 key,一旦消息被定義了 Key,那麼你就可以保證同一個 Key 的所有消息都進入到相同的分區裏面,由於每個分區下的消息處理都是有順序的,故這個策略被稱爲按消息鍵保序策略,如下圖所示

實現這個策略的 partition 方法同樣簡單,只需要下面兩行代碼即可:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面這幾種分區策略都是比較基礎的策略,除此之外,你還可以自定義分區策略。

生產者壓縮機制

壓縮一詞簡單來講就是一種互換思想,它是一種經典的用 CPU 時間去換磁盤空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁盤佔用或更少的網絡 I/O 傳輸。如果你還不瞭解的話我希望你先讀完這篇文章 程序員需要了解的硬核知識之壓縮算法,然後你就明白壓縮是怎麼回事了。

Kafka 壓縮是什麼

Kafka 的消息分爲兩層:消息集合 和 消息。一個消息集合中包含若干條日誌項,而日誌項纔是真正封裝消息的地方。Kafka 底層的消息日誌由一系列消息集合日誌項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。

在 Kafka 中,壓縮會發生在兩個地方:Kafka Producer 和 Kafka Consumer,爲什麼啓用壓縮?說白了就是消息太大,需要變小一點 來使消息發的更快一些。

Kafka Producer 中使用 compression.type 來開啓壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼表明該 Producer 的壓縮算法使用的是 GZIP

有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息後併發送給服務器後,由 Consumer 消費者進行解壓縮,因爲採用的何種壓縮算法是隨着 key、value 一起發送過去的,所以消費者知道採用何種壓縮算法。

Kafka 重要參數配置

在上一篇文章 帶你漲姿勢的認識一下 kafka 中,我們主要介紹了一下 kafka 集羣搭建的參數,本篇文章我們來介紹一下 Kafka 生產者重要的配置,生產者有很多可配置的參數,在文檔裏(http://kafka.apache.org/documentation/#producerconfigs)都有說明,我們介紹幾個在內存使用、性能和可靠性方面對生產者影響比較大的參數進行說明

key.serializer

用於 key 鍵的序列化,它實現了 org.apache.kafka.common.serialization.Serializer 接口

value.serializer

用於 value 值的序列化,實現了 org.apache.kafka.common.serialization.Serializer 接口

acks

acks 參數指定了要有多少個分區副本接收消息,生產者才認爲消息是寫入成功的。此參數對消息丟失的影響較大

buffer.memory

此參數用來設置生產者內存緩衝區的大小,生產者用它緩衝要發送到服務器的消息。如果應用程序發送消息的速度超過發送到服務器的速度,會導致生產者空間不足。這個時候,send() 方法調用要麼被阻塞,要麼拋出異常,具體取決於 block.on.buffer.null 參數的設置。

compression.type

此參數來表示生產者啓用何種壓縮算法,默認情況下,消息發送時不會被壓縮。該參數可以設置爲 snappy、gzip 和 lz4,它指定了消息發送給 broker 之前使用哪一種壓縮算法進行壓縮。下面是各壓縮算法的對比

retries

生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區找不到首領),在這種情況下,reteis 參數的值決定了生產者可以重發的消息次數,如果達到這個次數,生產者會放棄重試並返回錯誤。默認情況下,生產者在每次重試之間等待 100ms,這個等待參數可以通過 retry.backoff.ms 進行修改。

batch.size

當有多個消息需要被髮送到同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可以使用的內存大小,按照字節數計算。當批次被填滿,批次裏的所有消息會被髮送出去。不過生產者井不一定都會等到批次被填滿才發送,任意條數的消息都可能被髮送。

client.id

此參數可以是任意的字符串,服務器會用它來識別消息的來源,一般配置在日誌裏

max.in.flight.requests.per.connection

此參數指定了生產者在收到服務器響應之前可以發送多少消息,它的值越高,就會佔用越多的內存,不過也會提高吞吐量。把它設爲 1 可以保證消息是按照發送的順序寫入服務器。

timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生產者在發送數據時等待服務器返回的響應時間,metadata.fetch.timeout.ms 指定了生產者在獲取元數據(比如目標分區的首領是誰)時等待服務器返回響應的時間。如果等待時間超時,生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms 指定了 broker 等待同步副本返回消息確認的時間,與 asks 的配置相匹配 ---- 如果在指定時間內沒有收到同步副本的確認,那麼 broker 就會返回一個錯誤。

max.block.ms

此參數指定了在調用 send() 方法或使用 partitionFor() 方法獲取元數據時生產者的阻塞時間當生產者的發送緩衝區已捕,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

max.request.size

該參數用於控制生產者發送的請求大小。它可以指能發送的單個消息的最大值,也可以指單個請求裏所有消息的總大小。

receive.buffer.bytes 和 send.buffer.bytes

Kafka 是基於 TCP 實現的,爲了保證可靠的消息傳輸,這兩個參數分別指定了 TCP Socket 接收和發送數據包的緩衝區的大小。如果它們被設置爲 -1,就使用操作系統的默認值。如果生產者或消費者與 broker 處於不同的數據中心,那麼可以適當增大這些值。

Kafka Consumer

應用程序使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的消息,然後再把他們保存起來。應用程序首先需要創建一個 KafkaConsumer 對象,訂閱主題並開始接受消息,驗證消息並保存結果。一段時間後,生產者往主題寫入的速度超過了應用程序驗證數據的速度,這時候該如何處理?如果只使用單個消費者的話,應用程序會跟不上消息生成的速度,就像多個生產者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參與消費主題中的消息,對消息進行分流處理。

Kafka 消費者從屬於消費者羣組。一個羣組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分區的消息。下面是一個 Kafka 分區消費示意圖

上圖中的主題 T1 有四個分區,分別是分區 0、分區 1、分區 2、分區 3,我們創建一個消費者羣組 1,消費者羣組中只有一個消費者,它訂閱主題 T1,接收到 T1 中的全部消息。由於一個消費者處理四個生產者發送到分區的消息,壓力有些大,需要幫手來幫忙分擔任務,於是就演變爲下圖

這樣一來,消費者的消費能力就大大提高了,但是在某些環境下比如用戶產生消息特別多的時候,生產者產生的消息仍舊讓消費者喫不消,那就繼續增加消費者。

如上圖所示,每個分區所產生的消息能夠被每個消費者羣組中的消費者消費,如果向消費者羣組中增加更多的消費者,那麼多餘的消費者將會閒置,如下圖所示

向羣組中增加消費者是橫向伸縮消費能力的主要方式。總而言之,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是爲什麼建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因爲多出來的消費者是空閒的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。爲了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變爲下圖這樣

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬於不同的應用。

總結起來就是如果應用需要讀取全量消息,那麼請爲該應用設置一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裏增加消費者

消費者組和分區重平衡

消費者組是什麼

消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的羣組,具有可擴展性和可容錯性的一種機制。消費者組內的消費者共享一個消費者組 ID,這個 ID 也叫做 Group ID,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區的消息,多餘的消費者會閒置,派不上用場。

我們在上面提到了兩種消費方式

消費者重平衡

我們從上面的消費者演變圖中可以知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其全部分區的消息,後來有一個消費者加入羣組,隨後又有更多的消費者加入羣組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區的所有權通過一個消費者轉到其他消費者的行爲稱爲重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它爲消費者羣組帶來了高可用性 和 伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們並不希望發生這樣的行爲。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區被重新分配給另一個消費者時,消息當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。

消費者通過向組織協調者(Kafka Broker)發送心跳來維護自己是消費者組的一員並確認其擁有的分區。對於不同不的消費羣體來說,其組織協調者可以是不同的。只要消費者定期發送心跳,就會認爲消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

如果過了一段時間 Kafka 停止發送心跳了,會話(Session)就會過期,組織協調者就會認爲這個 Consumer 已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止發送消息,組織協調者會等待幾秒鐘,確認它死亡了纔會觸發重平衡。在這段時間裏,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開羣組,組織協調者會觸發一次重平衡,儘量降低處理停頓。

重平衡是一把雙刃劍,它爲消費者羣組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點 (bug),而這些 bug 到現在社區還無法修改。

重平衡的過程對消費者組有極大的影響。因爲每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關於 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結束。Stop The World這個名字聽起來很帥,但這項工作實際上是由虛擬機在後臺自動發起並完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......

創建消費者

上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的

在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 對象與創建 KafkaProducer 對象十分相似 --- 把需要傳遞給消費者的屬性放在 properties 對象中,後面我們會着重討論 Kafka 的一些配置,這裏我們先簡單的創建一下,使用 3 個屬性就足矣,分別是 bootstrap.serverkey.deserializervalue.deserializer 。

這三個屬性我們已經用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢是認識一下 Kafka Producer

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪個消費者羣組。創建不屬於任何一個羣組的消費者也是可以的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

創建好消費者之後,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作爲參數,使用起來比較簡單

consumer.subscribe(Collections.singletonList("customerTopic"));

爲了簡單我們只訂閱了一個主題 customerTopic,參數傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創建了新的主題,並且主題的名字與正則表達式相匹配,那麼會立即觸發一次重平衡,消費者就可以讀取新的主題。

要訂閱所有與 test 相關的主題,可以這樣做

consumer.subscribe("test.*");

輪詢

我們知道,Kafka 是支持訂閱 / 發佈模式的,生產者發送數據給 Kafka Broker,那麼消費者是如何知道生產者發送了數據呢?其實生產者產生的數據消費者是不知道的,KafkaConsumer 採用輪詢的方式定期去 Kafka Broker 中進行數據的檢索,如果有數據就用來消費,如果沒有就再繼續輪詢等待,下面是輪詢等待的具體實現

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}

線程安全性

在同一個羣組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規則,一個消費者使用一個線程,如果一個消費者羣組中多個消費者都想要運行的話,那麼必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啓動多個消費者進行進行處理。

消費者配置

到目前爲止,我們學習瞭如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關的配置說明。大部分參數都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數。

該屬性指定了消費者從服務器獲取記錄的最小字節數。broker 在收到消費者的數據請求時,如果可用的數據量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因爲它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數據,但消費者的 CPU 使用率很高,那麼就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值調大可以降低 broker 的工作負載。

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時纔會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數據流入 kafka 的話,消費者獲取的最小數據量要求就得不到滿足,最終導致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數值設置的小一些。如果 fetch.max.wait.ms 被設置爲 100 毫秒的延遲,而 fetch.min.bytes 的值設置爲 1MB,那麼 Kafka 在收到消費者請求後,要麼返回 1MB 的數據,要麼在 100 ms 後返回所有可用的數據。就看哪個條件首先被滿足。

該屬性指定了服務器從每個分區裏返回給消費者的最大字節數。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區裏返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節。如果一個主題有 20 個分區和 5 個消費者,那麼每個消費者需要至少4 MB 的可用內存來接收記錄。在爲消費者分配內存時,可以給它們多分配一些,因爲如果羣組裏有消費者發生崩潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節數 (通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。在設置該屬性時,另外一個考量的因素是消費者處理數據的時間。消費者需要頻繁的調用 poll() 方法來避免會話過期和發生分區再平衡,如果單次調用 poll() 返回的數據太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

這個屬性指定了消費者在被認爲死亡之前可以與服務器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內發送心跳給羣組協調器,就會被認定爲死亡,協調器就會觸發重平衡。把它的分區分配給消費者羣組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向羣組協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設置的比默認值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的重平衡。把該屬性的值設置得大一些,可以減少意外的重平衡,不過檢測節點崩潰需要更長的時間。

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區的記錄。

我們稍後將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,爲了儘量避免出現重複數據和數據丟失,可以把它設置爲 false,由自己控制何時提交偏移量。如果把它設置爲 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率

我們知道,分區會分配給羣組中的消費者。PartitionAssignor 會根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者,Kafka 有兩個默認的分配策略Range 和 RoundRobin

該屬性可以是任意字符串,broker 用他來標識從客戶端發送過來的消息,通常被用在日誌、度量指標和配額中

該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢中需要處理的數據量。

socket 在讀寫數據時用到的 TCP 緩衝區也可以設置大小。如果它們被設置爲 -1,就使用操作系統默認值。如果生產者或消費者與 broker 處於不同的數據中心內,可以適當增大這些值,因爲跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費者在每次調用poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被羣組裏的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區中的位置(偏移量)

消費者會向一個叫做 _consumer_offset 的特殊主題中發送消息,這個主題會保存每次所發送消息中的分區偏移量,這個主題的主要作用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題發送消息,正常情況下不觸發重平衡,這個主題是不起作用的,當觸發重平衡後,消費者停止工作,每個消費者可能會分到對應的分區,這個主題就是讓消費者能夠繼續處理消息所設置的。

如果提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的消息就會被重複處理

如果提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的消息將會丟失

既然_consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面我們就來說一下提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設置爲 true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者裏的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那麼就會提交從上一次輪詢中返回的偏移量。

提交當前偏移量

把 auto.commit.offset 設置爲 false,可以讓應用程序決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功後馬上返回,如果提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄後要確保調用了 commitSync(),否則還是會有丟失消息的風險,如果發生了在均衡,從最近一批消息到發生在均衡之間的所有消息都將被重複處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於異步提交不會進行重試,同步提交會一致進行重試。

同步和異步組合提交

一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因爲如果提交失敗是因爲臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。

因此,在消費者關閉之前一般會組合使用 commitAsync 和 commitSync 提交偏移量

提交特定的偏移量

消費者 API 允許調用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

後記

此篇文章是關於 kafka 的入門總結,是 cxuan Kafka 系列的第三篇文章,文章大部分是對之前三篇的彙總,也補充了一些新家的東西,如果覺得好,歡迎轉發和在看。

文章參考:

Kafka 史上最詳細原理總結

《Kafka 權威指南》

https://kafka.apache.org/

http://kafka.apache.org/documentation/

https://www.tutorialkart.com/apache-kafka-tutorial/

https://dzone.com/articles/what-is-kafka

《極客時間 - Kafka 核心技術與實戰》

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/3A4AIKcWKpQzG9nJzRmZJQ