kafka rebalance 機制詳解
首先我們回憶一下 kafka 的基本的架構,kafka 分爲 producer、broker 和 consumer。 爲了能夠橫向擴展提升消息發送消費性能, 通過增加 partition 的方式,讓消息能夠保存到多個 broker 上。 爲了提升消費性能,增加了 consumer group 的概念,一個 topic 下同一個 consumer group 中的不同 consumer 實例可以共同消費提升消費能力
多個 consumer 消費同一個 topic 上的不同 partition,就需要對 consumer 進行協調,控制哪個 consumer 消費哪個 partition,並且要對異常情況進行處理, 比如新增 consumer、consumer 重啓、consumer 機器宕機等情況。
rebalance protocol
rebalance protocol 定義了一套資源分配協調的協議,consumer 消費 partition 是協議的一種應用。
其中的概念有
members: 成員,比如同一個 consumer group 中的 consumer resources: 資源,比如 consumer 要消費的 partition
FindCoordinator
consumer 啓動後,會向 broker 發送 FindCoordinator 請求查找 topic 和 consumer group 的 coordinator。 之後 consumer 的請求都會和這個 coordinator 進行通信。
JoinGroup
consumer 獲取到 coordinator 後,會向 coordinator 發送 JoinGroup 請求,表示希望加入到 consumer group 中, 請求參數中會包含
groupId: consumer group, consumer 參數配置裏的 group.id sessionTimeoutMs: consumer 參數配置裏的 session.timeout.ms,默認 45 秒,如果超過這個時間 consumer 沒有發送心跳給 coordinator,coordinator 就會將 consumer 從 group 中移除,發起新的 rebalance。 rebalanceTimeoutMs: 對於 consumer 場景,使用的是 consumer 的 max.poll.interval.ms 配置,具體的作用後面會介紹 protocolType: 表示具體的協議類型,consumer 場景發送 CONSUMER protocols: 包含 consumer subscribe 的 topic、支持的 assignor 等信息。
發送完 JoinGroup 請求後,如果當前的 consumer group 中沒有 member,coordinator 會等待group.initial.rebalance.delay.ms
默認 3 秒,再返回 JoinGroup 的結果。 等待一段時間是爲了等待一下其他的 consumer,第一個加入到 group 中的 member 會被認定爲 group 中的 leader,leader 會負責進行 partition 的 assignment 分配。
PartitionAssignor assign
consumer 收到 JoinGroup 的 response 後,如果發現自己是 leader,則會從 response 中拿到所有的 partition 和 members 信息以及 protocolName(assignor 的名稱) 然後使用 assignor 對 partition 進行分配綁定,綁定到各個 consumer 中。
默認的 Assignor 是 RangeAssignor,會按照範圍分配 partition 給各個 consumer,如果不能平分,則前幾個 consumer 會各自多分配一個 partition。 比如現在有 3 個 consumer,8 個 partition。 第一個 consumer 會獲得 3 個 partition,第二個 consumer 獲得連續的 3 個 partition, 第三個 consumer 獲得 2 個 partition。
SyncGroup
在接收到 JoinGroup 的結果後,leader 會對 partition 進行 assign 分配,然後發起 SyncGroup 請求將 assignment 結果發送給 coordinator。 其他的 consumer 也會向 coordinator 發送 SyncGroup 請求,assignments 參數是空的,目的是獲取 leader 的分配結果。
SyncGroup 的返回結果中會包含 partition 的分配結果,收到分配結果後,consumer 會保存到內存中,後面 consumer 通過 poll 拉取消息時就會向這些 partition 拉取。 同時因爲 leader 計算可能有一定延遲,所以其他的 consumer 的 SyncGroup 請求不一定能獲取到 assignment 結果,沒有結果時 coordinator 會返回一個可以重試的錯誤碼, consumer 會進行重試(具有迴避策略)。
Heartbeat 心跳機制
rebalance protocol 需要考慮到 consumer 可能重啓、故障、或新增減少 consumer 節點等情況。 每個 consumer 會定時向 coordinator 發送心跳,發送心跳可以表明當前 consumer 的存活狀態(間隔是 heartbeat.interval.ms),另外還可以從 coordinator 獲取信息,比如是否要進行 rebalance。
當發生如下情況時,會觸發 rebalance
-
consumer 重啓,比如升級服務,consumer 關閉前會向 coordinator 發送 LeaveGroup 請求,表示退出 group
-
consumer 長時間沒有發送心跳,比如因爲宕機或 GC 長時間沒有發送心跳(間隔超過 session.timeout.ms)
-
新增或減少 consumer 節點,擴縮容
-
partition 數量變化
也就是當 coordinator 發現 members 或 partition 出現變化,都會發起 rebalance。
coordinator 會爲每個 group 維護一個 GroupState,分爲 Empty, Stable, PreparingRebalance, CompletingRebalance, Dead。
-
Empty: group 中沒有 member,比如所有的 member 都從 group 中離開。
-
Stable: 現在 group 狀態穩定,消費者可以正常消費
-
PreparingRebalance: 當 coordinator 發現 group 的 member 或 partition 發生變化,會轉入到 PreparingRebalance 階段,consumer 發送心跳會得知這一狀態,然後重新在下次 poll 時發起 JoinGroup 請求。PreparingRebalance 階段會等待 JoinGroup 請求收集 group 中的 member,直到所有已知的 member(除去已知因爲超時不在 group 中的 member)已經加入或者等待時間超過了 rebalanceTimeoutMs(這是第一次 JoinGroup 時發送的參數默認 5 分鐘)。
-
CompletingRebalance: 當 PreparingRebalance 狀態下,所有的已知的 member 已經發送了 JoinGroup 請求或者超過了等待時間,會轉變狀態爲 CompletingRebalance,然後給 leader 的 JoinGroup 請求返回結果,包含當前 group 中的 member 信息,leader 收到 JoinGroup 的 Response 後計算 assignment,然後通過 SyncGroup 發送給 coordinator,收到 leader 的 SyncGroup 請求後,狀態變爲 Stable。然後 coordinator 將 assignment 通過 consumer 的 SyncGroup 請求,返回給各個 consumer,
所以 rebalance 主要包含兩個部分,一個是 PreparingRebalance 收集 group 的 member 信息,收集完成後進入到 CompletingRebalance,等待 leader 計算 assignment,leader 通過 SyncGroup 發送 assignment 後進入到 Stable 狀態。
新節點加入觸發 rebalance 的時序圖如下
stop the word 問題
開始 rebalance 後,consumer 發現現在在 rebalance,需要放下手中的活,也就是本地清除消費的 partition,然後調用 JoinGroup,等待 JoinGroup 返回,再發送 SyncGroup,等待 SyncGrup 的結果後,才能從中找到新的 partition 開始消費。所以在中間這段時間內,所有的 consumer 都是停頓的,沒有消費消息,可能會有一段時間的中斷。爲了優化這個問題,kafka 增加了一種 COOPERATIVE 模式的協議。
rebalance 協議有兩種模式,EAGER 和 COOPERATIVE。 默認是 EAGER,EAGER 模式下在 JoinGroup 之後,重新 SyncGroup 之前,consumer 會放棄 (revoke) 自己本地的所有 partition,暫定從這些 partition 中消費。 COOPERATIVE 模式下,consumer 只會 revoke 之前分配給自己但是新的 assignment 沒有分配給自己的 partition。
相比之下,EAGER 模式會有一段時間的消費中斷,COOPERATIVE 模式對 consumer 消費的影響更小。
思考
爲什麼 rebalance protocol 要由 consumer 中的 leader 負責分配而不是由 coordinator 分配?
是爲了增加擴展性,開發者可以在使用 consumer 時實現自定義的 Assignor 分配策略。
可能出現重複消費、消息消費丟失嗎
重複消費是有可能出現的,比如一個 consumer 因爲 GC 停住導致 rebalance,其他 consumer 消費它的 partition,但是原來的 consumer 在 GC 恢復後繼續處理之前拉取的那一批消息就會出現重複。 當然還有很多其他的重複的場景,如果要求比較嚴格的程序,要注意做好冪等處理。
消息沒有消費到也是有可能的,比如如果 consumer 拉取完任務後提交到另一個線程池處理任務,因爲 consumer 默認會異步提交 offset,如果這時 consumer 異常掛掉(比如容器或機器重啓),則沒有消費完的消息就會丟失,因爲其他的 consumer 拉取 partition 時會從提交的最新 offset 開始拉取。 如果對消息可靠性要求比較嚴格,consumer 最好通過程序手動控制 offset 的提交,結合冪等性保證可靠性。
總結
rebalance 協議能夠讓 consumer 更加均衡的分配 partition,能夠對 consumer、partition 的變化進行響應。整體流程依賴 JoinGroup, SyncGroup, Heartbeat 請求通信, group 中的 leader 通過 assignor 分配 partition。
更多參考
- https://bytejava.cn/md/kafka/kafka-rebalance.html
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/W5ecJWji3WdUegsvjTCcpQ