小米基於 Flink 的實時數倉建設實踐

**摘要:**本文整理自小米軟件開發工程師周超,在 Flink Forward Asia 2022 平臺建設專場的分享。本篇內容主要分爲四個部分:

    1. 小米數倉架構演變

    2. Flink+Iceberg 架構升級實踐

    3. 流批一體實時數倉探索

    4. 未來展望

小米數倉架構演變

1.1 數倉架構現狀

在介紹演變前,我們先來了解下小米當前的技術現狀。

上圖展示的是小米目前的技術架構,在存儲側我們主要應用數據湖 Iceberg 和自研消息隊列 Talos,計算層主要應用 Flink 和 Spark,他們統一運行在 Yarn 上,統一通過 Metacat 獲取元數據信息,並通過 Ranger 來進行統一的鑑權服務。我們內部使用 Spark 和 Presto 來支撐 OLAP 查詢場景,並通過 Kyuubi 來實現路由。

在實時數倉場景中,我們選擇 Flink 作爲計算底座,Hive、Talos、Iceberg 作爲存儲底座,其中,消息隊列 Talos 作爲傳統 Lambda 架構的通用選擇,在我們內部佔比較大且很穩定,Iceberg 作爲一款優秀的湖存儲,兼具時效性和低成本,其使用佔比也在逐步提升,使用到 Iceberg 的 Flink 作業在總佔比中已經達到近 50%。

我們對內部實時鏈路進行了統計,Iceberg 在大多數場景下已經對 Hive 進行了替換,對分鐘級的實時鏈路進行了較好的支撐;因爲使用 Iceberg 搭建的實時鏈路目前僅能達到分鐘級的時效,消息隊列仍有着較高佔比。

1.2 數倉架構演變

接下來看下小米內部數倉架構的演變歷程。

在引入數據湖前,針對日誌埋點這樣的聚合計算場景,業務會使用離線計算來搭建鏈路,採集模塊會將日誌或埋點數據統一收集到消息隊列中,Flink 消費消息隊列中的數據實時寫入 ODS 層 Hive 表,下游的計算則採用 Spark 或者 Hive 按小時或天進行清洗、聚合。顯然,這樣的鏈路處理延遲和成本都較高,這些離線作業往往都在凌晨進行調度,給整個集羣帶來較大壓力。

針對 CDC 數據源,實時數據通常會通過消息隊列進行流轉,保證處理的實時性,數據在消息隊列中以 Changelog-Json 的格式進行存儲。但爲了保證計算的準確性,業務鏈路通常會使用 Lambda 架構來搭建,會額外引入一條離線鏈路。離線鏈路基於 Hive 或 Kudu 構建,ODS 層使用 Spark Streaming 近實時導入,部分場景也會定期全量導入,下游計算依賴 Spark 做定時調度。顯然,這樣的架構開發和維護的成本都會很高。

帶着上面的問題,我們想要對批和流鏈路進行統一,並能夠滿足低成本和低延遲,爲此我們引入了 Iceberg,在引入 Iceberg 初期,小米內部的使用以 v1 表爲主 (v1 表是數據分析表,支持 Append 類型數據的增量讀寫)。因爲 Flink 舊架構(1.12 版本) 讀取 Iceberg 的數據時效性不高,所以在日誌埋點場景的應用主要是替換了 Hive,使用 Iceberg 來存儲 ODS、DWD 層數據,可以降低存儲成本,同時配合 Spark、Presto 可以獲得更快的查詢速度。

針對 CDC 數據源的場景,在初期也同樣以替換 Hive 爲主以獲取更低的成本。

在中期,我們開始大規模使用 Iceberg V2 表,並對 Iceberg V2 表的生態不斷進行完善,v2 表在 v1 表的基礎上支持了行級別的更新和刪除,同時也支持了 Merge on read 模式,並且有着不錯的性能。業務的實時鏈路也可以完全依賴 Flink 和 Iceberg 來進行搭建。之前的日誌埋點鏈路通過 Iceberg v2 表的升級後,使用 Flink+Iceberg v2 替換了原先的 Spark + Iceberg v1,將鏈路時效性由小時級提升至分鐘級。

由於 v2 表能夠支持行級別的更新,而且數據實時可查,原本針對 CDC 數據源的 Lambda 架構鏈路可以升級到 Kappa 架構,由 Flink 和 Iceberg v2 表來構建,兼顧時效性和成本,依賴 Parquet+ZSTD 壓縮,存儲成本相比於原先 Parquet+snappy 能夠節省 30%。

1.3 當前架構遇到的問題

