Apache Hudi 在騰訊的落地與應用

Apache Hudi 核心概念

Apache Hudi 是一個基於數據庫內核的流式數據湖平臺,支持流式工作負載,事務,併發控制,Schema 演進與約束;同時支持 Spark/Presto/Trino/HIve 等生態對接,在數據庫內核側支持可插拔索引的更新,刪除,同時會自動管理文件大小,數據 Clustering,Compaction,Cleanning 等

可以基於雲存儲 / HDFS 構建基於 Hudi 的 Lakehouse,Hudi 支持 Parquet、ORC、HFile、avro 格式,同時提供了非常豐富的 API,如 Spark DF、RDD、FlinkSQL、Flink DataStream API,利用這些 API 可以非常方便地對 Hudi 表進行操作,同時 Hudi 也集成了其他生態,如 MPP 引擎 Starrocks,doris 等

Hudi 的基本概念由 Timeline 和 File Layout 組成

Hudi 支持 MOR 和 COW 兩種類型,MOR 表對流式寫入更友好,延遲更低,對於更新的 log 文件支持同步和異步兩種模式進行 Compaction 生成新的 Base 文件,以加速查詢,支持 Snapshot,Read Optimized,Incremental 讀取

而對於 COW 表,每次寫入需要重寫文件,寫放大相對嚴重,延遲相對 MOR 較高,更適合寫少讀多的場景。

爲了加速數據的更新,Hudi 支持多種索引,如分區級別的索引以及全表索引,分區級別的索引可以保證數據在分區內的唯一性,全表索引保證數據在表級的唯一性(開銷較大)。Hudi 支持了多種類型的索引實現,典型的如 BLOOM、BUCKET 索引,以及自定義索引等方式。

另外一個核心的概念是 Hudi 的 Table Service,包含 Compaction 操作,Compaction 針對 FileSlice 進行操作,會將 Base 文件和其對應的 Log 文件進行合併,產生新的 Base 文件;可以通過指定 NUM_COMMITS 或 TIME_ELAPSED 兩種策略調度執行 Compaction,對於調度執行而言,Hudi 爲不影響主鏈路的寫入,支持了異步調度與執行,以及同步調度與執行,同步調度異步執行方式,滿足不同的需求。

另外一個 Table Service 是 Clean,Clean 用於刪除過期的文件,同樣與 Compaction 類型也提供了多種策略以及調度執行策略,值得注意的是對於做了 Savepoint 的時間點,其對應的文件不會被刪除。

接下來分析對於 COW 表的不同查詢的實現,如在 instant 0 時刻寫入一部分數據(ABCDE),在 instant 1 時刻更新 A -> A',D -> D',在 instant 2 時刻更新 A'-> A'',E -> E',並插入 F 那麼對於快照查詢(Snapshot Query)每次都是讀取的最新的 FileSlice,增量查詢(Incremental Query)讀取指定 commit 之間的 Parquet 文件,然後再將時間範圍下推至 Parquet 文件進行過濾,只讀取符合條件的變更的數據。

對於 MOR 表,快照查詢(SNAPSHOT Query)讀取的是 Base 文件與 Log 合併後的最新結果;而增量查詢讀取指定 commit 之間的 Parquet 以及 Log 文件,然後再對 Log 文件進行 Block 級別的過濾(根據 Commit 時間),合併重複 key 後返回結果。

CDC 數據入湖

這個場景主要是 DB 數據入湖入倉把原來 T + 1 的數據新鮮度提升到分鐘級別。數據新鮮度通過目前比較火的以 Debezium、Maxwell 爲代表的 CDC(change Data Capture)技術實現。以 Streaming 近實時的方式同步到數倉裏面。在傳統的 Hive 數倉中想保證實時是非常困難的,尤其是文件更新,湖表實時寫入更新,基本不可能實現。

CDC 技術對數倉本身存儲是有要求的,首先是更新效率得足夠高,能夠支持以 Streaming 方式寫入,並且能夠非常高效的更新。尤其是 CDC log 在更新過程還可能會亂序,如何保證這種亂序更新的 ACID 語義,是有很高要求的,當前能滿足亂序更新的湖格式只有 Hudi 能做到,而且 Hudi 還考慮到了更新的效率問題,是目前來說比較先進的架構。

