DataLeap 的 Catalog 系統近實時消息同步能力優化

字節數據中臺 DataLeap 的 Data Catalog 系統通過接收 MQ 中的近實時消息來同步部分元數據。Apache Atlas 對於實時消息的消費處理不滿足性能要求,內部使用 Flink 任務的處理方案在 ToB 場景中也存在諸多限制,所以團隊自研了輕量級異步消息處理框架,很好的支持了字節內部和火山引擎上同步元數據的訴求。本文定義了需求場景,並詳細介紹框架的設計與實現。

1. 背景

1.1 動機

字節數據中臺 DataLeap 的 Data Catalog 系統基於 Apache Atlas 搭建,其中 Atlas 通過 Kafka 獲取外部系統的元數據變更消息。在開源版本中,每臺服務器支持的 Kafka Consumer 數量有限,在每日百萬級消息體量下,經常有長延時等問題,影響用戶體驗。

在 2020 年底,我們針對 Atlas 的消息消費部分做了重構,將消息的消費和處理從後端服務中剝離出來,並編寫了 Flink 任務承擔這部分工作,比較好的解決了擴展性和性能問題。然而,到 2021 年年中,團隊開始重點投入私有化部署和火山公有云支持,對於 Flink 集羣的依賴引入了可維護性的痛點。

在仔細的分析了使用場景和需求,並調研了現成的解決方案後,我們決定投入人力自研一個消息處理框架。當前這個框架很好的支持了字節內部以及 ToB 場景中 Data Catalog 對於消息消費和處理的場景。

本文會詳細介紹框架解決的問題,整體的設計,以及實現中的關鍵決定。

1.2 需求定義

使用下面的表格將具體場景定義清楚。

obkQYH

1.3 相關工作

在啓動自研之前,我們評估了兩個比較相關的方案,分別是 Flink 和 Kafka Streaming。

Flink 是我們之前生產上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性。在公有云場景,那個階段 Flink 服務在火山雲上還沒有發佈,我們自己的服務又有嚴格的時間線,所以必須考慮替代;在私有化場景,我們不確認客戶的環境一定有 Flink 集羣,即使部署的數據底座中帶有 Flink,後續的維護也是個頭疼的問題。另外一個角度,作爲通用流式處理框架,Flink 的大部分功能其實我們並沒有用到,對於單條消息的流轉路徑,其實只是簡單的讀取和處理,使用 Flink 有些 “殺雞用牛刀” 了。

另外一個比較標準的方案是 Kafka Streaming。作爲 Kafka 官方提供的框架,對於流式處理的語義有較好的支持,也滿足我們對於輕量的訴求。最終沒有采用的主要考慮點是兩個:

2. 設計

2.1 概念說明

2.2 框架架構

整個框架主要由 MQ Consumer, Message Processor 和 State Manager 組成。

3. 實現

3.1 線程模型

每個 Task 可以運行在一臺或多臺實例,建議部署到多臺機器,以獲得更好的性能和容錯能力。

每臺實例中,存在兩組線程池:

兩類 Thread 的性質分別如下:

3.2 StateManager

在 State Manager 中,會爲每個 Partition 維護一個優先隊列(最小堆),隊列中的信息是 Offset,兩個優先隊列的職責如下:

MQ Consumer 會週期性的檢查當前可以 Commit 的 Offset,情況枚舉如下:

注意:當發生 Consumer 的 Rebalance 時,需要將對應 Partition 的隊列清空

3.3 KeyBy 與 Delay Processing 的支持

因源頭的 Topic 和消息格式有可能不可控制,所以 MQ Consumer 的職責之一是將消息統一封裝爲 Event。

根據需求,會從原始消息中拼裝出 Event Key,對 Key 取 Hash 後,相同結果的 Event 會進入同一個隊列,可以保證分區內的此類事件處理順序的穩定,同時將消息的消費與處理解耦,支持增大內部隊列數量來增加吞吐。

Event 中也支持設置是否延遲處理屬性,可以根據 Event Time 延遲固定時間後處理,需要被延遲處理的事件會被髮送到有界延遲隊列中,有界延遲隊列的實現繼承了 DelayQueue,限制 DelayQueue 長度, 達到限定值入隊會被阻塞。

3.4 異常處理

Processor 在消息處理過程中,可能遇到各種異常情況,設計框架的動機之一就是爲業務邏輯的編寫者屏蔽掉這種複雜度。Processor 相關框架的邏輯會與 State Manager 協作,處理異常並充分暴露狀態。比較典型的異常情況以及處理策略如下:

3.5 監控

爲了方便運維,在框架層面暴露了一組監控指標,並支持用戶自定義 Metrics。其中默認支持的 Metrics 如下表所示:

dkVrkk

4. 線上運維 Case 舉例

實際生產環境運行時,偶爾需要做些運維操作,其中最常見的是消息堆積和消息重放。

對於 Conusmer Lag 這類問題的處理步驟大致如下:

消息重放被觸發的原因通常有兩種,要麼是業務上需要重放部分數據做補全,要麼是遇到了事故需要修復數據。爲了應對這種需求,我們在框架層面支持了根據時間戳重置 Offset 的能力。具體操作時的步驟如下:

5. 總結

爲了解決字節數據中臺 DataLeap 中 Data Catalog 系統消費近實時元數據變更的業務場景,我們自研了輕量級消息處理框架。當前該框架已在字節內部生產環境穩定運行超過 1 年,並支持了火山引擎上的數據地圖服務的元數據同步場景,滿足了我們團隊的需求。

下一步會根據優先級排期支持 RocketMQ 等其他消息隊列,並持續優化配置動態更新,監控報警,運維自動化等方面。

6. 關於我們

火山引擎大數據研發治理套件 DataLeap

一站式數據中臺套件,幫助用戶快速完成數據集成、開發、運維、治理、資產、安全等全套數據中臺建設,幫助數據團隊有效的降低工作成本和數據維護成本、挖掘數據價值、爲企業決策提供數據支撐。點擊閱讀原文立即體驗產品!

歡迎加入字節跳動 數據平臺 官方羣,進行數據技術交流、獲取更多內容乾貨

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/ge9KEcMBc2B-s020ff4_zw