你可能用錯了 kafka 的重試機制

Apache Kafka 已成爲跨微服務異步通信的主流平臺。它有很多強大的特性,讓我們能夠構建健壯、有彈性的異步架構。

同時,我們在使用它的過程中也需要小心很多潛在的陷阱。如果未能提前發現可能發生(換句話說就是遲早會發生)的問題,我們就要面對一個容易出錯和損壞數據的系統了。

在本文中,我們將重點介紹其中的一個陷阱:嘗試處理消息時遭遇失敗。首先,我們需要意識到消息消費可能會,而且遲早會遭遇失敗。其次,我們需要確保在處理此類故障時不會引入更多問題。

Kafka 簡介

閱讀本文的讀者應該都對 Kafka 有所瞭解。網上也有一些介紹 Kafka 及其使用方法的深度文章。話雖如此,我們這裏還是先簡要回顧一下對我們的討論很重要的一些概念。

事件日誌、發佈者和消費者

Kafka 是用來處理數據流的系統。從概念上講,我們可以認爲 Kafka 包含三個基本組件:

與 RabbitMQ 之類的傳統消息隊列不同,Kafka 由消費者來決定何時讀取消息(也就是說,Kafka 採用了拉取而非推送模式)。每條消息都有一個偏移量(offset),每個消費者都跟蹤(或提交)其最近消費消息的偏移量。這樣,消費者就可以通過這條消息的偏移量請求下一條消息。

主題

事件日誌分爲幾個主題(topic),每個主題都定義了要發佈給它的消息類型。定義主題是我們這些工程師的責任,所以我們應該記住一些經驗法則:

分區和分區鍵

主題被進一步細分爲多個分區(partition)。分區使消息可以被並行消費。Kafka 允許通過一個 ** 分區鍵(partition key)** 來確定性地將消息分配給各個分區。分區鍵是一段數據(通常是消息本身的某些屬性,例如 ID),其上會應用一個算法以確定分區。

這裏,我們將消息的 UUID 字段分配爲分區鍵。生產者應用一種算法(例如按照分區數修改每個 UUID 值)來將每條消息分配給一個分區。

以這種方式使用分區鍵,使我們能夠確保與給定 ID 關聯的每條消息都會發布到單個分區上。

還需要注意的是,可以將一個消費者的多個實例部署爲一個消費者組。Kafka 將確保給定分區中的任何消息將始終由組中的同一消費者實例讀取。

在微服務中使用 Kafka

Kafka 非常強大。所以它可用於多種環境中,涵蓋衆多用例。在這裏,我們將重點介紹微服務架構中最常見的用法。

跨有界上下文傳遞消息

當我們剛開始構建微服務時,我們許多人一開始採用的是某種中心化模式。每條數據都有一個駐留的單一微服務(即單一真實來源)。如果其他任何微服務需要訪問這份數據,它將發起一個同步調用以檢索它。

這種方法導致了許多問題,包括同步調用鏈較長、單點故障、團隊自主權下降等。

最後我們找到了更好的辦法。在今天的成熟架構中,我們將通信分爲命令處理和事件處理。

命令處理通常在單個有界上下文中執行,並且往往還是會包含同步通信。

另一方面,事件通常由一個有界上下文中的服務發出,並異步發佈到 Kafka,以供其他有界上下文中的服務消費。

左側是我們以前設計微服務通信的方式:一個有界上下文(由虛線框表示)中的服務從其他有界上下文中的服務接收同步調用。右邊是我們如今的做法:一個有界上下文中的服務發佈事件,其他有界上下文中的服務在自己空閒時消費它們。

例如,以一個 User 有界上下文爲例。我們的 User 團隊會構建負責啓用新用戶、更新現有用戶帳戶等任務的應用程序和服務。

創建或修改用戶帳戶後,UserAccount 服務會將一個相應的事件發佈到 Kafka。其他感興趣的有界上下文可以消費該事件,將其存儲在本地,使用其他數據增強它,等等。例如,我們的 Login 有界上下文可能想知道用戶的當前名稱,以便在登錄時向他們致意。

我們將這種用例稱爲跨邊界事件發佈。

