汽車之家基於 Flink 的實時計算平臺 3-0 建設實踐

摘要**:**本文整理自汽車之家實時計算平臺負責人邸星星在 Flink Forward Asia 2021 平臺建設專場的演講。主要內容包括:

  1. 應用場景

  2. 預算資源管控

  3. Flink 伸縮容

  4. 湖倉一體

  5. PyFlink 實踐

  6. 後續規劃

**Tips:點擊「閱讀原****文」**查看原文視頻 & 演講 PDF~

一、應用場景

我們的應用場景與其他公司很類似,涵蓋了實時指標統計、監控預警、實時數據處理、實時用戶行爲、實時入湖、實時數據傳輸這幾個方面:

我們最早是使用 Storm 平臺,基於 Spout、Bolt 開發模型,實現了基礎的實時計算開發,這個階段是完全基於 Java 編碼方式實現開發,開發門檻及學習成本都比較高。

第二階段,我們在 18、19 年引入 Flink,並建設了 AutoStream 1.0 平臺。這個階段我們主要的目標是提效、降低開發門檻和學習成本。將之前純 Java 開發方式轉變爲基於 SQL + UDF 的開發方式。由於 Flink 早期還不支持 DDL,所以我們很大一部分工作就是建設自己的 meta server,並通過 DDL 定義 source sink 組件,同時完善業務庫數據的實時接入,並將公司內部常用的存儲引擎集成到平臺上,完成整個實時開發鏈路的打通。

第三階段,我們將 Flink 升級到 1.9 版本,並將平臺升級爲 AutoStream 2.0 版本,支持原生 DDL,同時支持自助上傳 UDF,簡化了 UDF 使用流程。同時隨着任務數及平臺用戶的增加,我們的日常 on call 時間也隨之增加,所以我們上線了任務的健康評分機制,從多個方面分析任務的健康度,幫助用戶瞭解任務可以優化的點並附帶解決方案。我們還上線了在線診斷功能,支持動態修改日誌級別,查看線程棧和火焰圖,提升用戶定位問題的效率,同時也降低了我們平臺方的日常 on call 成本。

AutoStream 3.0 代表了我們今年主要的工作,首先是將 Flink 升級到 1.12 版本,這次升級帶給我們的最直接的收益就是支持湖倉一體、 Native on K8S、PyFlink。同時本着降本提效的思路,新增了智能伸縮容的功能,一方面可以提升實時計算資源的利用率,另一方面也進一步降低用戶優化任務資源的難度。

上圖是 AutoStream 2.0 的架構,它包含了很多內容,涵蓋了平臺整體的功能與定位。但不可避免地存在諸多痛點。

由於實時計算離線的存儲資源是混用的,離線 Hadoop 集羣單獨爲實時計算拆出了一部分服務器並單獨部署了一套 Yarn 供實時計算使用,這部分服務器的磁盤用來支持離線數據的存儲,CPU 內存主要用來支持運行 Flink 任務,所以 Flink 計算資源並沒有獨佔服務器,我們也沒有對計算資源作嚴格的管控,所以導致有很多任務分配的資源是不合理的,通常是申請了過多的 CPU 資源但實際的利用率卻比較低。

隨着公司容器化建設的逐步推進,今年我們已經支持了離線和在線混部並錯峯分配資源的方式。也就意味着 Hadoop 集羣的 CPU 內存除了支持 Flink 實時計算,也可以支持在線業務的部署,對 Flink 計算資源管控的重要性及緊迫程度就凸顯出來了。

接下來最重要的就是推動用戶做資源的調優,這部分工作對用戶來說存在一定難度。首先,要理解 CPU 內存和並行度的調整對任務的影響就是有成本的,而且通常修改任務資源、重啓任務就需要幾分鐘,用戶還需要持續觀察是否對業務產生了影響,比如出現延遲或內存溢出等。簡單來說,用戶的調優成本是比較高的。

現有的基於 Hive 的數倉架構需要升級,t+1 或 h+1 的時效性已經無法滿足很多業務場景的需求,我們最終選定 Flink 和 Iceberg 來構建實時湖倉一體的架構。

最後是實時計算支持的生態不夠完善。我們的人工智能團隊主要以 Python 語言爲主,基於 SQL + UDF 的方式顯然對他們不夠友好,所以我們做了 PyFlink 的集成工作,解決了這一痛點。

上圖是 AutoStream 3.0 的整體介紹。

