Kafka 消費組 rebalance 原理

消費者組是 Kafka 分佈式消息處理的一個重要特徵,用於管理消費者並促進擴展應用程序的能力。它們將任何一個主題的消費者組合在一起,並且主題內的分區被分配給這些消費者。當組的參與者發生變化時,消費者組 rebalance 可能由許多因素觸發,這會導致在消費者之間重新分配分區。在 rebalance 期間,消息處理暫停,影響吞吐量。

在本文中,將介紹消費者組的角色、消費者組 rebalance 以及導致 rebalance 的觸發器。詳細說明了影響 rebalance 持續時間和觸發 rebalance 時間的配置。在下一篇文章中,將介紹 rebalance 期間對應用程序消息處理的影響以及可以應用的 rebalance 策略。探討了減少不必要的 rebalance 和減輕 rebalance 影響的選項。

消費羣體

當應用程序實現了一個 Kafka 消費者來消費來自某個主題的消息時,該消費者屬於一個消費者組。在消費者組中,消費者被分配主題分區以進行消費。組成員在代理端(broker)進行管理,分區分配在客戶端進行管理。代理不知道資源是什麼以及它們是如何在消費者之間分配的。這是 Kafka 客戶端被視爲 "胖" 客戶端的一個很好的例子。

消費者配置了 group.id ,因此具有相同 group.id 的任何其他消費者實例都將屬於同一個消費者組。這有助於擴展消費者的能力,並且這與增加主題中的分區數量相結合提供了一種增加消息吞吐量的機制。

Group Coordinator 管理消費者組和消費者。這是一個位於代理端的 Kafka 組件。它將讓一個消費者成爲領導者,這將負責計算主題分區分配。這些將返回給 Group Coordinator,然後由 Group Coordinator 將分區分配給消費者。

給定一個應用程序實例,其中 group.id 爲'foo' 的消費者正在監聽特定主題,並且該主題有六個分區,然後消費者將輪詢所有六個分區中的消息。

圖 1:單個消費者組和一個消費者

現在啓動應用程序的第二個實例。因此,這將啓動具有相同 group.id 的 “foo” 的第二個消費者實例。第二個消費者實例向 Group Coordinator 發送 JoinGroup 請求,並且在消費者組中重新分配分區以分散負載。消費者組中有兩個成員,每個消費者實例分配三個分區。

圖 2:具有兩個消費者的單個消費者組 “foo”

啓動第三個應用程序,組協調器再次重新分配分區,每個消費者現在輪詢來自兩個分區的消息。

如果消費者實例多於分區,那麼這些額外的消費者將不會分配任何分區。一個主題分區將永遠只有一個消費者從給定的消費者組中收聽它。所以一個由 5 個消費者組成的消費者組,監聽一個具有 3 個分區的主題,將有 2 個空閒消費者。

如果一個消費者以不同的 group.id 配置啓動(就像不同服務的情況一樣),並且它正在偵聽相同的主題,那麼這將是一個單獨的消費者組的一部分。它的分區分配獨立於任何其他消費者組的分配。

圖 3:兩個消費者組'foo' 和'bar'

Rebalance 觸發器

發生消費者組 rebalance 的原因有多種。

除此之外,任何其他重新分配資源的需求都將觸發重新平衡。一個示例是創建一個主題,其中爲消費者配置了與該主題名稱匹配的模式訂閱。

當一個新的消費者加入一個消費者組時,它會向代理上的組協調器發送一個 JoinGroup 請求。然後在組中的所有一個或多個消費者之間重新分配主題分區。同樣,當消費者離開組時,它會通過 LeaveGroup 請求通知組協調器,該請求再次在剩餘的消費者之間重新分配主題分區(如果有的話)。

當 Group Coordinator 在預期的時間範圍內沒有收到消費者的消息時,無論是心跳還是下一次 poll() 調用,它都會將消費者從組中驅逐,認爲它可能已經失敗。主題分區再次被重新分配給組中剩餘的任何其他消費者。

如果一個服務有多個訂閱互斥主題但共享同一個 group.id 的消費者,那麼任何一個消費者觸發的 rebalance 仍然會影響組中的其他消費者。在以下場景中,消費者 A 訂閱了主題 abc,而消費者 B 訂閱了主題 def。他們在同一個消費者組 foo 中。如果消費者 A 處理一個批次的時間過長並且超時,那麼它將從消費者組中刪除,從而觸發 rebalance。組中的所有分區分配都被撤銷和重新分配,包括 Consumer B 的分配。

