一種並行,背壓的 Kafka Consumer

 介紹

幾乎所有 Kafka Consumer 教程都是下面的代碼:

KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)
// Subscribe to Kafka topics
consumer.subscribe(topics);

while (true) {
    // Poll Kafka for new messages
    ConsumerRecords<String, String> records = consumer.poll(100);
    // Processing logic
    for (ConsumerRecord<String, String> record : records) {
        doSomething(record);
    }
  }
}

基本上就是創建一個 Kafka consumer,然後訂閱對應的 topics,然後就可以無限消費數據了,消費到數據後對每一條消息進行處理,這個過程我們叫做‘拉取然後循環處理’(poll-then-process loop)。

這相當簡單,易於實施,人們可能一直在生產中使用它而沒有任何問題。但是,此模型存在各種問題,我們將在下一節中詳細介紹。

 問題

 可能沒有按照預期的那樣獲取數據

看上面的代碼,我們開發者可能會認爲 poll 是一種向 Kafka 發出需求信號的方式。我們的消費者僅在完成對先前消息的處理後才進行輪詢以獲取更多消息。如果它的處理速度很慢,Kafka 將充當‘減震器’,確保即使在生產速度高得多的情況下我們也不會丟失任何消息。另一方面,當處理速度較慢時,連續獲取數據之間的間隔也會增加,這是有問題的,因爲 max.poll.interval.ms 配置有一個默認的(5 分鐘)上限:

max.poll.interval.ms

使用消費者組管理時調用 poll() 之間的最大延遲。這爲消費者在獲取更多記錄之前可以空閒的時間量設置了上限。如果在此超時到期之前未調用 poll(),則認爲消費者失敗,組將進行 rebalance,以便將分區重新分配給另一個成員。

換句話說,如果我們的消費者沒有在每個 max.poll.interval.ms 中至少調用一次 poll ,那它就像死了一樣。發生這種情況時,Kafka 會執行一個 rebalance 過程,將已死消費者的當前工作分配給其消費者組的其他成員。這在已經很慢的處理速率中引入了更多的開銷和延遲。

更糟糕的是,如果處理導致一個消費者的速度變慢,很可能會導致其他消費者接管其工作時出現同樣的問題。此外,假定的死亡消費者在下一次輪詢時嘗試重新加入組時也可能導致重新平衡(請記住,這是一個無限循環!)。這兩者都使得 rebalance 一次又一次地發生,進一步減緩了消費。

現在,還有另一種配置可以幫助解決這種情況:

max.poll.records

單次調用 poll() 返回的最大記錄數。請注意, max.poll.records 不會影響底層的獲取行爲。消費者將緩存來自每個獲取請求的記錄,並從每次輪詢中返回它們。

將此設置爲較低的值,我們的消費者將在每次輪詢時處理更少的消息。因此輪詢間隔將減少。或者,我們也可以將 max.poll.interval.ms 增加到更大的值。如果我們不能擺脫 poll-then-process 循環,這應該可以暫時解決問題。然而,它並不理想。

首先,這些配置是在我們啓動消費者時設置的,但它們是否工作取決於消息或應用程序。我們可能會爲每個應用程序專門設置它們,但最終,我們正在玩猜謎遊戲並祈禱我們很幸運。

其次,在最壞的情況下,rebalance 過程開始可能需要兩倍於 max.poll.interval.ms 的持續時間:

我們從不希望 rebalance 花費更多時間,因此設置更高的 max.poll.interval.ms 並不是很好。

最後,這些配置意味着我們的消費者被 “期望” 頻繁地輪詢,至少每 max.poll.interval.ms 一次,無論它在做什麼類型的處理。如果不包含這種期望,poll-then-process 循環不僅會誤導開發人員,而且註定會失敗。

 消息處理是異步的

Kafka 只保證一個分區內消息的順序。來自不同分區的消息是不相關的,可以並行處理。這就是爲什麼在 Kafka 中,一個主題中的分區數是並行度的單位。

理論上,我們可以通過運行與主題上的分區數量一樣多的消費者來輕鬆實現最大並行度。然而,實際上,這開銷太大,更不用說它對增加 rebalance 機會的影響,因爲有很多的消費者可能是來來去去(come and go)。

如果我們再次查看我們的消費者代碼,它可以訂閱多個主題並可能接收來自多個分區的消息。然而,在處理這些消息時,它會一一處理。這不是最優的。

現在,假設我們的處理邏輯非常簡單,我們可以只使用線程池來並行化它嗎?例如,通過向線程池提交一個處理任務,對於每條消息?

嗯,它僅在我們不關心處理排序和保證(例如最多一次、至少一次等)時纔有效。因此在實踐中它不是很有用。

 一個更好的模型

 概述

