伴魚基於 Flink 構建數據集成平臺的設計與實現

**摘要:**數據倉庫有四個基本的特徵:面向主題的、集成的、相對穩定的、反映歷史變化的。其中數據集成是數據倉庫構建的首要前提,指將多個分散的、異構的數據源整合在一起以便於後續的數據分析。將數據集成過程平臺化,將極大提升數據開發人員的效率,本文主要內容爲:

  1. 數據集成 VS 數據同步

  2. 集成需求

  3. 數據集成 V1

  4. 數據集成 V2

  5. 線上效果

  6. 總結

**Tips:FFA 大會以及 Hackathon 比賽重磅開啓,點擊「閱讀原****文」**瞭解詳情~

A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.

—— Bill Inmon

一、數據集成 VS 數據同步

「數據集成」往往和「數據同步」在概念上存在一定的混淆,爲此我們對這二者進行了區分:

二者的關係如下圖所示:

目前「數據同步平臺」的建設正在我們的規劃之中,但這並不影響「數據集成平臺」的搭建,一些同步的需求可提前在「實時計算平臺」創建,以「約定」的方式解耦。

值得一提的是「數據集成」也應當涵蓋「數據採集」(由特定的工具支持) 和「數據清洗」(由採集粒度、日誌規範等因素決定) 兩部分內容,這兩部分內容各個公司都有自己的實現,本文將不做詳細介紹。

二、集成需求

目前伴魚內部數據的集成需求主要體現在三塊:Stat Log (業務標準化日誌或稱統計日誌)、TiDB 及 MongoDB。除此之外還有一些 Service Log、Nginx Log 等,此類不具備代表性不在本文介紹。另外,由於實時數倉正處於建設過程中,目前「數據集成平臺」只涵蓋離線數倉 (Hive)。

由於以上兩種類型的數據集成方式差異較大,下文將分別予以討論。

三、數據集成 V1

伴魚早期「數據集成平臺」已具備雛形,這個階段主要是藉助一系列開源的工具實現。隨着時間推進,這個版本暴露的問題也逐漸增多,接下來將主要從數據流的角度對 V1 進行闡述,更多的細節問題將在 V2 版本的設計中體現。

3.1 Stat Log

日誌的集成並未接入平臺,而是煙囪式的開發方式,數據集成的鏈路如下圖所示:

Kafka 中的數據先經過 Flume 同步至 HDFS,再由 Spark 任務將數據從 HDFS 導入至 Hive 並創建分區。整體鏈路較長且引入了第三方組件(Flume)增加了運維的成本,另外 Kafka 的原始數據在 HDFS 冗餘存儲也增加了存儲的開銷。

3.2 DB

DB 數據的集成主要是基於查詢的方式(批的方式,通過 Select 查詢進行全表掃描得到快照數據)實現,其鏈路如下圖所示:

用戶通過平臺提交集成任務,由 Airflow 定時任務掃描集成平臺元數據庫,生成對應的取數任務 (TiDB 的數據通過 Sqoop 工具,MongoDB 的數據則通過 Mongoexport 工具)。可以看到 V1 版本並沒有獲取數據庫的變更的日誌數據,不能滿足對數據變更過程的分析訴求。

由於 Sqoop 任務最終要從 TiDB 生產環境的業務數據庫獲取數據,數據量大的情況下勢必對業務數據庫造成一定的影響。Mongoexport 任務直接作用在 MongoDB 的隱藏節點 (無業務數據請求),對於線上業務的影響可以忽略不計。基於此,DBA 單獨搭建了一套 TiDB 大數據集羣,用於將體量較大的業務數據庫同步至此 (基於 TiDB Pump 和 Drainer 組件),因此部分 Sqoop 任務可以從此集羣拉羣數據以消除對業務數據庫的影響。從數據流的角度,整個過程如下圖所示:

是否將生產環境 TiDB 業務數據庫同步至 TiDB 大數據集羣由數倉的需求以及 DBA 對於數據量評估決定。可以看出,這種形式也存在着大量數據的冗餘,集羣的資源隨着同步任務的增加時長達到瓶頸。並且隨着後續的演進,TiDB 大數據集羣也涵蓋一部分數據應用生產環境的業務數據庫,集羣作用域逐漸模糊。

四、數據集成 V2

V2 版本我們引入了 Flink,將同步的鏈路進行了簡化,DB 數據集成從之前的基於查詢的方式改成了基於日誌的方式 (流的方式),大大降低了冗餘的存儲。

4.1 Stat Log

藉助 Flink 1.11 版本後對於 Hive Integration 的支持,我們可以輕鬆的將 Kafka 的數據寫入 Hive,因此 Stat Log 的集成也就變得異常簡單 (相比 V1 版本,去除了對 Flume 組件的依賴,數據冗餘也消除了),同時 Flink Exactly-Once 的語義也確保了數據的準確性。從數據流的角度,整個過程如下圖所示:

目前按照小時粒度生成日誌分區,幾項 Flink 任務配置參數如下:

checkpoint: 10 min

watermark: 1 min