經過我們一段時間的使用,我們發現目前 Iceberg 能夠很好地兼顧成本、查詢效率,社區的很多優化也以離線爲主,但在實時場景中存在着時效性和穩定性方面的不足,距離消息隊列仍有差距,同時,Iceberg 作爲統一的存儲 Format,在實際消費時需要讀取底層文件,而 v2 表有着多種文件類型,讀取時需要組織 DataFile 和兩類 DleteFile(Equlity delete 和 Position delete) 的關係,邏輯較爲複雜。

我們在基於 Flink+Iceberg 的實時鏈路構建中,經常會遇到以下兩類問題:

Flink+Iceberg 架構升級實踐

2.1 基於 Flink1.12 的舊架構實現

針對上述的兩個問題,我們對 Flink+Iceberg 的架構進行了升級。

上圖中的實時數倉鏈路由多張 Iceberg 表和多個 Flink 作業組成,其中 Iceberg 負責數據的存儲,Flink 負責數據的清洗、流轉,顯然對一條鏈路的實時性和穩定性支撐,Flink 起了關鍵作用。在一個 Flink 流式作業中,數據會經過讀取、計算、寫入,在實際場景中,我們發現數據的讀取效率低,嚴重影響了作業吞吐,後續的相關優化也主要圍繞讀取部分展開。

在優化前,我們的 Flink+Iceberg 實時鏈路主要依託於 Flink 1.12 版本構建,在 1.12 版本中,讀取邏輯被拆分爲 Monitor 和 Reader 兩個算子,在進行增量消費時,Monitor 算子掃描 Snapshot 中的文件,並組織成 Split 發往下游給 Reader 算子消費。這樣的架構做到了很好的掃描和讀取邏輯分離,但是仍有幾點重要缺陷,例如:

2.2 舊架構遇到的主要問題

這樣的缺陷在實際作業中會有實時性和穩定性兩大問題表現。在實時性方面,存在着消費速度慢、消費存在波動;在穩定性方面,存在着 Task OOM,Checkpoint 容易超時。

在時效性方面,目前主要有三個主要問題,分別是消費波動、消費延遲以及消費瓶頸。Split 的單向傳輸和掃描操作時間觸發使得消費存在波動和延遲,考慮 Monitor 算子在一個時間週期內僅能夠發送固定數量的 Split 給下游進行消費,如果 Split 數量少,那麼 Reader 算子會有部分時間處於空閒狀態,導致消費存在波動,存在資源浪費;而如果一個週期內下發的 Split 超過了 Reader 的消費能力,那麼 Split 就會在 Reader 側堆積,佔用額外的堆內存。同時固定的掃描間隔也會導致消費的延遲,新數據需要等待一定掃描間隔後纔可能被消費到,如果用戶配置了一個較大的掃描間隔,那麼數據的時效性會繼續降低。

這樣的機制不僅影響着實時性,對穩定性也有不小的影響。Monitor 和 Reader 的單向同步機制,使得消費需要指定間隔和間隔內下發的 Split 數,未消費完的 Split 會存儲在堆內存中,積壓較多會導致 OOM、Full gc 頻繁,Task 吞吐降低。

同時,舊架構的 SourceFunction 在實現數據下發時需要持有 Checkpoint 鎖從而保證數據下發和狀態更新的一致,而 Reader 算子 Checkpoint 粒度僅細化到 Split 級別,所以 Reader 算子需要長時間去持有 Checkpoint 鎖,只有消費完一個 Split 後纔會釋放,這在下游處理慢,反壓情況下是致命的缺陷,很容易導致 Checkpoint 超時。這些點一同促使着作業穩定性的降低。

2.3 基於 Flink1.14 的新架構實現

爲了解決上述實時性和穩定性問題,我們在社區基於 FLIP-27 的改動上改進了讀取邏輯,主要涵蓋了上圖右側的七點,其中雙向通信,Monitor 邏輯移至 JobManager 是 FLIP-27 的關鍵優化點。我們內部主要對後面的五點進行了優化,分別是 Snapshot 的依次掃描、自適應的掃描模式、分區多併發消費等。

增量消費 Iceberg 存在着兩種方式,分別爲依次掃描 Snapshot 和合並多個 Snapshot 掃描。在合併多個 Snapshot 的掃描模式中,需要依賴 Merge on read 模式,用後續 Snapshot 中的 Delete 文件對當前 Snapshot 中的 Data 文件數據進行過濾。如果合併多個 Snapshot 進行消費,那麼一個 DataFile 可能會關聯到很多後續 Snapshot 的 DeleteFile,使得 Split 的組織變得複雜,同時 Reader 算子在使用 DeleteFile 過濾 DataFile 時,需要將 Equlity delete file 全部讀取到內存中,這也很容易導致 Task 產生內存問題。