圖 4:跨越主題的消費者組

當消費者 A 最終完成其輪詢並重新加入消費者組時,將觸發進一步的 rebalance,並且隨着分區被撤銷和重新分配,所有處理再次停止。因此,爲收聽不同主題的消費者定義單獨的消費者組可能是謹慎的。例如 [service]-[topic]-consumer-group。

Rebalance 配置

概述

對於 Apache Java Kafka 客戶端,以下是消費者的關鍵配置,這些配置會影響 rebalance 需要多長時間才能完成,以及何時消費者可能被代理視爲失敗,從而觸發 rebalance。

以下部分檢查這些配置參數的影響。

心跳和會話超時

消費者定期向 Group Coordinator(位於 broker 上)發送心跳。這允許 Group Coordinator 監控組中消費者的健康狀況。必須在 session.timeout.ms 內收到心跳,並根據 heartbeat.interval.ms 發送心跳。當 Group Coordinator 收到心跳時 session.timeout.ms 會重置,它會響應消費者,並且必須在此重置超時內接收下一個消費者心跳。

圖 5:消費者的心跳

建議將 heartbeat.interval.ms 配置爲不超過 session.timeout.ms 的三分之一。這確保瞭如果由於例如瞬態網絡問題而丟失一兩個心跳,則不會認爲消費者失敗。在此圖中,有兩個心跳丟失,但第三個在會話超時之前到達,因此 Group Coordinator 知道消費者仍然健康。

圖 6:失敗的心跳

如果消費者確實失敗並停止心跳,那麼一旦會話超時到期,它就會從消費者組中被逐出,從而導致消費者組 rebalance。

圖 7:消費者失敗

輪詢間隔

心跳在與主處理線程不同的線程上執行。消費者在主處理線程上輪詢其主題分區,每次調用 poll() 都必須在配置的 max.poll.interval.ms 內發生。下圖添加了消費者處理線程,顯示了該線程的職責以及心跳線程的職責。

圖 8:消費者心跳和輪詢

對 poll() 的第一次調用,以及對 poll() 的任何調用,包括分區分配等更改,都會導致啓動心跳線程。每個後續的 poll() 調用都會重新開始輪詢時間,這樣它就有這個完整的 max.poll.interval.ms 可以在其中完成。

心跳線程檢查消費者處理的狀態,如果在輪詢之間超過了 max.poll.interval.ms,那麼它會發送一個 LeaveGroup 請求而不是心跳。Group Coordinator 將消費者從消費者組中移除,從而觸發 rebalance。

圖 9:消費者超過輪詢間隔

當觸發 rebalance 時,現有消費者將收到對其下一個 “rebalance” 心跳的響應。每個消費者在 max.poll.interval.ms 超時之前通過調用 poll() 重新加入組,因爲這會觸發對組協調器的 JoinGroup 請求。請注意,對於 Kafka Connect,爲此提供了單獨的超時,即 rebalance.timeout.ms。

因此,配置 max.poll.interval.ms 需要仔細考慮。將其設置得太低,風險在於單個輪詢中消耗的一批消息未及時處理,導致 rebalance 和重複消息傳遞。將間隔設置得太高,這意味着當消費者確實失敗時,代理需要更長的時間才能意識到並重新分配消費者的分區。在此處理期間,分配給失敗消費者的主題分區上的消息被卡住。

消費者健康

因此,有兩個超時需要考慮,這與消費者何時被認爲是健康的或失敗並被逐出消費者組有關。如果主處理線程死亡,而心跳線程仍在運行,則通過超出 max.poll.interval.ms 來檢測故障。如果整個應用程序死了,那麼這將通過 session.timeout.ms 內沒有收到心跳來檢測。

max.poll.interval.ms 本質上是消費者處理的主要健康檢查。但是,通過在單獨的線程上使用心跳檢查,這意味着可以更快地檢測到整個應用程序發生故障。

來源:

https://www.toutiao.com/article/7165489289122497027/?log_from=1b0698bc67f8b_1668395837555

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/BhRpUzV-ZaKBQEorF-Z8zQ