在執行跨邊界事件發佈時,我們應該發佈聚合(Aggregate)。聚合是自包含的實體組,每個實體都被視爲一個單獨的原子實體。每個聚合都有一個 “根” 實體,以及一些提供附加數據的從屬實體。

當管理聚合的服務發佈一條消息時,該消息的負載將是一個聚合的某種表示形式(例如 JSON 或 Avro)。重要的是,該服務將指定聚合的唯一標識符作爲分區鍵。這將確保對任何給定聚合實體的更改都將發佈到同一分區。

出問題的時候怎麼辦?

儘管 Kafka 的跨邊界事件發佈機制顯得相當優雅,但畢竟這是一個分佈式系統,因此係統可能會有很多錯誤。我們將關注也許是最常見的惱人問題:消費者可能無法成功處理其消費的消息。

我們現在該怎麼辦?

確定這是一個問題

團隊做錯的第一件事就是根本沒有意識到這是一個潛在的問題。消息失敗時有發生,我們需要制定一種策略來處理它…… 要未雨綢繆,而非亡羊補牢。

因此,瞭解這是一種遲早會發生的問題並設計針對性的解決方案是我們要做的第一步。如果我們做到了這一點,就應該向自己表示一點祝賀。現在最大的問題仍然存在:我們該如何處理這種情況?

我們不能一直重試那條消息嗎?

默認情況下,如果消費者沒有成功消費一條消息(也就是說消費者無法提交當前偏移量),它將重試同一條消息。那麼,難道我們不能簡單地讓這種默認行爲接管一切,然後重試消息直到成功嗎?

問題是這條消息可能永遠不會成功。至少,沒有某種形式的手動干預它是不會成功的。於是乎,消費者就永遠不會繼續處理後續的任何消息,並且我們的消息處理將陷入困境。

好吧,我們不能簡單地跳過那條消息嗎?

我們通常允許同步請求失敗。例如,對我們的 UserAccount 服務所做的一個 “create-user”POST 可能包含錯誤或丟失的數據。在這種情況下,我們可以簡單地返回一個錯誤代碼(例如 HTTP 400),然後要求調用方重試。

雖然這種辦法並不不理想,但這不會對我們的數據完整性造成任何長期問題。那個 POST 代表一條命令,是還沒有發生的事情。即使我們讓它失敗,我們的數據也將保持一致狀態。

當我們丟棄消息時情況並非如此。消息表示已經發生的事件。任何忽略這些事件的消費者都將與生成事件的上游服務不再同步。

所有這些都表明,我們不想丟棄消息。

那麼我們如何解決這個問題呢?

對我們來說這不是什麼容易解決的問題。因此,一旦我們認識到它需要解決,就可以向互聯網諮詢解決方案。但這引出了我們的第二個問題:網上有一些我們可能不應該遵循的建議。

重試主題:流行的解決方案

你會發現最受歡迎的一種解決方案就是重試主題(retry topics)的概念。具體細節因實現而異,但總體概念是這樣的:

概念上講,重試主題模式定義了失敗的消息將被分流到的多個主題。如果主要主題的消費者消費了它無法處理的消息,它會將該消息發佈到重試主題 1 並提交當前偏移量,從而將自身釋放給下一條消息。重試主題的消費者將是主消費者的副本,但如果它無法處理該消息,它將發佈到一個新的重試主題。最終,如果最後一個重試消費者也無法處理該消息,它將把該消息發佈到一個死信隊列(DLQ)。

問題出在哪裏?

看起來這種方法似乎很合理。實際上,它在許多用例中都能正常工作。問題在於它不能充當一種通用解決方案。現實中存在一些特殊用例(例如我們的跨邊界事件發佈),對於這些用例來說,這種方法實際上是危險的。

它忽略了不同類型的錯誤

第一個問題是,它沒有考慮到導致事件消費失敗的兩大原因:可恢復錯誤和不可恢復錯誤。

可恢復錯誤指的是,如果我們多次重試,這些錯誤最終將得以解決。一個簡單的示例是將數據保存到數據庫的消費者。如果數據庫暫時不可用,那麼當下一條消息通過時,消費者將失敗。一旦數據庫再次變得可用,消費者就能夠再次處理該消息。