考慮到上面的文件組織複雜度和內存文件,我們默認選擇將掃描模式設置爲了依次掃描,該模式可以更好地追蹤數據變化,並且降低文件組織複雜度,避免了在合併多個 Snapshot 模式中因爲 Delete 文件較大而產生的內存問題,對穩定性更加友好。

舊架構中,掃描邏輯主要由時間驅動,定時觸發,在新架構中,我們引入了自適應的掃描模式,增加了事件驅動,解決了消費波動和 Task 潛在的內存問題。在實際掃描過程中,動態 Enumerator 會根據內存中 Buffer 的反饋進行決策,小於閾值就立刻執行掃描操作,保證 Reader 能夠連續消費,大於閾值就阻塞掃描,避免將更多的 Split 緩存在內存中。

在新架構中,我們針對 v2 表實現了併發消費,將原本的單一隊列 Buffer 按照下游 Task 拆分成多個隊列 Buffer,Iceberg 表中不同分區的數據文件會按照寫入排序,並被 Hash 到不同的隊列,實現消費的分區有序。

同時爲了保證各個 Task 消費數據的對齊,我們使用 Snapshot 的提交時間來生成 Watermark,引入 AlignedAssigner 來實現統一的 Split 分配,在分配端實現對齊,保證下游各個 Task 消費數據的對齊。

上面我們講到的自適應掃描只能解決單個 Source 實例的問題,在實際應用中,部分場景仍有潛在穩定性問題存在,例如集成場景中的指標拆分,將一張表的數據拆分至多張表;數倉場景中,對同一張表進行多次引用,篩選不同部分的數據進行 Join。在這兩個使用場景中,因爲不滿足 Source 複用規則,會有多個讀取同一張 Source 表的實例存在。

在 Flink 中,Source 的複用受 Partition、Limit、Project、Filter 影響,以 Project 和 Filter 爲例。上圖左邊的 SQL 描述了 Project 下推導致的複用失效,因一個字段的區別,同一份數據就會被讀取三次;上圖右邊的 SQL 描述了 Filter 下推導致複用失效的場景,即使選取的範圍有很大重複,但 Source 仍不會得到複用。由於複用的失效,同一個表的相同 Split 會在內存中存在多份,依然有出現內存問題的可能。

爲了優化這種情況,我們引入了兩種方式。

通過切換至新架構,消費 Iceberg 表的平均掃描間隔降至小於 1 秒,單個 Task 吞吐提升至 70 萬條每秒,實時數倉鏈路新鮮度提升至 5 分鐘內。

流批一體實時數倉探索

上一章介紹了 Flink 讀取 Iceberg 架構的優化,這一章將主要介紹小米在 Flink 流批一體實時數倉上遇到的問題以及相關探索。

遇到的問題可以歸結爲三類。

第一類是數據波動,實時數倉中數據是不斷變化的,由於 Flink 回撤機制的存在,-U 和 + U 會拆分爲兩條數據寫入,在 - U 寫入,+U 未寫入時執行查詢,會查詢到異常數據,而在 + U 寫入後又能查詢到正常結果。

第二類是計算不確定性,Flink 中算子的狀態過期會導致計算結果的不確定。同時針對這部分異常數據,往往沒有簡單的對比、修復手段,這也會導致實時鏈路產生的數據修復起來比較麻煩。

針對數據波動問題,考慮到下游絕大多數系統都能夠支持 Upsert 寫入,我們引入了寫入前數據丟棄能力,用於丟棄無關緊要的數據,將其稱爲 Drop Operator。該算子作用在 Sink 節點前,能夠根據配置丟棄指定類型的數據。

針對 Flink 聚合增量數據寫入 ADS 層 MySQL 的場景,可以配置丟棄 - U,避免 ADS 層查詢波動。同樣,該配置可以很方便的將 Changelog 流丟棄 - U 和 - D 轉爲 Append 流,滿足一些特殊的業務場景。

在解決計算的不確定性前,我們需要先了解其產生的原因。在 Flink SQL 中,狀態起着重要作用,正確的中間狀態是計算結果正確的必要條件。但顯然,目前狀態的保持是昂貴的,我們需要一個狀態過期策略來進行平衡。

在 Flink 內部,有着 Watermark 清理和 TTL 清理兩類算子。Watermark 可以根據業務的需要去生成,清理的策略根據實際使用場景制定,所以對計算結果影響可控。而依賴 TTL 清理的這類算子,在 Flink SQL 中狀態過期的策略無法得到準確控制,只能設置一個統一的狀態過期時間,往往因爲過期時間設置不合理或者滿足不了業務需求,從而產生預料之外的計算結果。