基於 2.0 版本的痛點,除了功能和應用性升級之外,我們主要還做了以下幾個方面的工作:

首先是加強了預算管控,上線了自動伸縮容功能,建設並落地了實時湖倉的架構,並上線了 PyFlink,支持 Python 開發實時任務。同時我們還基於 Flink + StarRocks 引擎,對實時多維分析的鏈路做了進一步簡化。

二、預算資源管控

爲什麼需要做預算資源管控策略?

首先是服務器資源沒有按團隊做預算劃分,先用先得,沒有上限,任務資源利用率低,個別團隊存在嚴重的資源浪費情況。同時沒有外力的推動,大部分用戶主動優化資源的意識很薄弱。

我們做的第一步就是啓用預算的強控機制。與內部的資產雲系統做對接初始化團隊的可用預算,超出預算後任務將無法啓動。還對此定義了規範,用戶需要先優化團隊內的低利用率任務來釋放預算,原則上資源利用率低的任務數應該控制在 10% 以內,如果無法優化,可以在資產雲系統上發起團隊間預算調撥的流程,也就是藉資源;如果還是失敗,則會由平臺開白名單臨時支持業務。

平臺規範裏,我們對資源利用率低的任務也進行了定義,同時展示出低利用率的原因及解決方案。

目前我們主要是針對 CPU 使用率、內存使用率和空閒 slot 這幾個核心規則來識別低利用率任務。早在 AutoStream 2.0 版本,我們就上線了 Flink 任務的健康評分機制,得到了豐富的細粒度得分數據,所以可以很容易地識別低利用率任務。

我們通過引入強控流程來嚴控計算資源的用量,通過制定規範來提升用戶主動優化資源的意識,通過開發自動伸縮容功能降低用戶的調優成本。由此達到的收益是,在實時計算業務穩步增長的前提下我們全年沒有新增服務器。

三、Flink 伸縮容

爲什麼需要自動伸縮容功能?

上圖是自動伸縮容配置的頁面,可以指定自動伸縮容的觸發時間,比如可以指定在夜裏低峯時期執行,降低伸縮容對業務的影響,支持指定 CPU 並行度、內存維度伸縮容的策略,每次執行伸縮容都會通過釘釘和郵件通知任務負責人,並且會記錄伸縮容的觸發原因和伸縮容之後的最新資源配置。

上圖是自動伸縮容功能的整體設計。我們在 jobmanager 中增加了一個新的組件 RescaleCoordinator,它使用 ha 維護其生命週期,且與 dispatcher 之間彼此通信。RescaleCoordinator 會定期訪問 AutoStream 提供的接口,AutoStream 平臺會根據用戶配置的伸縮容策略判斷是否需要執行伸縮容。

整體的流程如下:RescaleCoordinator 獲取到 leader 後會定期檢查是否需要伸縮容,如果需要則向 dispatcher 通知 jobmanager 開始伸縮容。jobmaster 會向 resourcemanager 請求 taskmanager,待所有請求的 taskmanager 都準備就緒,就會將舊的 taskmanager 釋放掉,然後基於新的 taskmanager 重新調度,最終把這次結果持久化到 zk 和 HDFS 上。

平臺的 Flink 本就使用了 zk 和 HDFS 做 ha,所以我們不需要引入新的組件。此外,因爲新的 container 是提前申請好的,又能省去 container 申請的時間,避免了因爲資源不夠而申請不到 slot 導致任務 recover 失敗。如果是做並行度的伸縮容,需要在發起調度前修改 jobgraph 的並行度來實現。

以 CPU 內存爲例,第一步是向 ResourceManager 申請 container 併爲之打標記。新的 taskmanager container 通過 slot pool 向 resultmanager 請求,這一步需要在 slot pool 中維護新的資源配置,對應上圖中的 CPU 2 核,內存 2GB,且需要支持回滾機制。如果這次伸縮容失敗,資源設置回滾到 CPU 1 核,內存 1G。

第二步,停掉任務,刪除 ExecutionGraph。

第三步,釋放舊 taskmanager,重新構建 ExecutionGraph,並在標記的 taskmanager 上從保存點恢復任務。

第四步,將此次伸縮容的資源設置持久化到 zk 和 HDFS,如果 jobmanager 在這裏掛掉,那麼之前伸縮容的配置都會丟失,所以需要將伸縮容後的配置保存在 zk 和 HDFS 上,數據存在基於 HDFS 的 block server 中,在 zk 中會保存 block server 的 key。

