汽車之家數據傳輸平臺建設實踐

分享嘉賓:劉首維 @汽車之家

內容來源:Flink 中文社區

導讀**:**數據接入與傳輸作爲打通數據系統與業務系統的一道橋樑,是數據系統與架構中不可或缺的一個重要部分。數據傳輸系統穩定性和準確性,直接影響整個數據系統服務的 SLA 和質量。此外如何提升系統的易用性,保證監控服務並降低系統維護成本,優雅應對災難等問題也十分重要。本文介紹了汽車之家實時計算團隊利用 Flink 和 Flink 實時平臺構建數據傳輸 SDK 和傳輸平臺並不斷完善的實踐經驗與總結。

01

背景與需求

汽車之家(下稱之家)作爲一家數據智能驅動的公司,天然存在着對數據的各種複雜需求,之家的數據系統負責支撐這些業務需求的開展。數據傳輸系統,作爲其中一環,承擔了各類數據導入分發的需求,支持用戶訂閱數據變更。隨着支撐的業務擴增與需求的增加。原來的接入系統暴露出了一定的問題和不足:

針對上述問題,我們決定開發一套新的數據傳輸和分發系統,一舉解決上述問題。

02

技術選型與設計 —— Why Flink?

在開展新系統的開發工作之前,我們分析的可選的方案思路大體分三種:       

  1. 完全自研(類似於 otter)              

  2. 複用市面上的開源組件 (Maxwell/Canal/Debezium) 進行二次開發和整合

  3. 基於 Flink 進行組件的開發

我們規約出以下主要設計使用目標:

此外,在性能指標上,接入系統的延時和吞吐至少要滿足所有業務常規狀態****下的需求

(1) 指與實時計算平臺整合的能力

方案設計與對比

依照設計思路和目標,我們整理了方案主要功能的對比表格:

s5UZTR

(1)Flink 自帶高可用和故障恢復,實時計算平臺在此基礎上提供更強的高可用服務

(2) 良好的編碼 + flink 機制即可實現 Exactly-Once

(3) 實時計算平臺自帶任務部署管理能力

(4) 實時計算平臺自帶完備的監控和管理

經過討論,大家一致決定基於 Flink 進行新的傳輸平臺的開發:

  1. Flink DataStream 的編程模型和 API 在應對數據傳輸場景上,非常的自然與直接

  2. Flink 在框架層面提供了一致性保證和 HA / 穩定性 / 流量控制措施,讓我們可以不必去處理這些開發上比較困難和複雜的問題,背靠框架即可較爲輕鬆地完成相關工作

  3. Flink 天然具備橫向縱向擴容的能力,按需使用計算資源即可

  4. 完全複用了之家 Flink 實時計算平臺已有的組件和能力——完備的監控報警 / 任務生命週期管理 / 異地多活 / 自助運維等功能

我們的 MVP 版本開發完成大約只花費了不到 3 周的時間,POC 的結果完全符合預期的性能要求和功能要求。

03

數據傳輸系統的設計架構

從邏輯層面來看,之家的實時數據傳輸平臺分爲 3 部分:

在實現上:

組件架構與交互邏輯

傳輸系統涉及到的組件和交互如圖所示:

AutoDTS 即爲傳輸系統的任務信息管理模塊,AutoStream Core 爲 Flink 實時平臺核心系統,Jar Service 是 Flink 相關 SDK Jar 儲存管理服務,Metastore 爲 Flink 平臺的元數據管理系統,Flink Client 是我們自己封裝的 Submit Client,支持以  Restful 方式向 YARN/K8S 上提交作業。

AutoDTS 前端直接與用戶進行交互,完成用戶對任務信息的修改和任務生命週期的操作。AutoDTS 將任務信息處理後與 Flink 平臺交互,每一個數據傳輸任務對應 Flink 平臺唯一一個任務,同時,部分任務信息被 AutoDTS 處理,會直接在  Metastore 上完成對應流表的創建。用戶直接申請並使用該 Flink 流表,進行 SQL 任務的開發。