圖中方案 3 相比上面的方案,比較適合目前體量比較大(每天增量能達到億級別地)、數據平臺比較健全的公司,中間有一套統一的數據同步方案(彙總不同源表數據同步至消息隊列),消息隊列承擔了數據的容錯、容災、緩存功能。同時,這套方案的擴展性也更加好。通過 kafka 的 topic subscribe 方式,可以比較靈活地分發數據。通過以上三種方式入湖 hudi,以某數據中臺爲例已經有 6000 多張源表寫入 Hudi 日增幾十億數據入湖。

分鐘級實時數倉

第二個場景是構造分鐘級別的實時數倉,分鐘級別的端到端數據新鮮度,同時又非常開放的 OLAP 查詢引擎可以適配。其實是對 kappa 架構或者是原先 Streaming 數倉架構的一套新解法。在沒有這套架構之前,實時分析會跳過 Hudi 直接把數據雙寫到 OLAP 系統中,比如 ClickHouse、ES、MongoDB 等。當倉存儲已經可以支持高效率分級別更新,能夠對接 OLAP 引擎,那麼這套架構就被大大簡化,首先不用雙寫,一份數據就可以保證 only one truth 語義,避免雙寫帶來數據完整性的問題。其次因爲湖格式本身是非常開放的,在查詢端引擎可以有更多選擇,比如 Hudi 就支持 Presto、trino、Spark、Starrocks、以及雲廠商的 redshift 引擎,會有非常高的靈活度。多層數層數據可見性也從 T+1 小時或天縮短到分鐘級別。

流式計算 PV/UV

Apache Hudi 的 Payload 是一種可擴展的數據處理機制,通過不同的 Payload 我們可以實現複雜場景的定製化數據寫入方式,大大增加了數據處理的靈活性。Hudi Payload 在寫入和讀取 Hudi 表時對數據進行去重、過濾、合併等操作的工具類,通過使用參數 "hoodie.datasource.write.payload.class" 指定我們需要使用的 Payload class。爲了實現 pv/uv 計算,我們實現了 RecordCountAvroPayload ,它可以在對數據去重的時候,將重複數據的數量記錄下來,這裏的重複指的是 HoodieKey(primary key + partition path)相同。以往處理方式是通過 flink + window 聚合實現,該方式有延遲數據丟棄和 state 爆掉風險,Hudi Payload 機制則沒有這些風險。

多流拼接(大寬表)

上圖是一個典型的非常複雜的業務落地, 消息流 1 由 kafka 寫入 hudi 商品銷售明細表,消息流 2 由 kafka 寫入 hudi 用戶基本屬性表,然後結合 hudi 商品標籤表和 hive 用戶擴展屬性表進行實時和離線拼接大寬表。

在實現多流拼接功能前有三個前置條件需要滿足:

  1. 1. 基於樂觀鎖的 Timeline

  2. 2. 基於 marker 的早期衝突檢測

  3. 3. 啓用 occ(樂觀併發控制)

這裏主要描述基於時間線服務器的標記機制,該機制優化了存儲標記的相關延遲。Hudi 中的時間線服務器用作提供文件系統和時間線視圖。如下圖所示,新的基於時間線服務器的標記機制將標記創建和其他標記相關操作從各個執行器委託給時間線服務器進行集中處理。時間線服務器在內存中爲相應的標記請求維護創建的標記,時間線服務器通過定期將內存標記刷新到存儲中有限數量的底層文件來實現一致性。通過這種方式,即使數據文件數量龐大,也可以顯着減少與標記相關的實際文件操作次數和延遲,從而提高寫入性能。

實現的原理基本上就是通過自定義的 Payload class 來實現相同 key 不同源數據的合併邏輯,寫端會在批次內做多源的合併並寫入 log,讀端在讀時合併時也會調用相同的邏輯來處理跨批次的情況。這裏需要注意的是亂序和遲到數據(out-of-order and late events)的問題。如果不做處理,在下游經常會導致舊數據覆蓋新數據,或者列更新不完整的情況。針對亂序和遲到數據,我們對 Hudi 做了 Multiple ordering value 的增強,保證每個源只能更新屬於自己那部分列的數據,並且可以根據設置的 event time (ordering value) 列,確保只會讓新數據覆蓋舊數據。最後結合 lock less multiple writers 來實現多 Job 多源的併發寫入。