最後,對伸縮容策略進行一個粗略的總結:

四、湖倉一體

基於 Hive 的數據倉庫主要存在以下幾個痛點:

經過一番選型,我們決定選擇基於 Iceberg 來構建湖倉一體架構,解決基於 Hive 的數據倉庫的痛點。

Iceberg 的定位是開放的表格,不綁定某一種存儲或計算引擎,同時它能提供增量快照機制,可以輕鬆實現準實時的數據寫入和讀取。Iceberg 的 v2 格式支持 acid 語義,可以滿足 upsert 需求,爲後續做存儲層面的批流一體提供了可能性。讀取型的 schema 對 schema 的變更也十分友好。目前主要的查詢引擎都和 Iceberg 做了集成,讀寫路徑上也都支持了流和批的方式,從流批一體的角度來看,也是十分友好的。

上圖右側是 Iceberg 增量快照機制的基本原理。每次針對表的 commit 操作都會產生一個新的快照,比如針對表的第一次數據寫入的 commit 會生成 snapshot0 快照 (上圖中的 s0),第二次寫入的 commit 會生成 s1。每個快照對應一個 manifest list 對象,會指向多個 manifest file,每個 manifest file 又會指向多個 data file,也就是存儲數據的文件。圖中的 current metadata pointer 會爲每一個 Iceberg 表指向一個最新的 metadata file,即最新生成的快照。

上圖是 Iceberg 目前在我們內部的集成情況,最底層是基於 Hive Metastore 來統一 Hive 表和 Iceberg 表的元數據,基於 HDFS 來統一 Hive 表和 Iceberg 表的存儲,這也是湖倉一體的基礎。往上一層是表格式,即 Iceberg 對自身的定位:介於存儲引擎和計算引擎之間的開放的表格式。再往上是計算引擎,目前 Flink 主要負責數據的實時入湖工作, Spark 和 Hive 作爲主要的產品引擎。最上面是計算平臺,Autostream 支持點擊流和日誌類的數據實時入湖,AutoDTS 支持關係型數據庫中的數據實時入湖,離線平臺與 Iceberg 做了集成,支持像使用 Hive 表一樣來使用 Iceberg,在提升數據時效性的同時,儘量避免增加額外的使用成本。

下面是我們在湖倉一體架構落地過程中的一些典型的實踐。

實時數據入湖方面

在 Iceberg 場景中,需要確保主鍵相同的數據寫入到同一個 bucket 的下。由於 Flink 表的 DDL 並不支持 Iceberg 的 bucket 的定義,所以我們做的第一件事就是支持在 Flink DDL 的 property 中定義 bucket。

第二個問題是 Iceberg 表本身無法直接反映數據的寫入進度,離線調度難以精確地觸發下游任務,所以我們藉助 Flink 良好的 watermark 機制,直接在入湖的階段將 watermark 持久化到 Iceberg 表的元數據中,這樣可以通過簡單的腳本調用,就能知道 Iceberg 表的數據寫入進度,從而精確地觸發下游的調度任務。

第三個問題是實時入湖階段和離線團隊賬號體系的打通。Flink 向 Iceberg 寫入數據的時候,需要訪問 HDFS 和 Hivemetastore,所以必然要和離線既有的團隊賬號體系打通。一個離線的 HDFS 目錄只能給一個用戶分配寫入權限,所以在引入 Iceberg 之前,所有的 Flink 任務都是通過一個固定的 Hadoop 賬號運行的,這樣的好處就是方便我們做統一的資源管理,包括 checkpoint 目錄的統一管理。我們通過修改 Iceberg 創建 Hadoop Filesystem 實例的代碼,增加了賬號代理的機制,實現使用自定義賬號向 Iceberg 寫入數據,同時擴展 HiveMetaStoreClient 增加代理機制來打通對 HiveMetaStore 的訪問。

可用性和穩定性方面的實踐

爲了湖倉一體元數據的統一,我們堅持和離線數倉複用同一套 HiveMetaStore 服務,期間也遇到了很多穩定性和數據正確性的問題。

首先是訪問 HiveMetaStore 異常,這是因爲我們的 Hadoop 集羣啓用了 kerberos 機制,並且 Hive config 的過渡方法被誤用,導致 Hive 客戶端 kerberos 相關配置被覆蓋,造成訪問 HiveMetaStore 異常。我們做了相應做的修復,也已反饋給社區。