針對不同的傳輸任務,AutoDTS 會委託 Core System 組織任務參數和 SQL 邏輯,並從 Jar Service 加載不同的 SDK Jar 提交到 Client 去執行,對於基於 SQL Codegen 的傳輸任務,Flink SQL Codegen Service 會將任務參數組織整合翻譯成可執行的 Flink SQL 任務,通過 SQL 任務,我們可以直接複用平臺 SQL SDKs,執行 SQL 作業。

正如前文提到的,我們最大限度複用已有組件和服務,大大降低了開發的週期。

傳輸任務類型與構成

之家的數據傳輸任務分爲兩種類型,接入任務與分發任務。

如圖所示,接入的數據源主要有 3 種,除了 Mysql 和 SqlServer,我們還支持了  TiDB 的 Changelog(TiCDC) 接入 Java Client 相關邏輯, 並將我們的代碼貢獻到了  TiDB 社區 [1];對於分發端,通過解析用戶的任務配置,從而進行 SQL codegen 生成 Flink SQL 代碼執行。

04

基於 Flink 的 Binlog 接入 SDK

在這些接入和分發 SDK 中,Binlog 接入 SDK 是比較有難度的一個,下面我們以  Binlog 接入 SDK 爲例,剖析接入 SDK 的主體設計思路和開發過程。

Stage 拆解

依照 Flink 經典的 Source->Transformation->Sink,Binlog 接入任務也拆分爲這三個 Stage:

Binlog Source

Binlog Source 的樸素開發思路:創建一個 BinaryLogClient 並持續  fetchBinlogEvent 並進行簡單的轉換處理後發送到下游。在既定的設計目標中,以下問題需要認真思考:

  1. 保證 Source 端處理性能

  2. 保證 source 是可回溯的

  3. 保證 Mysql Transaction 的完整性

對於問題 1,考慮到 Binlog Stream 的特殊性,我們要求 Source 的並行度爲且僅能爲 1。且在絕大部分情況下,從 BinaryLogClient fetch BinlogEvent 不會是性能瓶頸。我們只要保證 BinaryLogClient 與 BinlogSourceFunction 的生命週期一致,二者通過有界的阻塞隊列鏈接,分別充當生產者和消費者,同時  BinlogSourceFunction 對 BinlogEvent 儘可能少的進行邏處理,讓  BinlogSourceFunction 的負擔儘量減輕,從而提升 Source 階段的性能即可。

而對於問題 2、3,則需要從 Binlog 的特性和格式來分析。衆所周知,BinlogEvent  攜帶了唯一的 BinlogPosition。BinlogPosition 是全序的,我們可以在 trigger Checkpoint 的時候,對當前的 BinlogPosition 進行記錄。但是僅僅是記錄這個是不夠的,如果記錄了數據位置,那麼下次從 Checkpoint 恢復的時候,是從當條記錄開始還是當條記錄的下一條記錄開始呢?另一方面,我們希望發送的按照一個完整的  transaction 去發送數據給下游而非從事務中間截斷髮送。這裏,我們就要用到  BinlogEvent 的一種特定事件——TransactionEnd 事件。

我們這裏先來解決問題 2,我們要求 BinlogSourceFunction 只使用 TransactionEnd 事件的 BinlogPosition 來更新位點保存到狀態中,由於  TransactionEnd 事件不是 DML 事件,不會導致下游生成數據,所以就不需要考慮之前提到的問題。

而問題 3 的解決需要和 Flink 的 Checkpoint 機制進行聯動。我們當時使用的 Flink 版本是 1.9.x。在 Source 端,需要通過 CheckpointLock 來讓 Source 和 Checkpoint trigger 進行配合。雖然在理解和使用上有一定的壁壘,但是 CheckppointLock 機制恰恰幫助我們達成了問題 3 的目標。我們保證了 Source 只有拿到 lock 才發送數據給下游,只有在完成一次 transaction 的數據發送後才 unlock,這樣就保證了 2 個  checkpoint 之間必定是完整的 𝒳( 𝒳 ∈ N )次 transaction 的數據。另一方面,我們減小了 checkpoint trigger 的間隔(200ms~500ms),減少了 checkpoint 間的數據 transaction 的數量,加快數據 commit 的速度。

UnifiedFormatTransform