介紹多流拼接場景下 Snapshot Query 的核心過程,即先對 LogFile 進行去重合並,然後再合併 BaseFile 和 去重後的 LogFile 中的數據。下圖顯示了整個數據合併的過程,具體可以拆分成以下 兩個過程:

如上圖所示,以最簡單的覆蓋邏輯爲例,當讀到 BaseFile 中的主鍵是 key1 的 Record 時,發現 key1 在 Map 中已經存在並且對應的 Record 有 BCD 三列的值,則更新 BaseFile 中的 BCD 列,得到新的 Record(key1,b0_new,c0_new,d0_new,e0),注意 E 列沒有被更新,所以保持原來的值 e0。對於新增的 Key 如 Key3 對應的 Record,則需要將 BCE 三列補上默認值形成一條完整的 Record。

批流探索 - 廣告歸因

廣告歸因是指在用戶在廣告行爲鏈路中,使用科學的匹配模型兩兩匹配各環節的行爲數據點,可用於判斷用戶從何渠道下載應用(或打開小程序),通過匹配用戶廣告行爲,分析是何原因促使用戶產生轉化。廣告歸因的數據結果是衡量廣告效果、評估渠道質量的重要依據,可幫助廣告主合理優化廣告素材,高效開展拉新、促活營銷推廣,而實時廣告歸因則能更及時的應用到優化廣告投放的過程中。

在增長買量業務場景中,買量團隊在快手、百度、字節等渠道上投放廣告,比如某雲遊戲廣告素材,吸引潛在用戶點擊廣告,進入業務開始玩雲遊戲,也可以下載遊戲的 APK 安裝包,從而實現將用戶轉化成業務新增用戶和遊戲新增用戶的目的。如下圖所示,渠道方可以獲取用戶的點擊數據,業務可以獲取新增用戶的數據,在點擊歸因鏈路中,就是將業務新增用戶匹配到用戶在某渠道上近 N 天的最後一次廣告點擊,在正常的業務過程中,先有用戶點擊廣告數據,後有業務新增用戶數據,根據離線數據統計經驗,點擊轉化成新增用戶的窗口時間最長不超過 3 天,也就是 N=3。

基於 Hudi 方案優勢如下

批流探索 - 流轉批

在某些業務場景下,我們需要一個標誌來衡量 hudi 數據寫入的進度,比如:Flink 實時向 Hudi 表寫入數據,然後使用這個 Hudi 表來支持批量計算並通過一個 flag 來評估它的分區數據是否完整從而進一步寫入分區數據進行分區級別的 ETL,這也就是我們通常說的流轉批

上左圖中 Flink Sink 包含了兩個算子。第一個 writer 算子,它負責把數據寫入文件,writer 在 checkpoint 觸發時,會把自己寫入的最大的一個時間傳到 commit 算子中,然後 commit 算子從多個上游傳過來的時間中選取一個最小值作爲這一批提交數據的時間,並寫入 HUDI 表的元數據中。

我們的方案是將這個進度值 (EventTime) 存儲爲 hudi 提交(版本)元數據的屬性裏,然後通過訪問這個元數據屬性獲取這個進度值。在下游的批處理任務之前加一個監控任務去監控最新快照元數據。如果它的時間已經超過了當前的分區時間,就認爲這個表的數據已經完備了,這個監控任務就會成功觸發下游的批處理任務進行計算,這樣可以防止在異常場景下數據管道或者批處理任務空跑的情況。

上右圖是一個 flink 1 分鐘級別入庫到 HUDI ODS 表, 然後通過流轉批計算寫入 HUDI DWD 表的一個執行過程。

如何解決亂序到來問題, 我們可以通過設置 spedGapTime 來設置允許延遲到來的範圍默認是 0 不會延遲到來。

未來規劃

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/OCLIso3oHvpGBB10fuuTyw