從另一個角度來看:可恢復錯誤指的是那些根源在消息和消費者外部的錯誤。解決這種錯誤後,我們的消費者將繼續前進,好像無事發生一樣。(很多人在這裏被弄糊塗了。“可恢復” 一詞並不意味着應用程序本身——在我們的示例中爲消費者——可以恢復。相反,它指的是某些外部資源——在此示例中爲數據庫——會失敗並最終恢復。)

關於可恢復錯誤需要注意的是,它們將困擾主題中的幾乎每一條消息。回想一下,主題中的所有消息都應遵循相同的架構,並代表相同類型的數據。同樣,我們的消費者將針對該主題的每個事件執行相同的操作。因此,如果消息 A 由於數據庫中斷而失敗,那麼消息 B、消息 C 等也將失敗。

不可恢復錯誤指的是無論我們重試多少次都將失敗的錯誤。例如,消息中缺少字段可能會導致一個 NullPointerException,或者包含特殊字符的字段可能會使消息無法解析。

與可恢復錯誤不同,不可恢復錯誤通常會影響單個孤立消息。例如,如果只有消息 A 包含不可解析的特殊字符,則消息 B 將成功,消息 C 等也將成功。

與可恢復錯誤不同,解決不可恢復錯誤意味着我們必須修復消費者本身(永遠不要 “修復” 消息本身——它們是不可變的記錄!)例如,我們可能會修復消費者以便正確處理空值,然後重新部署它。

那麼,這與重試主題解決方案有什麼關係?

對於初學者來說,它對可恢復錯誤不是特別有用。請記住,在解決外部問題之前,可恢復錯誤將影響每一條消息,而不僅僅是當前的一條消息。因此可以肯定的是,將失敗的消息分流到重試主題將爲下一條消息清理出通道。但接下來的消息也將失敗,下一條以及再下一條也將失敗。我們最好還是讓消費者自己重試,直到問題解決爲止。

不可恢復的錯誤呢?重試隊列可以在這些情況下提供幫助。如果一條麻煩的消息阻止了所有後續消息的消費,那麼毫無疑問,分流該消息肯定會爲我們的用戶消費清除障礙(當然,多個重試主題是沒必要的)。

但是,雖然重試隊列可以幫助受不可恢復錯誤困擾的消息消費者繼續前進,但它也可能帶來更多隱患。下面我們就進一步分析背後的原因。

它會忽略排序

我們簡要回顧一下跨邊界事件發佈的一些重要環節。在有界上下文中處理一條命令後,我們會將一個對應的事件發佈到一個 Kafka 主題。重要的是,我們會將聚合的 ID 指定爲分區鍵。

爲什麼這很重要?它確保的是對任何給定聚合的更改都會發布到同一分區。

好吧,那這一點爲什麼會那麼重要呢?當事件發佈到同一分區時,可以保證各個事件按照它們發生的順序進行處理。如果對同一聚合進行連續更改,並且所產生的事件發佈到不同的分區,就可能發生爭用狀況,也就是消費者在消費第一個更改之前就消費了第二個更改。這會導致數據不一致。

我們舉個簡單的例子。我們的 User 有界上下文提供了一個允許用戶更改其名稱的應用程序。一位用戶將他的名字從 Zoey 更改爲 Zoë,然後立即又更改爲 Zoiee。如果我們不管排序,則某個下游消費者(例如 Login 有界上下文)可能會先處理對 Zoiee 的更改,然後不久用 Zoë 覆蓋它。

現在,登錄數據與我們的用戶數據已經不同步了。更麻煩的是,每當 Zoiee 登錄我們的網站時都會看到 “歡迎光臨,Zoë!” 的登錄提示。

這纔是重試主題真正出問題的地方。它們讓我們的消費者容易打亂處理事件的順序。如果一個消費者在處理 Zoë 更改時受到某個臨時的數據庫中斷的影響,它會把這個消息分流到一個重試主題,稍後再嘗試。如果在 Zoiee 更改到達時數據庫中斷已得到糾正,則這條消息將先被成功處理,然後再由 Zoë 更改覆蓋。