就如名字描述的,UnifiedFormatTransform 的作用是將數據轉換爲統一制定的數據格式。

相較於 Binlog Source 階段,UnifiedFormatTransform 階段不用太過擔心性能問題,良好的編碼和水平垂直擴容能力可以應付絕大部分性能需求。但是有一個重要的問題亟待解決,就是前面提到的功能設計目標:完全防禦 DDL 帶來的問題

DDL 問題在數據同步 / 傳輸中一直是一個比較棘手的問題,帶來的麻煩包括不限於,數據解析失敗 / 錯誤,程序失敗 / 重啓,且恢復的成本往往很高。而其實解決這個問題的核心思路也很簡單,就是在程序中就地解析 DDL 並處理 Schema 變化。爲了實現這個功能,我們需要完成以下幾個步驟:

我們這實現上參考了 Maxwell [2] 的做法,內嵌了 Antlr4 的 Mysql 文法的 g4 文件,然後自定義 listener 來完成對 Schema 的更新和 DDL 數據的生成,然後 Schema 會在 Checkpoint 觸發時被保存到狀態中。

完成了就地解決 DDL 的功能後,不論是簡單的 Alter Table,還是複雜的 Online DDL,接入程序都可以順利解決,利用狀態從斷點恢復,也不會出現 Schema 異常的問題。

Kafka Sink

Kafka Sink 階段主要是將轉換好的數據寫入 Kafka 中。Flink 原生爲 Kafka Sink 賦予了 Exactly-Once 的能力,而我們也將這個功能利用起來,和 Source 一起,提供了開箱即用的端到端 Exactly-Once 解決方案。我們保證了 Source 按照完整的 Mysql Transaction 發送數據,同時 Sink 按照完整的 Mysql Transaction 將數據寫入 Kafka,對於 Transaction 敏感的場景,我們可以開啓 Transactional 消費模式,來完成強 transaction 語義(而非最終一致性)的數據處理。

其他優化

此外我們還做了一些優化功能:

05

平臺使用

用戶在傳輸平臺,只需要完成必要配置的設定,即可完成傳輸任務的創建和數據的使用,比較簡單。

接入任務

對於接入任務,正如我們前文提到的, 接入任務產生的數據會被作爲公共資產。所以用戶只需要查詢需求的表的數據是否已經接入,如果已經接入,則可以直接申請使用,否則發起一次表接入申請,審批通過後會由系統自動進行操作。

分發任務

對於分發作業,需要用戶進行創建,以 Iceberg 分發任務爲例:

字段篩選

選擇出分發作業使用的已經接入到平臺的數據源表字段

在選擇一些任務的運行配置(如資源,運行環境)後,就可以創建並運行一個分發任務,我們可以看到對應唯一一個 Flink 平臺任務 ID:

此外,我們還提供了豐富的監控查詢,元數據信息查詢等功能,充分利用了實時計算平臺的已有組件,實現了傳輸系統與實時計算系統的緊密結合。

06

總結與展望

最近我們在數據湖方向投入了較多的精力,傳輸系統目前也已經初步支持數據接入數據湖,未來希望可以不斷完善相關功能,大幅提升數據湖數據接入的能力,支持用戶一鍵入湖,加強整個數據體系的整合。

另一方面,我們看到 Flink 新版本提供了許多新功能新工具。例如 FLIP-27 Source 和 OperatorCoordinator,我們希望可以藉由這兩個全新的機制和工具,繼續優化我們的代碼,拓展相關功能。對於新推出 Upsert-Kafka,我們已經開始嘗試在 Flink 計算平臺上進行初步的開發和整合,希望之後將 Upsert-Kafka 與傳輸系統打通,繼續擴展與豐富實時計算和傳輸的業務場景!

參考資料:

[1] https://github.com/pingcap/ticdc/pull/804

[2] https://github.com/zendesk/maxwell

作者介紹:

劉首維,本科畢業於大連理工大學,Apache Flink Contributor,Scala/Akka 重度愛好者,19 年加入汽車之家負責實時計算平臺和數據傳輸平臺數據的開發和維護。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/_C7NALtBKQ3kraR3RVhAvg