其次我們引入了基於 zk 的分佈式鎖用來替換默認的 HiveMetaStore 的鎖。Iceberg 默認基於 HiveMetaStore 分佈式鎖來控制單表的併發 commit,但是存在一種情況,當 Flink 進程意外退出時,代碼無法觸達 unlock 邏輯,導致針對表級別的鎖一直被佔用,無法釋放。Flink 任務被自動拉起後無法再次獲取到鎖,導致後續無法正常寫入數據。隨着入湖任務量的增加,這個問題每週都會至少出現一次,每次都需要人工介入去、手動訪問 HiveMetaStore 釋放鎖,才能讓 Flink 任務恢復正常。如果處理不及時,可能導致數據入湖延遲幾個小時。

針對這一問題,在高版本的 Hive 中其實已經有了解決方案,就是針對分佈式鎖設置超時時間,超時之後會自動將鎖釋放。但我們是基於 Hive 2.0.1 版本,整體升級和拉取 patch 的成本都比較高,所以我們針對 Iceberg 做了改造,使用基於 zk 的分佈式鎖替換之前的鎖機制,上線後這一問題也得到了根本的解決。

還有 Iceberg 表元數據文件被誤刪的問題,這個問題會導致出現找不到數據文件的異常,直接影響 Iceberg 表無法被訪問。解決方法就是在修改 metadata_location 屬性的時候,增加容錯機制,優先嚐試重試並檢查是否修改成功,僅在確認元數據未保存成功的情況下,纔會對 metadata.json 文件做刪除操作。

對 v2 格式小文件合併方面的一些改進

Iceberg 實現 upsert 語義的原理是,它的 v2 格式通過引入 sequence number 並結合 position delete file 和 equality delete file 來實現的,寫入思路是首先將 delete 行寫在 equality delete file 中,如果 delete 行在當前的事務中 insert 過,就把 insert 行所在的文件行號和地址給 position delete file。

舉個例子,假設有兩個事務:

這三條 SQL 產生的行爲如下:首先生成了一個 data file2,裏面包括了新寫入的 ID 爲 3 的數據即 I (3,300),還包含了 ID 爲 3 的數據 update 之後的數據即 I (300,301),並把 I (3,300) 寫入到 equality delete file 中。由於 ID 爲 3 的數據在當前的事務中寫入過,所以還會生成一個 position delete file,然後把 (3,300) 這條數據對應的 position 標記爲刪除。而因爲 ID 爲 2 的數據不是在當前事務中寫入的,所以把 ID 爲 2 的數據即 I (2,200) 追加到 equality delete file,並標記爲刪除即可。

upsert 語義在讀路徑上的實現思路如下:首先 position delete file 與不大於自己 sequence number 的 datafile 做 join,equality delete file 則與小於自己 sequence number 的 datafile 做 join。因爲上圖中,position delete file 會把 datafile2 中的第零條數據即 I (3,300) 刪除, equality delete file 會把 datafile1 中 I (2,200) 刪除掉,最終查詢結果只有 ID 爲 1 和 3 的兩條數據。

由於 delete file 越來越多,查詢性能也會隨之降低。爲了保證查詢性能,我們每個小時都會對 Iceberg 表進行小文件合併。

但是在引入 sequence number 之前,針對 v1 格式的小文件合併無法保證 v2 格式數據在合併後的正確性。所以在實踐的過程中,我們針對 v2 格式的小文件合併做了一些改造。

針對 v1 格式的小文件合併思路和讀取思路完全一致,即將兩種 delete file apply 到合適的 datafile 上,合併後刪除對舊文件的引用,改爲引用新生成的 datafile。根據之前 sequence number 的定義,它會一直遞增,所以合併後的 datafile 對應的 sequence number 也會變大,會導致 v2 格式的數據在合併小文件場景下的衝突。

首先小文件合併過程中,修改 sequence number 會導致 Flink 實時寫入的事務衝突,導致上圖中事務 3 的 delete 語句失效。事務 3 將主鍵 3 的數據 delete,但事務 4 的小文件合併成功後會把這條數據又加回來,因爲這條數據的 sequence number 變成 4,此時 delete file 的 sequence number 是 3,不會再把 sequence number 爲 4 的數據刪掉。

針對上述衝突,我們對 Iceberg 的小文件合併做了改造,改造的思路是合併小文件本質上並不會對最終的數據做修改,僅僅是優化文件的存儲。所以在合併過程中,複用被合併的文件中最大的 sequence number 即可。