爲了說明問題,這裏用了 Zoiee/Zoë 這樣一個簡單的示例。實際上,亂序處理事件可能導致會各種各樣的數據損壞問題。更糟糕的是,這些問題很少會在一開始就被注意到。相反,它們所導致的數據損壞往往在一段時間內都不會引起注意,但損壞程度會隨着時間的推移而增長。一般來說,當我們意識到發生了什麼事情時,已經有大量數據受到影響了。

重試主題什麼時候可行?

需要明確的是,重試主題並非一直都是錯誤的模式。當然,它也存在一些合適的用例。具體來說,當消費者的工作是收集不可修改的記錄時,這種模式就很不錯。這樣的例子可能包括:

這類消費者可能會從重試主題模式中受益,同時沒有數據損壞的風險。

不過,請注意

即使存在這種用例,我們仍應謹慎行事。構建這樣的解決方案既複雜又耗時。因此,作爲一個組織,我們不想爲每個新的消費者編寫一個新的解決方案。相反,我們要創建一個統一的解決方案,比如一個庫或一個容器等,可以在各種服務之間重複使用。

還存在另一個問題。我們可能會爲相關消費者構建一個重試主題的解決方案。不幸的是,不久之後,這個解決方案就會進入跨邊界事件發佈消費者的領域了。擁有這些消費者的團隊可能沒有意識到風險的存在。正如我們前面所討論的那樣,在發生重大數據損壞之前,他們可能不會意識到任何問題。

因此,在實現重試主題解決方案之前,我們應 100%確定:

我們如何改善這種模式?

鑑於重試主題模式可能不是跨邊界事件發佈消費者的可接受解決方案,我們是否可以對其做一些調整來改善它呢?

一開始,本文想要提供一種完整的解決方案。但之後我意識到,並不存在什麼萬能的路徑。因此,我們將只討論一些在制定合適解決方案時需要考慮的事項。

消除錯誤類型

如果我們能夠在可恢復錯誤和不可恢復錯誤之間消除歧義,生活就會變得輕鬆許多。例如,如果我們的消費者開始遇到可恢復錯誤,那麼重試主題就變得多餘了。

因此,我們可以嘗試確定所遇到的錯誤類型:

void processMessage(KafkaMessage km) {
  try {
    Message m = km.getMessage();
    transformAndSave(m);
  } catch (Throwable t) {
    if (isRecoverable(t)) {
      // ...
    } else {
      // ...
    }
  }
}

在上面的 Java 僞代碼示例中,isRecoverable() 將採用一種白名單方法來確定 t 是否表示可恢復錯誤。換句話說,它檢查 t 以確定它是否與任何已知的可恢復錯誤(例如 SQL 連接錯誤或 ReST 客戶端超時)相匹配,如果匹配則返回 true,否則返回 false。這樣就能防止我們的消費者被不可恢復錯誤一直阻塞下去。

誠然,要在可恢復錯誤和不可恢復錯誤之間消除歧義可能很困難。例如,一個 SQLException 可能指的是一次數據庫故障(可恢復)或一次約束違反狀況(不可恢復)。如有疑問,我們可能應該假設錯誤是不可恢復的——爲此要冒的風險是將其他好的消息發送給隱藏主題,從而延遲它們的處理…… 但這也能避免我們無意間陷入泥潭,無休止地嘗試處理不可恢復錯誤。

在消費者內重試可恢復錯誤

正如我們所討論的那樣,存在可恢復錯誤時,將消息發佈到重試主題毫無意義。我們只會爲下一條消息的失敗掃清道路。相反,消費者可以簡單地重試,直到條件恢復。

當然,出現可恢復錯誤意味着外部資源存在問題。我們不斷對這塊資源發送請求是無濟於事的。因此,我們希望對重試應用一個退避策略。我們的僞 Java 代碼現在可能看起來像這樣:

void processMessage(KafkaMessage km) {
  try {
    Message m = km.getMessage();
    transformAndSave(m);
  } catch (Throwable t) {
    if (isRecoverable(t)) {
      doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);
    } else {
      // ...
    }
  }
}

(注意:我們使用的任何退避機制都應配置爲在達到某個閾值時向我們發出警報,並通知我們潛在的嚴重錯誤)