例如物流、服務單場景,訂單從創建到關閉的時間跨度往往很長,很容易出現在訂單還沒有結束前,狀態就過期了。爲了解決訂單跨度時間長導致狀態丟失的問題,業務會設置一個離線的 Topic,通過離線鏈路定期往離線 Topic 裏補數據,補充的數據重新流入實時鏈路中,將過期狀態重新補回。

針對由狀態過期而導致的計算不確定問題,我們有兩種解決思路。

爲了能夠讓狀態按需過期,我們引入了算子級的狀態清理功能,將清理規則應用範圍從作業細化到各個算子,將清理規則從時間規則拓展到業務規則,並通過 Query Hint 對算子提供靈活、方便的定義。

目前該功能支持兩類算子,分別是 Group 聚合算子和 Regular Join 算子,上圖表格爲支持的參數,通過 TTL 的參數可以設置該算子狀態的過期時間,condition 參數可以填寫清理規則,爲了方便判斷,清理規則需要是布爾表達式。

上圖的 SQL 展示了求某類商品總銷量的聚合計算邏輯,該聚合算子狀態保存時間爲 30 天,覆蓋了作業級的 1 天保存時間,且當商品狀態爲售罄或下架,那麼就清除該商品的狀態,這意味着有關該商品的銷售記錄後續不會再出現。

在聚合算子裏,我們加入了一個狀態清理的檢查器,將用戶設置的清理規則經過 codegen 轉換爲 Java 代碼,在聚合計算後進行規則檢查,匹配成功後執行清理。

同樣針對 Join 算子,狀態清理檢查器的實現類似,只是在 Join 算子會對左右表的狀態分別進行清理,清理完後會去對方狀態中將引用計數 - 1。

上圖的 SQL 示例描述了一個物流表的 Join 場景,左表爲物流訂單表,保存着訂單狀態以及更新時間,右表爲維度表,保存着該訂單的一些基礎信息,包括創建時間。在圖中的例子中,Join 算子的狀態清理不再依賴 Proctime,只依賴於運單狀態和運單的持續時間。

雖然算子級狀態清理能夠解決一部分需求,但它的使用門檻較高,且並非所有業務都有明確的清理規則,一個簡單方便的修復手段才適用於所有場景。如果想要用 Flink Batch 對數據進行修復,目前有 INSERT 和 OVERWRITE 兩種方式。使用 INSERT 實現 SQL 邏輯較爲複雜,且只能對數據進行覆蓋,不能刪除;OVERWRITE 的修復方式粒度較粗,而且會使下游實時作業產生較大波動。

在這樣的場景下,我們使用 Flink 實現了 Merge 語法。Merge 語法會對兩個數據源做 Join,並可以針對不同的 Join 情況執行增、刪、改操作,對下游影響小。

在具體的實現上,我們在原本的 Calcite 語法上完善了 Merge 語法的解析邏輯,支持爲每個 Action 設置獨立的判斷條件,在 Schema 匹配情況下支持 Insert * 和 Update * 語句,簡化邏輯。

在 SQL 校驗階段,Merge 邏輯會被轉爲 Outer Join 和多個 Merge Action 的結合。在優化階段,目前我們會根據實際的 Merge Action 情況來優化 Join 方式,將默認的 Outer Join 改寫爲 Anti Join 或者 Inner Join,減少處理的數據量。

最終,Merge 邏輯會生成 Join 和 MergeAction 兩個算子,Merge Action 算子根據上游 Join 情況來生成增、刪、改數據併發往下游。由於 Flink SQL 目前提供了優秀的流批一體架構,可以複用當前的邏輯,將增刪改數據寫入下游數據系統。

在我們內部,Spark 和 Flink 目前都支持 Merge 語法,但 Spark 在框架層只提供了語法側的支持,Runtime 層的支持在 Iceberg 側由插件實現。Flink 則在框架中實現了語法和 Runtime 層的支持,使得 Merge 的功能更加通用,也能夠支持更多存儲系統。目前,在我們內部,Flink merge into 入湖和入庫場景使用較多。

因爲實現原因,Spark 在我們內部目前僅能支持 Merge into 入湖,所以我們在 Merge into 入湖場景下對 Spark 和 Flink 的處理速度做了測試,目前中小批量數據的 Merge 操作 Flink 執行速度會略快,大數據場景下 Flink 因爲入湖速度較 Spark 慢,所以耗時稍多,但整體來看,Flink 已經能夠滿足日常修復需求。

未來展望

我們的未來展望主要包括以下三點:

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/FqmIynf3ap0VBElxvRYAew