Kafka Consumer 消費消息和 Rebalance 機制
Kafka Consumer
Kafka 有消費組的概念,每個消費者只能消費所分配到的分區的消息,每一個分區只能被一個消費組中的一個消費者所消費,所以同一個消費組中消費者的數量如果超過了分區的數量,將會出現有些消費者分配不到消費的分區。消費組與消費者關係如下圖所示:
Kafka Consumer Client 消費消息通常包含以下步驟:
-
配置客戶端,創建消費者
-
訂閱主題
-
拉去消息並消費
-
提交消費位移
-
關閉消費者實例
因爲 Kafka 的 Consumer 客戶端是線程不安全的,爲了保證線程安全,並提升消費性能,可以在 Consumer 端採用類似 Reactor 的線程模型來消費數據。
Kafka consumer 參數
-
bootstrap.servers:連接 broker 地址,
host:port
格式。 -
group.id:消費者隸屬的消費組。
-
key.deserializer:與生產者的
key.serializer
對應,key 的反序列化方式。 -
value.deserializer:與生產者的
value.serializer
對應,value 的反序列化方式。 -
session.timeout.ms:coordinator 檢測失敗的時間。默認 10s 該參數是 Consumer Group 主動檢測 (組內成員 comsummer) 崩潰的時間間隔,類似於心跳過期時間。
-
auto.offset.reset:該屬性指定了消費者在讀取一個沒有偏移量後者偏移量無效(消費者長時間失效當前的偏移量已經過時並且被刪除了)的分區的情況下,應該作何處理,默認值是 latest,也就是從最新記錄讀取數據(消費者啓動之後生成的記錄),另一個值是 earliest,意思是在偏移量無效的情況下,消費者從起始位置開始讀取數據。
-
enable.auto.commit:否自動提交位移,如果爲
false
,則需要在程序中手動提交位移。對於精確到一次的語義,最好手動提交位移 -
fetch.max.bytes:單次拉取數據的最大字節數量
-
max.poll.records:單次 poll 調用返回的最大消息數,如果處理邏輯很輕量,可以適當提高該值。但是
max.poll.records
條數據需要在在 session.timeout.ms 這個時間內處理完 。默認值爲 500 -
request.timeout.ms:一次請求響應的最長等待時間。如果在超時時間內未得到響應,kafka 要麼重發這條消息,要麼超過重試次數的情況下直接置爲失敗。
Kafka Rebalance
rebalance 本質上是一種協議,規定了一個 consumer group 下的所有 consumer 如何達成一致來分配訂閱 topic 的每個分區。比如某個 group 下有 20 個 consumer,它訂閱了一個具有 100 個分區的 topic。正常情況下,Kafka 平均會爲每個 consumer 分配 5 個分區。這個分配的過程就叫 rebalance。
什麼時候 rebalance?
這也是經常被提及的一個問題。rebalance 的觸發條件有三種:
-
組成員發生變更(新 consumer 加入組、已有 consumer 主動離開組或已有 consumer 崩潰了——這兩者的區別後面會談到)
-
訂閱主題數發生變更
-
訂閱主題的分區數發生變更
如何進行組內分區分配?
Kafka 默認提供了兩種分配策略:Range 和 Round-Robin。當然 Kafka 採用了可插拔式的分配策略,你可以創建自己的分配器以實現不同的分配策略。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/bgLQQO2uNKbHxd9OSRvqpg