遇到不可恢復錯誤時,將消息直接發送到最後一個主題

另一方面,當我們的消費者遇到不可恢復錯誤時,我們可能希望立即隱藏(stash)該消息,以釋放後續消息。但在這裏使用多個重試主題會有用嗎?答案是否定的。在轉到 DLQ 之前,我們的消息只會經歷 n 次消費失敗而已。那麼,爲什麼不從一開始就將消息粘貼在那裏呢?

與重試主題一樣,這個主題(在這裏,我們將其稱爲隱藏主題)將擁有自己的消費者,其與主消費者保持一致。但就像 DLQ 一樣,這個消費者並不總是在消費消息;它只有在我們明確需要時纔會這麼做。

考慮排序

來看看排序的情況。我們在這裏重用之前的 “用戶 / 登錄” 示例。嘗試處理 Zoë 名稱中的 ë 字符時,Login 消費者可能會遇到錯誤。消費者將其識別爲一個不可恢復錯誤,將消息放在一邊,然後繼續處理後續消息。不久之後,消費者將獲得 Zoiee 消息併成功處理它。

Zoë 消息已隱藏,並且 Zoiee 消息現在已成功處理完畢。目前,兩個有界上下文之間的數據是一致的。

晚些時候,我們的團隊會修復消費者,以便其可以正確處理特殊字符並重新部署它。然後,我們將 Zoë 消息重新發布給消費者,消費者現在可以正確處理該消息了。

當更新的消費者隨後處理隱藏的 Zoë 消息後,兩個有界上下文之間的數據將變得不一致。因此,當 User 有界上下文將用戶視爲 Zoiee 時,Login 有界上下文會將她稱爲 Zoë。

顯然,我們沒有保持排序;Zoë 是在 Zoiee 之前由 Login 消費者處理的,但正確的順序是倒過來的。隱藏一條消息後,我們可以開始隱藏所有消息,但在那種情況下我們實際上會陷入困境。幸運的是,我們不需要保持所有消息的順序,只需考慮與單個聚合相關聯的消息即可。因此,如果我們的消費者可以跟蹤已隱藏的特定聚合,它就可以確保屬於同一聚合的後續消息也被隱藏。

收到隱藏主題中消息的警報後,我們可以取消部署消費者並修復其代碼(請注意:切勿修改消息本身;消息代表不可變的事件!)在修復並測試了我們的消費者之後,我們可以重新部署它。當然,在繼續使用主要主題之前,我們將需要特別注意先處理隱藏主題中的所有記錄。這樣,我們將繼續保持正確的排序狀態。出於這個原因,我們將首先部署隱藏消費者,並且只有在其完成時(這意味着消費者組中的所有實例都完成,如果我們使用了多個消費者),我們纔會取消部署它並部署主消費者。

我們還應該考慮以下事實:固定的消費者處理了隱藏消息後,它仍可能會遇到其他錯誤。在這種情況下,其錯誤處理行爲應像我們之前描述的那樣:

爲此,我們可以考慮使用第二個隱藏主題。

可以接受一些數據不一致?

這樣的系統構建起來可能會變得相當複雜。它們可能很難構建、測試和維護。因此,某些組織可能會想要確定出數據不一致的可能性,並判斷他們是否可以承受這種風險。

在許多情況下,這些組織可能會採用數據協調機制,以使他們的數據最終(是相對較長的 “最終”)變得一致。爲此也存在許多策略(超出了本文的範圍)。

總結

處理重試似乎很複雜,那是因爲它就是這麼麻煩——和一切正常時 Kafka 相對優雅的風格相比之下尤其明顯。我們構建的任何合適的解決方案(無論是重試主題、隱藏主題還是其他解決方案)都將比我們想要的更復雜。

不幸的是,如果我們希望在微服務之間建立彈性的異步通信流,那麼我們就不能忽略它。

本文介紹了一種流行的解決方案、它的缺點以及在設計替代解決方案時應考慮的一些事項。到最後,想要構建正確的解決方案,我們就應該牢記一些事情,例如:

參考資料

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/XIz0EDFke4SBsZGpTQpofA