刨根問底: Kafka 到底會不會丟數據?
大家好,我是 華仔, 又跟大家見面了。
那麼 Kafka 到底會不會丟數據呢?如果丟數據,究竟該怎麼解決呢?
只有掌握了這些, 我們才能處理好 Kafka 生產級的一些故障,從而更穩定地服務業務。
認真讀完這篇文章,我相信你會對 Kafka 如何解決丟數據問題,有更加深刻的理解。
這篇文章乾貨很多,希望你可以耐心讀完。
01 總體概述
越來越多的互聯網公司使用消息隊列來支撐自己的核心業務。由於是核心業務,一般都會要求消息傳遞過程中最大限度的做到不丟失,如果中間環節出現數據丟失,就會引來用戶的投訴,年底績效就要背鍋了。
那麼使用 Kafka 到底會不會丟數據呢?如果丟數據了該怎麼解決呢?爲了避免類似情況發生,除了要做好補償措施,我們更應該在系統設計的時候充分考慮系統中的各種異常情況,從而設計出一個穩定可靠的消息系統。
大家都知道 Kafka 的整個架構非常簡潔,是分佈式的架構,主要由 Producer、Broker、Consumer 三部分組成,後面剖析丟失場景會從這三部分入手來剖析。
02 消息傳遞語義剖析
在深度剖析消息丟失場景之前,我們先來聊聊「消息傳遞語義」到底是個什麼玩意?
所謂的消息傳遞語義是 Kafka 提供的 Producer 和 Consumer 之間的消息傳遞過程中消息傳遞的保證性。主要分爲三種, 如下圖所示:
1)首先當 Producer 向 Broker 發送數據後,會進行 commit,如果 commit 成功,由於 Replica 副本機制的存在,則意味着消息不會丟失,但是 Producer 發送數據給 Broker 後,遇到網絡問題而造成通信中斷,那麼 Producer 就無法準確判斷該消息是否已經被提交(commit),這就可能造成 at least once 語義。
2)在 Kafka 0.11.0.0 之前, 如果 Producer 沒有收到消息 commit 的響應結果,它只能重新發送消息,確保消息已經被正確的傳輸到 Broker,重新發送的時候會將消息再次寫入日誌中;而在 0.11.0.0 版本之後, Producer 支持冪等傳遞選項,保證重新發送不會導致消息在日誌出現重複。爲了實現這個, Broker 爲 Producer 分配了一個 ID,並通過每條消息的序列號進行去重。也支持了類似事務語義來保證將消息發送到多個 Topic 分區中,保證所有消息要麼都寫入成功,要麼都失敗,這個主要用在 Topic 之間的 exactly once 語義。
其中啓用冪等傳遞的方法配置:enable.idempotence = true。
啓用事務支持的方法配置:設置屬性 transcational.id = "指定值"。
3)從 Consumer 角度來剖析, 我們知道 Offset 是由 Consumer 自己來維護的, 如果 Consumer 收到消息後更新 Offset, 這時 Consumer 異常 crash 掉, 那麼新的 Consumer 接管後再次重啓消費,就會造成 at most once 語義(消息會丟,但不重複)。
- 如果 Consumer 消費消息完成後, 再更新 Offset, 如果這時 Consumer crash 掉,那麼新的 Consumer 接管後重新用這個 Offset 拉取消息, 這時就會造成 at least once 語義(消息不丟,但被多次重複處理)。
總結: 默認 Kafka 提供 「at least once」語義的消息傳遞,允許用戶通過在處理消息之前保存 Offset 的方式提供 「at most once」 語義。如果我們可以自己實現消費冪等,理想情況下這個系統的消息傳遞就是嚴格的「exactly once」, 也就是保證不丟失、且只會被精確的處理一次,但是這樣是很難做到的。
從 Kafka 整體架構圖我們可以得出有三次消息傳遞的過程:
1)Producer 端發送消息給 Kafka Broker 端。
2)Kafka Broker 將消息進行同步並持久化數據。
3)Consumer 端從 Kafka Broker 將消息拉取並進行消費。
在以上這三步中每一步都可能會出現丟失數據的情況, 那麼 Kafka 到底在什麼情況下才能保證消息不丟失呢?
通過上面三步,我們可以得出:Kafka 只對 「已提交」的消息做「最大限度的持久化保證不丟失」。
怎麼理解上面這句話呢?
1)首先是 「已提交」的消息:當 Kafka 中 N 個 Broker 成功的收到一條消息並寫入到日誌文件後,它們會告訴 Producer 端這條消息已成功提交了,那麼這時該消息在 Kafka 中就變成 "已提交消息" 了。
這裏的 N 個 Broker 我們怎麼理解呢?這主要取決於對 "已提交" 的定義, 這裏可以選擇只要一個 Broker 成功保存該消息就算已提交,也可以是所有 Broker 都成功保存該消息纔算是已提交。
2)其次是 「最大限度的持久化保證不丟失」,也就是說 Kafka 並不能保證在任何情況下都能做到數據不丟失。即 Kafka 不丟失數據是有前提條件的。假如這時你的消息保存在 N 個 Broker 上,那麼前提條件就是這 N 個 Broker 中至少有 1 個是存活的,就可以保證你的消息不丟失。
也就是說 Kafka 是能做到不丟失數據的, 只不過這些消息必須是 「已提交」的消息,且還要滿足一定的條件纔可以。
瞭解了 Kafka 消息傳遞語義以及什麼情況下可以保證不丟失數據,下面我們來詳細剖析每個環節爲什麼會丟數據,以及如何最大限度的避免丟失數據。
03 消息丟失場景剖析
Producer 端丟失場景剖析
在剖析 Producer 端數據丟失之前,我們先來了解下 Producer 端發送消息的流程,對於不瞭解 Producer 的讀者們,可以查看 聊聊 Kafka Producer 那點事
消息發送流程如下:
1)首先我們要知道一點就是 Producer 端是直接與 Broker 中的 Leader Partition 交互的,所以在 Producer 端初始化中就需要通過 Partitioner 分區器從 Kafka 集羣中獲取到相關 Topic 對應的 Leader Partition 的元數據 。
2)待獲取到 Leader Partition 的元數據後直接將消息發送過去。
3)Kafka Broker 對應的 Leader Partition 收到消息會先寫入 Page Cache,定時刷盤進行持久化(順序寫入磁盤)。
- Follower Partition 拉取 Leader Partition 的消息並保持同 Leader Partition 數據一致,待消息拉取完畢後需要給 Leader Partition 回覆 ACK 確認消息。
5)待 Kafka Leader 與 Follower Partition 同步完數據並收到所有 ISR 中的 Replica 副本的 ACK 後,Leader Partition 會給 Producer 回覆 ACK 確認消息。
根據上圖以及消息發送流程可以得出:Producer 端爲了提升發送效率,減少 IO 操作,發送數據的時候是將多個請求合併成一個個 RecordBatch,並將其封裝轉換成 Request 請求「異步」將數據發送出去(也可以按時間間隔方式,達到時間間隔自動發送),所以 Producer 端消息丟失更多是因爲消息根本就沒有發送到 Kafka Broker 端。
導致 Producer 端消息沒有發送成功有以下原因:
-
網絡原因: 由於網絡抖動導致數據根本就沒發送到 Broker 端。
-
數據原因: 消息體太大超出 Broker 承受範圍而導致 Broker 拒收消息。
另外 Kafka Producer 端也可以通過配置來確認消息是否生產成功:
在 Kafka Producer 端的 acks 默認配置爲 1, 默認級別是 at least once 語義, 並不能保證 exactly once 語義。
既然 Producer 端發送數據有 ACK 機制, 那麼這裏就可能會丟數據的**!!!**
-
acks = 0: 由於發送後就自認爲發送成功,這時如果發生網絡抖動, Producer 端並不會校驗 ACK 自然也就丟了,且無法重試。
-
acks = 1: 消息發送 Leader Parition 接收成功就表示發送成功,這時只要 Leader Partition 不 Crash 掉,就可以保證 Leader Partition 不丟數據,但是如果 Leader Partition 異常 Crash 掉了, Follower Partition 還未同步完數據且沒有 ACK,這時就會丟數據。
-
acks = -1 或者 all: 消息發送需要等待 ISR 中 Leader Partition 和 所有的 Follower Partition 都確認收到消息纔算發送成功, 可靠性最高, 但也不能保證不丟數據, 比如當 ISR 中只剩下 Leader Partition 了, 這樣就變成 acks = 1 的情況了。
## Broker 端丟失場景剖析
接下來我們來看看 Broker 端持久化存儲丟失場景, 對於不瞭解 Broker 的讀者們,可以先看看 聊聊 Kafka Broker 那點事,數據存儲過程如下圖所示:
Kafka Broker 集羣接收到數據後會將數據進行持久化存儲到磁盤,爲了提高吞吐量和性能,採用的是「異步批量刷盤的策略」,也就是說按照一定的消息量和間隔時間進行刷盤。首先會將數據存儲到 「PageCache」 中,至於什麼時候將 Cache 中的數據刷盤是由「操作系統」根據自己的策略決定或者調用 fsync 命令進行強制刷盤,如果此時 Broker 宕機 Crash 掉,且選舉了一個落後 Leader Partition 很多的 Follower Partition 成爲新的 Leader Partition,那麼落後的消息數據就會丟失。
既然 Broker 端消息存儲是通過異步批量刷盤的,那麼這裏就可能會丟數據的**!!!**
-
由於 Kafka 中並沒有提供「同步刷盤」的方式,所以說從單個 Broker 來看還是很有可能丟失數據的。
-
kafka 通過「**多 Partition (分區)多 Replica(副本)機制」**已經可以最大限度的保證數據不丟失,如果數據已經寫入 PageCache 中但是還沒來得及刷寫到磁盤,此時如果所在 Broker 突然宕機掛掉或者停電,極端情況還是會造成數據丟失。
## Consumer 端丟失場景剖析
接下來我們來看看 Consumer 端消費數據丟失場景,對於不瞭解 Consumer 的讀者們,可以先看看 聊聊 Kafka Consumer 那點事, 我們先來看看消費流程:
1)Consumer 拉取數據之前跟 Producer 發送數據一樣, 需要通過訂閱關係獲取到集羣元數據, 找到相關 Topic 對應的 Leader Partition 的元數據。
2)然後 Consumer 通過 Pull 模式主動的去 Kafka 集羣中拉取消息。
3)在這個過程中,有個消費者組的概念(不瞭解的可以看上面鏈接文章),多個 Consumer 可以組成一個消費者組即 Consumer Group,每個消費者組都有一個 Group-Id。同一個 Consumer Group 中的 Consumer 可以消費同一個 Topic 下不同分區的數據,但是不會出現多個 Consumer 去消費同一個分區的數據。
4)拉取到消息後進行業務邏輯處理,待處理完成後,會進行 ACK 確認,即提交 Offset 消費位移進度記錄。
5)最後 Offset 會被保存到 Kafka Broker 集羣中的 __consumer_offsets 這個 Topic 中,且每個 Consumer 保存自己的 Offset 進度。
根據上圖以及消息消費流程可以得出消費主要分爲兩個階段:
-
獲取元數據並從 Kafka Broker 集羣拉取數據。
-
處理消息,並標記消息已經被消費,提交 Offset 記錄。
既然 Consumer 拉取後消息最終是要提交 Offset, 那麼這裏就可能會丟數據的**!!!**
-
可能使用的「自動提交 Offset 方式」
-
拉取消息後「先提交 Offset,後處理消息」,如果此時處理消息的時候異常宕機,由於 Offset 已經提交了, 待 Consumer 重啓後,會從之前已提交的 Offset 下一個位置重新開始消費, 之前未處理完成的消息不會被再次處理,對於該 Consumer 來說消息就丟失了。
-
拉取消息後「先處理消息,在進行提交 Offset」, 如果此時在提交之前發生異常宕機,由於沒有提交成功 Offset, 待下次 Consumer 重啓後還會從上次的 Offset 重新拉取消息,不會出現消息丟失的情況, 但是會出現重複消費的情況,這裏只能業務自己保證冪等性。
04 消息丟失解決方案
上面帶你從 Producer、Broker、Consumer 三端剖析了可能丟失數據的場景,下面我們就來看看如何解決才能最大限度的保證消息不丟失。
## Producer 端解決方案
在剖析 Producer 端丟失場景的時候, 我們得出其是通過「異步」方式進行發送的,所以如果此時是使用「發後即焚」的方式發送,即調用 Producer.send(msg) 會立即返回,由於沒有回調,可能因網絡原因導致 Broker 並沒有收到消息,此時就丟失了。
因此我們可以從以下幾方面進行解決 Producer 端消息丟失問題:
4.1.1 更換調用方式:
棄用調用發後即焚的方式,使用帶回調通知函數的方法進行發送消息,即 Producer.send(msg, callback), 這樣一旦發現發送失敗, 就可以做針對性處理。
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
(1)網絡抖動導致消息丟失,Producer 端可以進行重試。
(2)消息大小不合格,可以進行適當調整,符合 Broker 承受範圍再發送。
通過以上方式可以保證最大限度消息可以發送成功。
4.1.2 ACK 確認機制:
該參數代表了對 "已提交" 消息的定義。
需要將 request.required.acks 設置爲 -1/ all,-1/all 表示有多少個副本 Broker 全部收到消息,才認爲是消息提交成功的標識。
針對 acks = -1/ all , 這裏有兩種非常典型的情況:
(1)數據發送到 Leader Partition, 且所有的 ISR 成員全部同步完數據, 此時,Leader Partition 異常 Crash 掉,那麼會選舉新的 Leader Partition,數據不會丟失, 如下圖所示:
(2)數據發送到 Leader Partition,部分 ISR 成員同步完成,此時 Leader Partition 異常 Crash, 剩下的 Follower Partition 都可能被選舉成新的 Leader Partition,會給 Producer 端發送失敗標識, 後續會重新發送數據,數據可能會重複, 如下圖所示:
因此通過上面分析,我們還需要通過其他參數配置來進行保證:
replication.factor >= 2
min.insync.replicas > 1
這是 Broker 端的配置,下面會詳細介紹。
4.1.3 重試次數 retries:
該參數表示 Producer 端發送消息的重試次數。
需要將 retries 設置爲大於 0 的數, 在 Kafka 2.4 版本中默認設置爲 Integer.MAX_VALUE。另外如果需要保證發送消息的順序性,配置如下:
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
這樣 Producer 端就會一直進行重試直到 Broker 端返回 ACK 標識,同時只有一個連接向 Broker 發送數據保證了消息的順序性。
4.1.4 重試時間 retry.backoff.ms:
該參數表示消息發送超時後兩次重試之間的間隔時間,避免無效的頻繁重試,默認值爲 100ms, 推薦設置爲 300ms。
## Broker 端解決方案
在剖析 Broker 端丟失場景的時候, 我們得出其是通過「異步批量刷盤」的策略,先將數據存儲到 「PageCache」,再進行異步刷盤, 由於沒有提供 「同步刷盤」策略, 因此 Kafka 是通過「多分區多副本」的方式來最大限度的保證數據不丟失。
我們可以通過以下參數配合來保證:
4.2.1 unclean.leader.election.enable:
該參數表示有哪些 Follower 可以有資格被選舉爲 Leader , 如果一個 Follower 的數據落後 Leader 太多,那麼一旦它被選舉爲新的 Leader, 數據就會丟失,因此我們要將其設置爲 false,防止此類情況發生。
4.2.2 replication.factor:
該參數表示分區副本的個數。建議設置 replication.factor >=3, 這樣如果 Leader 副本異常 Crash 掉,Follower 副本會被選舉爲新的 Leader 副本繼續提供服務。
4.2.3 min.insync.replicas:
該參數表示消息至少要被寫入成功到 ISR 多少個副本纔算 "已提交",建議設置 min.insync.replicas > 1, 這樣纔可以提升消息持久性,保證數據不丟失。
另外我們還需要確保一下 replication.factor > min.insync.replicas, 如果相等,只要有一個副本異常 Crash 掉,整個分區就無法正常工作了,因此推薦設置成: replication.factor = min.insync.replicas +1, 最大限度保證系統可用性。
## Consumer 端解決方案
在剖析 Consumer 端丟失場景的時候,我們得出其拉取完消息後是需要提交 Offset 位移信息的,因此爲了不丟數據,正確的做法是:拉取數據、業務邏輯處理、提交消費 Offset 位移信息。
我們還需要設置參數 enable.auto.commit = false, 採用手動提交位移的方式。
另外對於消費消息重複的情況,業務自己保證冪等性, 保證只成功消費一次即可。
05 總結
這裏,我們一起來總結一下這篇文章的重點。
1、從 Kafka 整體架構上概述了可能發生數據丟失的環節。
2、帶你剖析了「消息傳遞語義」的概念, 確定了 Kafka 只對「已提交」的消息做「最大限度的持久化保證不丟失」。
3、帶你剖析了 Producer、Broker、Consumer 三端可能導致數據丟失的場景以及具體的高可靠解決方案。
堅持總結, 持續輸出高質量文章 關注我: 華仔聊技術
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/g7L3kN1FDitZ60EdmU2zxA