poll-then-process 循環的許多挫折來自不同的關注點——輪詢、處理、偏移提交——混合在一起的情況。結果,當我們將它們分成獨立的組件時,我們最終得到了一個改進的模型,它可以適當地支持並行處理和背壓。下面更詳細地描述了每個組件。

 Work Queues

Work Queues 是 Poller 和 Executor 之間的通信通道:

 Poller

簡而言之,Poller 封裝了 Kafka 中與 poll 相關的一切:

pause(Collection partitions)

暫停從請求的分區中提取。未來對 poll(Duration) 的調用將不會從這些分區返回任何記錄,直到使用 resume(Collection) 恢復它們。

 Executor

Executor 就像一個線程池,它在其中維護多個 worker 來處理消息:

通過這種設置,一個分區內的消息按順序處理,而來自不同分區的消息並行處理。

 Offset Manager

Kafka 中的每條消息都與一個偏移量 (offset) 相關聯——一個整數,表示它在當前分區中的位置。通過存儲這個數字,我們實質上爲我們的消費者提供了一個檢查點。如果它失敗並返回,它知道從哪裏繼續。因此,在 Kafka 中實現各種處理保證至關重要:

Kafka 的自動提交呢?Confluent 聲稱:

使用自動提交可以讓您 “至少一次”(at least once) 交付:Kafka 保證不會丟失任何消息,但重複消息是可能的。

這適用於交付,但是,它不爲處理提供任何保證:

因此,我們總是將 enable.auto.commit 設置爲 false 並讓 Offset Manager 手動管理偏移量。

 實現處理保證

讓我們通過幾個示例用例來了解組件如何協同工作以滿足不同的處理保證。

 最多一次 (At-most-once)

對於最多一次,我們只需要在處理消息之前提交偏移量。我們可以在處理每條消息之前立即執行此操作。但是,在引入更多成本的同時,並沒有給我們更強的保證。因此,Poller 對此負責。每次輪詢後,它將告訴偏移管理器保存這些偏移量並等待來自 Kafka 的成功確認,然後再將消息排隊以進行處理。

在 rebalance 事件之前,它只需要向 Executor 發送一個即發即棄的信號以停止處理。然後它取消工作隊列並返回等待 rebalance。丟失的消息是那些仍在隊列中或正在處理中的消息。如果我們想在不影響 rebalance 持續時間的情況下優化更少的丟失,我們可以使用更小的隊列大小。

 至少一次 (At-least-once)

對於至少一次,我們只需要確保僅在成功處理消息後才保存偏移量。因此,如果我們要處理 10 條消息,我們不需要爲所有消息保存偏移量,而只需要保存最後一條消息。

在此設置中,Executor 將在每次完成對消息的處理時向 Offset Manager 發出信號。偏移量管理器跟蹤每個分區的最新偏移量 - 並決定何時將它們提交給 Kafka。例如,我們可以將 Offset Manager 設置爲每 5 秒提交一次。無論新消息是否出現,都會發生這種情況。

在 rebalance 事件之前,Poller 設置了一個硬性截止日期,並通知 Executor 結束其正在進行的處理,並通知 Offset Manager 以跟進最後一次提交。如果截止日期已經過去,或者 Poller 收到了其他人的響應,它會取消工作隊列並返回等待 rebalance。

爲了優化減少重複處理,我們可以:

 確切一次 (Exactly-once),外部管理的偏移量

在這種情況下,需要在一個事務中進行偏移保存和消息處理。這意味着 Executor 和 Offset Manager 使用同步調用緊密合作以實現它。

在 rebalance 事件之後,輪詢器向偏移管理器詢問當前分配的已保存偏移量。然後它會在恢復輪詢之前嘗試恢復保存的位置。

public void seek(TopicPartition partition, long offset)

覆蓋消費者將在下一次輪詢(超時)時使用的獲取偏移量。

在 rebalance 事件之前,Poller 會通知 Executor 並等待其響應。Executor 回滾其正在進行的事務並返回到 Poller。Poller 然後取消工作隊列並返回等待 rebalance。

 總結

我們分析了 loop-then-process 循環的各種問題,並提出了一個更合適的模型來理解和實現 Kafka Consumer。缺點是它要複雜得多,對於初學者來說可能並不容易。我們將這種複雜性歸咎於 Kafka 及其低級 API。

在實踐中,我們可能不會自己做,而是使用一個現成的庫,它可能基於也可能不基於類似模型:Alpakka Kafka、Spring for Kafka、zio-kafka 等...... 即便如此,所提出的模型對於評估這些解決方案或實施新的解決方案也很有用。

來源:

https://www.toutiao.com/article/7095235111150879262/?log_from=f5c5aad449665_1652927718906

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/doRVtk4-N-dLcryQ7s84GQ