Kafka 爲什麼會丟消息?
引入 MQ
消息中間件最直接的目的:系統解耦以及流量控制(削峯填谷)
-
系統解耦: 上下游系統之間的通信相互依賴,利用
MQ
消息隊列可以隔離上下游環境變化帶來的不穩定因素。 -
流量控制: 超高併發場景中,引入
MQ
可以實現流量 “削峯填谷” 的作用以及服務異步處理,不至於打崩服務。
引入 MQ
同樣帶來其他問題:數據一致性。
在分佈式系統中,如果兩個節點之間存在數據同步,就會帶來數據一致性的問題。消息生產端發送消息到
MQ
再到消息消費端需要保證消息不丟失。
所以在使用 MQ
消息隊列時,需要考慮這 3 個問題:
-
如何知道有消息丟失?
-
哪些環節可能丟消息?
-
如何確保消息不丟失?
1、如何知道有消息丟失?
如何感知消息是否丟失了?可總結如下:
-
他人反饋: 運營、
PM
反饋消息丟失。 -
監控報警: 監控指定指標,即時報警人工調整。
Kafka
集羣異常、Broker
宕機、Broker
磁盤掛載問題、消費者異常導致消息積壓等都會給用戶直接感覺是消息丟失了。
案例:輿情分析中數據採集同步
-
PM
可自己下發採集調度指令,去採集特定數據。 -
PM
可通過ES
近實時查詢對應數據,若沒相應數據可再次下發指令。
當感知消息丟失了,那就需要一種機制來檢查消息是否丟失。
檢索消息
運維工具有:
- 查看
Kafka
消費位置:
# 查看某個topic的message數量
$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic
# 查看consumer Group列表
$ ./kafka-consumer-groups.sh --list --bootstrap-server 192.168.88.108:9092
# 查看 offset 消費情況
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-1152 test_topic 0 - 4 - consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1 consumer-console-consumer-1152-1
- 利用工具:
Kafka Tools
- 其他可見化界面工具
2、哪些環節可能丟消息?
一條消息從生產到消費完成經歷 3 個環節:消息生產者、消息中間件、消息消費者。
哪個環節都有可能出現消息丟失問題。
1)生產端
首先要認識到 Kafka
生產端發送消息流程:
調用
send()
方法時,不會立刻把消息發送出去,而是緩存起來,選擇恰當時機把緩存裏的消息劃分成一批數據,通過Sender
線程按批次發送給服務端Broker
。
此環節丟失消息的場景有: 即導致 Producer
消息沒有發送成功
- 網絡波動: 生產者與服務端之間的鏈路不可達,發送超時。現象是:各端狀態正常,但消費端就是沒有消費消息,就像丟失消息一樣。
- _解決措施:_重試
props.put("retries", "10");
-
不恰當配置: 發送消息無
ack
確認; 發送消息失敗無回調,無日誌。producer.send(new ProducerRecord<>(topic, messageKey, messageStr), new CallBack(){...});
- _解決措施:_設置
acks=1
或者acks=all
。發送消息設置回調。
回顧下重要的參數:****acks
-
acks=0
:不需要等待服務器的確認. 這是retries
設置無效. 響應裏來自服務端的offset
總是-1
,producer
只管發不管發送成功與否。延遲低,容易丟失數據。 -
acks=1
:表示leader
寫入成功(但是並沒有刷新到磁盤)後即向producer
響應。延遲中等,一旦leader
副本掛了,就會丟失數據。 -
acks=all
:等待數據完成副本的複製, 等同於-1
. 假如需要保證消息不丟失, 需要使用該設置. 同時需要設置unclean.leader.election.enable
爲true
, 保證當ISR
列表爲空時, 選擇其他存活的副本作爲新的leader
.
2)服務端
先來了解下 Kafka Broker
寫入數據的過程:
-
Broker
接收到一批數據,會先寫入內存PageCache
(OS Cache
)中。 -
操作系統會隔段時間把
OS Cache
中數據進行刷盤,這個過程會是 「異步批量刷盤」。
這裏就有個隱患,如果數據寫入 PageCache
後 Kafka Broker
宕機會怎樣?機子宕機 / 掉電?
-
Kafka Broker
宕機: 消息不會丟失。因爲數據已經寫入PageCache
,只等待操作系統刷盤即可。 -
機子宕機 / 掉電: 消息會丟失。因爲數據仍在內存裏,內存
RAM
掉電後就會丟失數據。
- 解決方案**:使用帶蓄電池後備電源的緩存
cache
,防止系統斷電異常。**
對比學習
MySQL
的 “雙 1” 策略,基本不使用這個策略,因爲 “雙 1” 會導致頻繁的I/O
操作,也是最慢的一種。對比學習
Redis
的AOF
策略,默認且推薦的策略:Everysec
(AOF_FSYNC_EVERYSEC
) 每一秒鐘保存一次(默認):。每個寫命令執行完, 只是先把日誌寫到AOF
文件的內存緩衝區, 每隔一秒把緩衝區中的內容寫入磁盤。
拓展:Kafka
日誌刷盤機制
# 推薦採用默認值,即不配置該配置,交由操作系統自行決定何時落盤,以提升性能。
# 針對 broker 配置:
log.flush.interval.messages=10000 # 日誌落盤消息條數間隔,即每接收到一定條數消息,即進行log落盤。
log.flush.interval.ms=1000 # 日誌落盤時間間隔,單位ms,即每隔一定時間,即進行log落盤。
# 針對 topic 配置:
flush.messages.flush.ms=1000 # topic下每1s刷盤
flush.messages=1 # topic下每個消息都落盤
# 查看 Linux 後臺線程執行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10 # 表示當髒頁佔總內存的的百分比超過這個值時,後臺線程開始刷新髒頁。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000 # 表示髒數據多久會被刷新到磁盤上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500 # 表示多久喚醒一次刷新髒頁的後臺線程(5秒)。
vm.dirtytime_expire_seconds = 43200
Broker
的可靠性需要依賴其多副本機制: 一般副本數 3 個(配置參數:replication.factor=3
)
-
Leader Partition
副本:提供對外讀寫機制。 -
Follower Partition
副本:同步Leader
數據。
副本之間的數據同步也可能出現問題:數據丟失問題和數據不一致問題。
解決方案:ISR
和 Epoch
機制 (具體可看之前的文章)
-
ISR
(In-Sync Replicas
) : 當Le``ader
宕機,可以從ISR
中選擇一個Follower
作爲Leader
。 -
Epoch
機制: 解決Leader
副本高水位更新和Follower
副本高水位更新在時間上是存在錯配問題。Tips
:Kafka 0.11.x
版本才引入leader epoch
機制解決高水位機制弊端。
對應需要的配置參數如下:
-
acks=-1
或者acks=all
: 必須所有副本均同步到消息,才能表明消息發送成功。 -
replication.factor >= 3
: 副本數至少有 3 個。 -
min.insync.replicas > 1
: 代表消息至少寫入 2 個副本纔算發送成功。前提需要acks=-1
。舉個栗子:
Leader
宕機了,至少要保證ISR
中有一個Follower
,這樣這個Follwer
被選舉爲Leader
且不會丟失數據。公式:
replication.factor = min.insync.replicas + 1
-
unclean.leader.election.enable=false
: 防止不在ISR
中的Follower
被選舉爲Leader
。**Kafka 0.11.0.0 版本開始默認
unclean.leader.election.enable=false
**
3)消費端
消費端消息丟失場景有:
- 消息堆積: 幾個分區的消息都沒消費,就跟丟消息一樣。
- 解決措施: 一般問題都出在消費端,儘量提高客戶端的消費速度,消費邏輯另起線程進行處理。
- 自動提交: 消費端拉下一批數據,正在處理中自動提交了
offset
,這時候消費端宕機了; 重啓後,拉到新一批數據,而上一批數據卻沒處理完。
- 解決措施: 取消自動提交
auto.commit = false
,改爲手動ack
。
-
心跳超時,引發
Rebalance
: 客戶端心跳超時,觸發Rebalance
被踢出消費組。如果只有這一個客戶端,那消息就不會被消費了。同時避免兩次
poll
的間隔時間超過閾值:
-
max.poll.records
:降低該參數值,建議遠遠小於<單個線程每秒消費的條數> * <消費線程的個數> * <max.poll.interval.ms>
的積。 -
max.poll.interval.ms
: 該值要大於<max.poll.records> / (<單個線程每秒消費的條數> * <消費線程的個數>)
的值。 -
解決措施: 客戶端版本升級至
0.10.2
以上版本。
案例:凡凡曾遇到數據同步時,消息中的文本需經過 NLP
的 NER
分析,再同步到 ES
。
這個過程的主要流程是:
-
數據同步程序從
Kafka
中拉取消息。 -
數據同步程序將消息內的文本發送的
NER
進行分析,得到特徵數組。 -
數據同步程序將消息同步給
ES
。
現象:線上數據同步程序運行一段時間後,消息就不消費了。
-
排查日誌: 發現有
Rebalance
日誌,懷疑是客戶端消費太慢被踢出了消費組。 -
本地測試: 發現運行一段時間也會出現
Rebalance
,且NLP
的NER
服務訪問HTTP 500
報錯。 -
得出結論: 因
NER
服務異常,導致數據同步程序消費超時。且當時客戶端版本爲v0.10.1
,Consumer
沒有獨立線程維持心跳,而是把心跳維持與poll
接口耦合在一起,從而也會造成心跳超時。
當時解決措施是:
-
session.timeout.ms
: 設置爲25s
,當時沒有升級客戶端版本,怕帶來其他問題。 -
熔斷機制: 增加
Hystrix
,超過 3 次服務調用異常就熔斷,保護客戶端正常消費數據。
3、如何確保消息不丟失?
掌握這些技能:
-
熟悉消息從發送到消費的每個階段
-
監控報警
Kafka
集羣 -
熟悉方案 “MQ 可靠消息投遞”
怎麼確保消息 100% 不丟失?
到這,總結下:
- 生產端:
-
設置重試:
props.put("retries", "10");
-
設置
acks=all
-
設置回調:
producer.send(msg, new CallBack(){...});
- Broker:
-
內存:使用帶蓄電池後備電源的緩存
cache
。 -
Kafka
版本0.11.x
以上:支持Epoch
機制。 -
replication.factor >= 3
: 副本數至少有 3 個。 -
min.insync.replicas > 1
: 代表消息至少寫入 2 個副本纔算發送成功。前提需要acks=-1
。 -
unclean.leader.election.enable=false
: 防止不在ISR
中的Follower
被選舉爲Leader
。
- 消費端
-
客戶端版本升級至
0.10.2
以上版本。 -
取消自動提交
auto.commit = false
,改爲手動ack
。 -
儘量提高客戶端的消費速度,消費邏輯另起線程進行處理。
作者:格格步入
來源:juejin.cn/post/7135101805179961352
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/FbIaTM8xsdsn2hk3uPO7Vg