partition.time-extractor.kind: ‘custom’

sink.partition-commit.delay: ‘3600s’

sink.partition-commit.policy.kind: ‘metastore,success-file’

sink.partition-commit.trigger: ‘partition-time’

4.2 DB

基於日誌的方式對 DB 數據進行集成,意味着需要採集 DB 的日誌數據,在我們目前的實現中 TiDB 基於 Pump 和 Drainer 組件(目前生產環境數據庫集羣版本暫不支持開啓 TICDC),MongoDB 基於 MongoShake 組件,採集的數據將輸送至 Kafka。

採用這種方式,一方面降低了業務數據庫的查詢壓力,另一方面可以捕捉數據的變更過程,同時冗餘的數據存儲也消除了。不過由於原始數據是日誌數據,需要通過一定的手段還原出快照數據。新的鏈路如下圖所示:

用戶提交集成任務後將同步創建三個任務:

「存量任務」和「Merge 任務」由離線調度平臺 Dolphinscheduler (簡稱 DS) 調度執行,任務執行過程中將從集成任務的元數據庫中獲取所需的信息。目前「Merge 任務」按小時粒度調度,即每小時還原快照數據。

從數據流的角度,整個過程如下圖所示:

DB 的數據集成相較於 Stat Log 複雜性高,接下來以 TiDB 的數據集成爲例講述設計過程中的一些要點 (MongoDB 流程類似,區別在於存量同步工具及數據解析)。

■ 4.2.1 需求表達

對於用戶而言,集成任務需要提供以下兩類信息:

■ 4.2.2 存量任務

存量任務雖然有且僅執行一次,但爲了完全消除數據集成對業務數據庫的影響,我們選擇數據庫的備份 - 恢復機制來實現。公司內部數據庫的備份和恢復操作已經平臺化,集羣將定期進行備份 (天粒度),通過平臺可以查詢到集羣的最新備份,並且可由接口觸發備份恢復操作,故存量的獲取可直接作用於恢復的數據庫。

由於數據庫備份的時間點與集成任務提交的時間點並不一定是同一天,這之間存在着一定的時間差將導致存量快照數據不符合我們的預期,各時間點的關係如下圖所示:

按照我們的設定,存量快照數據應當是包含 T4 之前的全部數據,而實際備份的快照數據僅包含 T1 之前的全部數據,這之間存在這 N 天的數據差。

**注:**這裏之所以不說數據差集爲 T1 至 T4 區間的數據,是因爲增量的 Binlog 數據是以整點爲分區的,在 Merge 的時候也是將整點的分區數據與存量數據進行聚合,並支持了數據去重。因此 T1 時刻的存量數據與 T0-T3 之間的增量數據的 Merge 結果等效於 T0 時刻的存量數據與 T0-T3 之間的增量數據的 Merge 結果。所以 T1 至 T4 的數據差集等效 T0 至 T3 的數據差集,即圖示中的 N 天數據。

對於缺失的這部分數據實則是可以在「存量任務」中進行補全,仔細分析這其實是可以通過執行的 「Merge 任務」的補數操作實現。

整個「存量任務」的工作流如下圖所示:

■ 4.2.3 Merge 任務

Merge 任務的前提是存量數據與增量數據都已經 ready,我們通過 _SUCCESS 文件進行標記。整個「Merge 任務」的工作流如下圖所示:

Merge 操作通過 Flink DataSet API 實現。核心邏輯如下:

核心代碼:

主要思想爲「後來者居上」,針對於 Insert、Update 操作,最新值直接覆蓋舊值,針對 Delete 操作,直接丟棄。這種方式也天然的實現了數據去重操作。

■ 4.2.4 容錯性與數據一致性保證

我們大體可以從三個任務故障場景下的處理方式來驗證方案的容錯性。

以上,通過自動恢復機制和報警機制確保了整個工作流的正確執行。接下來我們可以從數據的角度看一下方案對於一致性的保障。

數據的一致性體現在 Merge 操作。兩份數據聚合,從代碼層面一定可以確保算法的正確性 (這是可驗證的、可測試的),那麼唯一可能導致數據不一致的情況出現在兩份輸入的數據上,即存量和增量,存在兩種情況:

針對 Flink 流式寫 Hive 過程中的亂序數據處理可以採取兩種手段:

問題轉換成了如何感知到亂序,我們可以進一步分析,既然亂序數據會觸發前一個分區的二次提交,那麼只需要在提交分區的時候檢測前一個分區是否存在 _SUCCESS 標記便可以知曉是否是亂序數據以及觸發報警。

五、線上效果

總覽

存量任務

Merge 任務

六、總結

本文闡述了伴魚「數據集成平臺」核心設計思路,整個方案還有一些細節未在文章中體現,如數據 Schema 的變更、DB 日誌數據的解析等,這些細節對於平臺構建也至關重要。目前伴魚絕大部分的集成任務已切換至新的方式並穩定運行。我們也正在推進實時數倉集成任務的接入,以提供更統一的體驗。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/THWRYEgfXwP6-pRYFQorkg