按照新的思路,事務 4 之後的 datafile 對應的新 sequence number 爲 2,也就是複用了被合併的文件中的最大的 sequence number。sequence numbe2 依舊小於事務 3 的 sequence number3,所以可以保證 delete file 的語義正確性。

湖倉一體架構的落地爲我們帶來了不少的收益:

五、PyFlink 實踐

引入 PyFlink,主要因爲我們想把 Flink 強大的實時計算能力輸出給人工智能團隊。人工智能團隊由於技術本身的特點,大部分開發人員都是基於 Python 語言開發,而 Python 本身的分佈式和多線程支持比較弱,他們需要一個能快速上手又具備分佈式計算能力的框架,來簡化他們日常的程序開發和維護。我們正好也需要補全平臺上對 Python 生態的支持不足,所以很自然地想到了把 PyFlink 集成到我們的平臺上。

上圖是 PyFlink 的基本架構。python vm 和 jvm 雙向通訊的架構,實現了 Python API 和 Java API 的一一映射,用戶可以通過編寫 Python 代碼來實現 Flink 任務的開發。

通過對比,我們最終選擇了用 Kubernetes 方式來部署 PyFlink 環境。Kubernetes 除了可以較好地支持資源隔離,也可以方便地集成 Python 環境和其他機器學習的依賴。

對於 PyFlink,我們主要支持了三種依賴管理:

Flink 原生基於 K8S 部署的情況下,每次修改用戶程序都需要重新制作鏡像,費時費力。爲了簡化用戶的開發難度、提高部署效率,我們在集成 PyFlink 的時候進行了優化。依賴平臺提供的文件服務,用戶啓動任務時只需要將所依賴的文件上傳到文件服務上,然後通過修改鏡像的入口腳本,真正啓動 jobmanager 和 taskmanager 進程之前,根據傳入的參數,讓所需的依賴也就是 jar 文件、Python 文件等下載到容器的內部目錄下,這樣程序啓動就可以加載到對應的文件了。

我們還和 AutoStream 平臺的 catalog 做了打通,Python 用戶可以直接複用平臺上已經聲明過的表和 UDF,也可以自己開發註冊 Python UDF。用戶可以通過 SQL + UDF 的方式快速完成業務需求的開發。

舉個自定義任務的事例。在開發 PyFlink 程序時,可以依靠 PyFlink 提供的 Gateway 調用平臺內置的 Catelog 和 UDF 註冊類完成 Catelog 和 UDF 的註冊,避免開發時的重複定義和重複開發。

再來看一下 PyFlink UDF 的開發事例。用戶可以在原有的項目中進行開發,保持原來的項目結構不變,然後按照 PyFlink UDF 開發規範,在 eval 方法中調用相應的處理邏輯來增加新的額 Python 代碼。

而使用 Python UDF,只需要在 SQL 任務中創建 function 時指定 language 爲 Python,並在高級配置中添加 Python UDF 所需的文件。

我們通過集成 PyFlink + AutoStream 實現了對 Python 生態的基礎支持,解決了 Python 用戶難以開發實時任務的痛點。同時他們也可以方便地將之前部署的單機程序遷移到 AutoStream 平臺上,享受 Flink 強大的分佈式計算能力。

六、後續規劃

未來,我們會持續優化計算資源,讓計算資源的利用更加合理化,進一步降低成本。一方面充分利用自動伸縮容的功能,擴展伸縮容策略,實現實時離線計算資源的混部,利用實時離線錯峯計算的優勢進一步降低實時計算的服務器成本。同時我們也會嘗試優化 Yarn 的細粒度資源調度,比如分配給 jobmanager 和 taskmanager 少於一核的資源,做更精細化的優化。

在流批一體方面,我們準備利用 Flink 的批處理能力小範圍做批處理的應用和 web 場景的試水。同時在數據湖架構的基礎上,繼續探索存儲層面批流一體的可能性。最近我們還在關注 FLIP-188 提案,它提出了一個全新的思路,將流表和批處理表進行一定程度的統一,可以實現一次 insert 就把數據同時寫入到 Logstore 和 Filestore 中,讓下游可以實時消費 Logstore 的數據做實時 Pipeline,也可以使用 filestore 的批式數據做 ad_hoc 查詢。後續我們希望也能做類似的嘗試。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/wX5dnix5Bu9DDrykEDN_jQ