kafka producer 在 aws 又掛了

之前公司因爲 aws 的 kafka 服務上的副本數配置不正確,所以在 aws 例行重啓時會導致 producer hang,連鎖導致消費斷連,當時總結了一篇簡單的文章:

aws 上 kafka 服務更新導致斷連一例 [1]

然而在將隊列的副本數都修正之後,發現 producer hang 從高概率必現變成了低概率必現。。這就讓人頭痛了。

雖然我們也保留了問題的現場,把各種日誌多種姿勢 Google 檢索,但始終沒有找到任何線索,本來還想偷個懶,看看能不能直接照抄解決方案,看來沒得參考了。

好吧,自己動手豐衣足食,不讀代碼是不行了,之前 sarama 的 producer 內部邏輯用了比較多的 channel,看着比較煩一直沒有認真讀,現在只能自己搞了。

我們先和公司內的隊列運維同學以及 aws 的 msk 售後經過了多輪溝通,確定了 producer hang 的一些場景特徵:

出問題的時候,producer 的日誌長這樣:

send msg succ 584
send msg succ 585
send msg succ 586
send msg succ 587
2022/10/21 06:20:54 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:54 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:54 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:54 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:55 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:55 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:55 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 producer/txnmanager rolling over epoch due to publish failure on t5/11

之後日誌便不再滾動,注意這裏最後一行的日誌 producer/txnmanager rolling over epoch due to publish failure on t5/11 因爲比較特殊,直接用文本去代碼裏搜索,可以找到代碼的位置在:

這裏 msg.hasSequence 是在 producer 設置了 idempotent 時纔會爲 true 的:

這說明我們部門目前的 producer 設置了 idempotent,和之前讀代碼時的認知相符。

既然知道了 idempotent 的特徵,就需要看看 idempotent 的流程和非 idempotent 的流程有什麼不同,經過一番確認和篩選,最終看到區別在 retry 流程。當 produce 發生錯誤時,有一類是 sdk 認爲可以通過重試自動恢復的錯誤:

case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:

從日誌中可以得知,我們觸發的 producer error 就是 ErrNotLeaderForPartition  和 ErrLeaderNotAvailable,都是可以通過重試恢復的錯誤。

重試時,若用戶設置了 produce 爲 idempotent,則會進入 retryBatch 邏輯:

這裏的 retryBatch 判斷批量重試過程中,某條消息如果重試次數超標了,那麼就會直接從函數中返回,而我們在閱讀這段代碼的時候會發現,緊跟着超 retry.max 的邏輯,當獲取 partition 的 leader 邏輯出錯時,每一條消息都返回了錯誤,這已經能夠給我們足夠的提示了:這裏不應該在一次 returnError 時直接 return,而應該將批量消息中的每一條消息都返回錯誤。

可能大多數讀者對 sarama 的 producer 邏輯不太瞭解,我們這裏簡單畫一個圖:

簡單來說,用戶調用 producer 的 SendMessages 接口,sarama 的 sdk 會給每一條 message 生成一個 ProducerMessage 對象,且對象內部會自帶一個 channel,成功時,該 channel 需要返回 nil,失敗時,需要返回 error 信息,每一條消息都必須有明確的成功 or 失敗,因爲 SendMessages 函數中會等待每一條消息的 expectation channel 有內容返回才能正常向下執行:

在 producer 的 sdk 中,這條 ProducerMessage 雖然傳遞鏈路較長,會從 asyncProducer.input -> topicProducer.input -> partitionProducer.input -> brokerProducer.input 一路傳遞,但最終的 response 都是從 msg 的 expectation channel 中返回的。

如果某條消息的 expectation channel 沒返回,那麼就會導致用戶的 syncProducer 無限 hang 下去。

閱讀代碼和流程分析到這裏,我們已經基本可以知道原因了,這個問題的觸發流程是這樣的:

  1. aws 執行安全更新,broker 滾動重啓

  2. broker 下線期間,某些 topic 的 leader election 較慢,經過了 1-2s 才把新的 leader 選出來

  3. 我們部門的 kafka producer 使用了 idempotent = true 和 sendmessages 批量發送接口和默認的 producer.retry = 3,producer.backoff = 100ms

  4. 當 broker 下線且 leader 未選舉出時,經過 3 次後,leader 依然未恢復,這時由於 sarama 的 bug,導致某些消息的 expectation channel 一直沒有 resp/err 返回

  5. 之後 producer 就永遠 hang 在 SendMessages 函數上了

這個問題後來也提給了 sarama 官方,不過外企很 wlb,至今依然沒有回覆:

https://github.com/Shopify/sarama/issues/2377

https://github.com/Shopify/sarama/pull/2378

我反思了一下,爲什麼這個問題其它公司沒怎麼遇到過,Google 又搜不出來呢?

所以這個坑只有我們踩到了,沒有辦法。

[1]

aws 上 kafka 服務更新導致斷連一例: https://xargin.com/aws-produce-hang-case/

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/U5Okkf9FzLxxSYcsUI2DZg