Kafka 爲什麼會丟消息?

引入 MQ 消息中間件最直接的目的:系統解耦以及流量控制(削峯填谷)

引入 MQ 同樣帶來其他問題:數據一致性。

在分佈式系統中,如果兩個節點之間存在數據同步,就會帶來數據一致性的問題。消息生產端發送消息到 MQ 再到消息消費端需要保證消息不丟失。

所以在使用 MQ 消息隊列時,需要考慮這 3 個問題:

1、如何知道有消息丟失?

如何感知消息是否丟失了?可總結如下:

  1. 他人反饋: 運營、PM 反饋消息丟失。

  2. 監控報警: 監控指定指標,即時報警人工調整。Kafka 集羣異常、Broker 宕機、Broker 磁盤掛載問題、消費者異常導致消息積壓等都會給用戶直接感覺是消息丟失了。

案例:輿情分析中數據採集同步

當感知消息丟失了,那就需要一種機制來檢查消息是否丟失。

檢索消息

運維工具有:

  1. 查看 Kafka 消費位置:
# 查看某個topic的message數量
$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test_topic
# 查看consumer Group列表
$ ./kafka-consumer-groups.sh  --list  --bootstrap-server 192.168.88.108:9092
# 查看 offset 消費情況
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-1152 --describe
GROUP                 TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                           HOST            CLIENT-ID
console-consumer-1152 test_topic      0          -               4               -               consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942 /127.0.0.1      consumer-console-consumer-1152-1
  1. 利用工具:Kafka Tools

  1. 其他可見化界面工具

2、哪些環節可能丟消息?

一條消息從生產到消費完成經歷 3 個環節:消息生產者、消息中間件、消息消費者。

哪個環節都有可能出現消息丟失問題。

1)生產端

首先要認識到 Kafka 生產端發送消息流程:

調用 send() 方法時,不會立刻把消息發送出去,而是緩存起來,選擇恰當時機把緩存裏的消息劃分成一批數據,通過 Sender 線程按批次發送給服務端 Broker

此環節丟失消息的場景有: 即導致 Producer 消息沒有發送成功

  1. 網絡波動: 生產者與服務端之間的鏈路不可達,發送超時。現象是:各端狀態正常,但消費端就是沒有消費消息,就像丟失消息一樣。
  1. 不恰當配置: 發送消息無 ack 確認; 發送消息失敗無回調,無日誌。

    producer.send(new ProducerRecord<>(topic, messageKey, messageStr), 
                              new CallBack(){...});

回顧下重要的參數:****acks

2)服務端

先來了解下 Kafka Broker 寫入數據的過程:

  1. Broker 接收到一批數據,會先寫入內存 PageCacheOS Cache)中。

  2. 操作系統會隔段時間把 OS Cache 中數據進行刷盤,這個過程會是 「異步批量刷盤」

這裏就有個隱患,如果數據寫入 PageCacheKafka Broker宕機會怎樣?機子宕機 / 掉電?

  • 解決方案**:使用帶蓄電池後備電源的緩存 cache,防止系統斷電異常。**
  1. 對比學習 MySQL 的 “雙 1” 策略,基本不使用這個策略,因爲 “雙 1” 會導致頻繁的 I/O 操作,也是最慢的一種。

  2. 對比學習 RedisAOF 策略,默認且推薦的策略:Everysec(AOF_FSYNC_EVERYSEC) 每一秒鐘保存一次(默認):。每個寫命令執行完, 只是先把日誌寫到 AOF 文件的內存緩衝區, 每隔一秒把緩衝區中的內容寫入磁盤。

拓展:Kafka 日誌刷盤機制

# 推薦採用默認值,即不配置該配置,交由操作系統自行決定何時落盤,以提升性能。
# 針對 broker 配置:
log.flush.interval.messages=10000 # 日誌落盤消息條數間隔,即每接收到一定條數消息,即進行log落盤。
log.flush.interval.ms=1000        # 日誌落盤時間間隔,單位ms,即每隔一定時間,即進行log落盤。
# 針對 topic 配置:
flush.messages.flush.ms=1000  # topic下每1s刷盤
flush.messages=1              # topic下每個消息都落盤
# 查看 Linux 後臺線程執行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10      # 表示當髒頁佔總內存的的百分比超過這個值時,後臺線程開始刷新髒頁。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000    # 表示髒數據多久會被刷新到磁盤上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500  # 表示多久喚醒一次刷新髒頁的後臺線程(5秒)。
vm.dirtytime_expire_seconds = 43200

Broker 的可靠性需要依賴其多副本機制: 一般副本數 3 個(配置參數:replication.factor=3

副本之間的數據同步也可能出現問題:數據丟失問題和數據不一致問題。

解決方案:ISREpoch 機制具體可看之前的文章

對應需要的配置參數如下:

  1. acks=-1 或者 acks=all 必須所有副本均同步到消息,才能表明消息發送成功。

  2. replication.factor >= 3 副本數至少有 3 個。

  3. min.insync.replicas > 1 代表消息至少寫入 2 個副本纔算發送成功。前提需要 acks=-1

    舉個栗子:Leader 宕機了,至少要保證 ISR 中有一個 Follower,這樣這個Follwer被選舉爲Leader 且不會丟失數據。

    公式:replication.factor = min.insync.replicas + 1

  4. unclean.leader.election.enable=false 防止不在 ISR 中的 Follower 被選舉爲 Leader

    **Kafka 0.11.0.0 版本開始默認 unclean.leader.election.enable=false **

3)消費端

消費端消息丟失場景有:

  1. 消息堆積: 幾個分區的消息都沒消費,就跟丟消息一樣。
  1. 自動提交: 消費端拉下一批數據,正在處理中自動提交了 offset,這時候消費端宕機了; 重啓後,拉到新一批數據,而上一批數據卻沒處理完。
  1. 心跳超時,引發 Rebalance 客戶端心跳超時,觸發 Rebalance被踢出消費組。如果只有這一個客戶端,那消息就不會被消費了。

    同時避免兩次 poll 的間隔時間超過閾值:

案例:凡凡曾遇到數據同步時,消息中的文本需經過 NLPNER 分析,再同步到 ES

這個過程的主要流程是:

  1. 數據同步程序從 Kafka 中拉取消息。

  2. 數據同步程序將消息內的文本發送的 NER 進行分析,得到特徵數組。

  3. 數據同步程序將消息同步給 ES

現象:線上數據同步程序運行一段時間後,消息就不消費了。

當時解決措施是:

  1. session.timeout.ms 設置爲 25s,當時沒有升級客戶端版本,怕帶來其他問題。

  2. 熔斷機制: 增加 Hystrix,超過 3 次服務調用異常就熔斷,保護客戶端正常消費數據。

3、如何確保消息不丟失?

掌握這些技能:

  1. 熟悉消息從發送到消費的每個階段

  2. 監控報警 Kafka 集羣

  3. 熟悉方案 “MQ 可靠消息投遞”

怎麼確保消息 100% 不丟失?

到這,總結下:

  1. 生產端:
  1. Broker:
  1. 消費端

作者:格格步入

來源:juejin.cn/post/7135101805179961352

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/FbIaTM8xsdsn2hk3uPO7Vg