黃彬耕:Iceberg 在騰訊微視實時場景的應用

分享嘉賓:黃彬耕 騰訊 數據工程師

編輯整理:田長遠

出品平臺:DataFunTalk

導讀: 今天主要分享 Iceberg 在微視的一些使用情況。全文將圍繞下面三點展開:

01 爲什麼會使用 Iceberg?

首先看下數倉架構。數倉的數據接入主要有兩個來源,一個是客戶端的上報,還有一個是業務後臺 DB 的上報,這兩份數據都會通過一個消息隊列接入數倉。我們的數據倉庫採用 lambda 架構,總體分爲離線和實時兩套體系,分別有自己的計算和存儲體系。離線主要是以 Hive 作爲存儲載體,計算以 Spark 爲主,Map Reduce 爲輔。實時數據處理主要用了 Flink,再輔以 Kafka 和 OLAP。

我們對接的應用主要包括對離線數據用 SQL 做的一些分析洞察,還有一些數據可視化的需求、數據看板的搭建、離線和實時的指標推送。同時,我們也會對接運營系統和 AB Test 平臺,以此提供數據支撐。目前我們的數倉存在一些問題,比如實時鏈路,相對於離線來說成本比較高。同時,lambda 架構本身存在一些問題,它的兩套計算和存儲體系存在額外的存儲開銷,指標也會有兩次計算,也就是要維護兩份代碼,這就可能導致數據不一致的問題。

要解決這些問題,需要分析問題產生的原因。其中一個主要原因是一些傳統存儲組件,無法很好地在一個組件上支撐實時和離線場景。

首先看 Kafka,它的成本相對比較高。跟 Hive 相比,Kafka 每單位的存儲成本超過了 Hive 的 10 倍。其次,它的數據壓縮的效果也比較差,同樣的數據寫入 Kafka,壓縮後的數據量相比於 Hive,也接近 10 倍。所以這兩個疊加之後,它的成本會比 Hive 高兩個數量級。Kafka 雖然可以很好地支持實時讀寫,但是它對離線場景下的一些需求又支持得不好。在批處理時,它不能像 Hive 那樣做分區過濾和相關優化。在數據回溯的場景,第一,Kafka 的存儲成本比較高,不適合留存比較久的歷史數據;第二,它只能基於一個偏移量去做數據回溯,無法確定這個偏移量對應的數據是什麼數據。所以相比之下,Hive 基於分區的回溯更能符合我們的回溯需求。

然而 Hive 雖然在離線場景下比較成熟,但是在實時場景提供的支持並不好。

首先,它的延遲比較高。目前 Hive 提供的延遲比較低的需求是通過使用 Hive 小時表來提供一個大概延遲在兩到三個小時的數據。如果這個延時想要再縮減,可能就需要把 Hive 的分區做到更細的粒度。更細粒度的分區也可能會帶來一些問題,比如小文件的問題,會對 HDFS 的 NameNode 造成比較大壓力,同時它的讀取效率也不高。另外,HMS 的擴展性問題,Hive 的元數據主要是使用 MySQL 來做存放,MySQL 的擴展性不好。而如果我們的分區粒度越細,分區數據越多,那 MySQL 就更容易遇到擴展性問題。同時,太多的分區也會對查詢的效率造成一定影響。因爲 Hive 會首先到元數據去獲取這個分區信息的目錄,然後再到那個 HDFS 裏面對這些目錄做一個 list 文件的操作,拿到文件之後再去做數據的讀取。這個過程涉及到的分區越多,查詢就會越慢。

所以基於這些問題,我們希望有一個存儲系統,能夠很好地同時支持實時和離線的場景。在成本比較低的情況下,滿足我們的實時需求。

現在的數據湖文件系統,都可以對實時和離線提供比較好的支持。我們公司內部主要在使用 Iceberg,我們把 Iceberg 與 Hive 和 Kafka 做一個對比。

從設計上來看,Hive 對離線場景的支持 Iceberg 都可以做到,而且在某些方面做得更好。比如在批處理中,Iceberg 的謂詞下推可以做到文件級別的過濾,而 Hive 主要是在分區級別做過濾,然後在文件內部,利用文件的格式再去進行謂詞下推。還有 Iceberg 通過版本控制,可以做到更好的讀寫分離。在 Hive 場景下對一些歷史分區做數據壓縮,可能會影響線上的讀取任務,而 Iceberg 不存在這種情況。並且,更爲重要的一點是 Iceberg 提供了更低延時的讀寫特性。所以從離線存儲的角度來講,用 Iceberg 替換 Hive 可以得到很好的收益。

從實時的角度來看,Iceberg 與 Kafka 對比,一個顯著優勢是 Iceberg 的成本可以低很多。因爲 Iceberg 跟 Hive 使用的底層存儲比較類似,是基於 HDFS 的,成本可以做到比 Kafka 低兩個數量級;相比於 Kafka,Iceberg 在實時性上會差一點。因爲 Kafka 可以做到流式的讀寫,而 Iceberg 只能做到分鐘級別的延遲。但在我們的數據場景中,強實時的場景比較少,因此 Iceberg 可以很好地支持我們的實時場景,可以在一些新的增量模型中承載流批一體的存儲。

02 我們如何使用 Iceberg?

我們目前的架構,主要是用 Iceberg 來替換掉之前 Kafka 以及 OLAP 組件來承接一部分實時數據需求,以此降低實數據需求的實現成本。

出於初期實驗的目的,我們仍然使用 lambda 架構,實時和離線的數據並存,雖然 Iceberg 有能力替換 Hive 提供的功能,但是由於我們之前已經在 Hive 上實現了存量數倉,整體的遷移成本和風險都比較高,可能會影響數據提供的穩定性,所以目前做整體的遷移是不太現實的。因此,我們初期的應用主要是用 Iceberg 來提供一個實時數據場景,後續在一些新的模型上再使用 Iceberg 去完成流批一體的統一存儲。

下面介紹一個已經實現的實時需求方案。這個需求是給我們的運營系統提供一些實時的累積數據。我們通過 Hive 先提供一個 T+1 天級別的累積數據,然後再把當天的增量數據通過 iceberg 來落地。然後在下游配置一個定時調度的推送模塊,去做數據的合併,得到最終的實時累計數據,推送到 Kafka 裏面。交付給下游系統。

我們用到了 Iceberg 的增量讀取接口,不需要推送大量的全量數據,只需要推送實時的增量數據即可,同時也不會出現少推數據的情況。這也涉及到了實時維表的需求,因爲實時累計數據需要通過一個最終累計的狀態表來做。我們一開始嘗試了用 Iceberg 的 upsert 功能。但是由於 Iceberg 只支持 copy-on-write 的格式,而我們的維表每次更新的數據佔全量數據的佔比很低,可能只有萬分之一。如果用 copy-on-write 模式,每次更新數據就要做一個全量的寫入,這樣對資源的消耗比較大。所以我們最後還是使用了 lambda 架構的模式,通過 Hive 的累計數據和 Iceberg 的增量數據,在推送時再去做 merge。後續也希望可以嘗試用 Iceberg 提供的 merge-on-read 模式去生成實時累計數據來簡化流程。

對於這個模式的需求落地,除了從實驗角度考慮之外,也需要考慮到表的複用性。所以 Iceberg 的模型會參照之前的離線數倉去進行建模的規範,同時還要考慮到數據的複用性。在做早期需求的同時,也爲後期更多需求落地打好一個基礎。用這種 Iceberg 的方式實現,相比於之前實時鏈路的 Kafka 加 OLAP 的方案,成本降低超過 99%。

我們並不只是希望 Iceberg 去提供實時的數據,還希望在新的場景中,Iceberg 可以承載流批一體的存儲。爲了實現這個目標,還需要研究 Iceberg 是否具有在離線的場景下的一些功能,其中一個是數據回溯功能,像表增加字段或者修改計算口徑等操作,都需要去數據回溯。另外,如果上游有數據修復,也會需要去回溯重跑一段歷史分區。

在離線場景下,數據回溯實現比較簡單。

以填表爲例,每個任務的實例必須處理一個某一天分區的數據。如果我們想要回溯 3 月的 1-3 號 3 天的數據,只要在調度平臺上把這三個實例任務重跑就可以實現了。但是在流批一體存儲的場景下,表可能是使用 Flink 生成的,Flink 的回溯可能會稍有不同,因爲它是一個線上一直在運行的任務,無法通過直接重跑的方式去做回溯。可能需要通過複製 Flink 應用的方式來複用它的數據生產代碼,再通過修改參數的方式讓 Flink 的 source 進入一個回溯的批讀模式,最後再通過傳參的方式指定回溯的數據時間範圍。當然,現在社區在新版本 Flink 中,source 也新增了可以實現這個功能的接口。但是它還存在一些缺陷,比如 Flink 的 source 是沒有狀態的,也就是在回溯任務失敗,重啓執行後可能會產生一些重複數據。這是因爲在第一次跑的時候已經有一些 check 成功了,提交了部分數據,而這時發生了故障失敗重啓,重啓之後的任務又會重新讀取 source 數據,那麼,第一次運行時提交的數據就變成了重複數據。在數倉裏,數據回溯之後產生重複數據是不能接受的。所以還是希望它可以實現 exactly once 的語義,也就是數據一致性。

我們通過給 source 增加狀態的方式來實現這個功能。在之前一個比較老版本的 Flink 上,我們給它的 source 增加了一個切塊的功能。

首先 Flink source 由兩個部分組成。

第一部分是一個 source 算子,主要負責一個單線程的文件掃描,然後把掃描的文件下發到下游多節點的 FlatMap 算子上。然後 FlatMap 主要負責把這個文件數據讀取出來,再下發給下游做數據處理。我們就可以在 source 節點上增加一個狀態。把它讀取出來的文件,首先按照表的 partition 做聚合,得到一個 map 的結構。然後 map 的每一個 key 對應一個分區的數據,接下來就可以按分區做文件的下發,先下發完一個分區,再接着下發第二個分區的數據。第二個要做的事是把 checkpoint 按照 partition 對齊,保證每一個 checkpoint 提交上去的都是一個完整的 partition。我們實驗的方法是:如果 partition 在沒有完成下發的情況下到了 checkpoint,就拋異常,失敗掉,這樣可以保證在一個 partition 下發完成之後,再等待一個 checkpoint 成功,來保證一次提交是一個完整的分區。

這種處理方式,在 partition 等待 checkpoint 的過程中有一些性能的開銷,我們可以通過傳參的方式指定一個 checkpoint 提交的分區數量。將等待的時間平攤到多個分區上,來提升整體的性能。加入了這個狀態之後,在失敗重啓的場景下, source 可以跳過已經消費過的分區,下發就不會產生重複的數據,這樣就實現了 exactly once 的語義。

還需要支持的另一種場景是流轉批場景,如果使用 Iceberg 做流批一體的存儲,在上游的明細表,主要是 ODS 和 DWD 層的表可能會使用 Flink 生成。但這個表的二次加工可能會使用批處理去做計算。假設批處理是一個 3 月 2 號的實例,我們需要在 3 月 3 號時觸發計算。那這個觸發的批處理任務就需要知道上游的數據表什麼時候的數據是完備的。如果簡單地用一個延時調起方法的話,在一些異常的場景下會出現問題,比如上游的 Flink 鏈路出現了問題,導致數據沒有產生或者遲到了,那批處理的任務處理的就是一個不完整的數據或者是空跑。如果是人爲發現了之後再去做回溯,成本會較高。

對此,我們使用的方案是通過在 Flink 的 Sink 裏面,從數據中選取一個時間字段寫入表的快照的方式去通知下游當前的數據進度,可以看到這裏的 Flink Sink 也包含了兩個算子。第一個是一個 writer 算子,它負責把數據寫入文件,writer 在 checkpoint 觸發時,會把自己寫入的最大的一個時間傳到 commit 中,然後 commit 從多個上游傳過來的時間中選取一個最小值作爲這一批提交數據的時間,並寫入表的元數據。

在下游的批處理任務之前加一個監控任務去監控最新快照元數據。如果它的時間已經超過了當前的分區時間,就認爲這個表的數據已經完備了,這個 monitor 任務就會成功觸發下游的批處理任務進行計算,這樣可以防止在異常場景下數據管道或者批處理任務空跑的情況。

我們在完善 Iceberg 在批處理場景下的功能之後,可以設計一個流批一體的架構,雖然看起來總體上還是一個 lambda 架構,但它有一些改進。首先,它在生成 DWD 的過程中,統一使用了 Flink 計算引擎去生成,並進行雙寫,一份寫入 Iceberg,另外一份寫入 Kafka。如果沒有強實時的需求,很多數據都不需要再走 Kafka 這條鏈路。所以在 DWD 層可以做到計算引擎的統一。其次 MQ 的數據除了被 Flink 任務消費,還會同步一部分落地到 ODS 層,用作回溯數據的支持。

在我們用 Iceberg 替換掉 Hive 之後可以做一個準實時以及離線場景下的一個流批統一存儲,很大程度上解決我們之前遇到的很多指標重複計算帶來的口徑不一致問題,還有一些冗餘的存儲開銷,節約我們的準實時需求的成本。

我們會在新的一些場景下去落地這個流批一體的方案。

03 在 Iceberg 的表維護場景下的實踐

Iceberg 表在維護過程中比 Hive 稍微複雜一些,Hive 只需要清除過期數據即可,但是 Iceberg 除了清除過期數據,還要做過期快照的刪除、小文件的合併、元數據的合併,還包括清除一些孤兒文件。

這些功能大部分平臺側都可以完成。這裏講一下小文件合併的實踐。因爲 Iceberg 會用 Flink 去生成表的數據,Flink 的提交的批次間隔比較小,就帶來了更多的小文件,因此需要定期去做合併。合併主要是通過 Spark 實現的。Spark 對小文件合併主要有兩種策略,一種是 BinPack 揹包策略,另外一種是加入了排序邏輯的 Sort 策略。揹包策略主要是把小文件加入到相同大小到揹包裏面去做合併,最後每一個揹包就是一個合併後的文件。Sort 策略在合併小文件的基礎上,會做一個分組排序的功能,使用我們指定的一個字段去做分組排序,使得這個字段在各個分區之間整體是有序的,同時每個分區內部也是有序的。分組排序獲得的收益主要是可以減少表的大小。

我們一般在明細表上做分組排序,用 Sort 策略做小文件合併。相比於揹包策略,合成後表的大小可以縮減 40% 到 70%。表大小的縮減帶來的直接收益是二次讀寫的時間縮短和點查效率大幅提升,這主要得益於使用這個排序字段做點查時,Flink 提供了一個謂詞下推的文件過濾效果。

簡單分析一下原理。

首先是存儲收益,因爲 Iceberg 主要是基於 Parquet 列存,這種存儲格式會通過對數據進行編碼以及壓縮算法來壓縮數據。如果某一列數據的局部相似度很高,那壓縮算法就可以更好地發揮它的作用,生成一個更小的數據文件。微視的明細表有一個特徵就是大量的字段都跟用戶相關。所以我們按照用戶 ID 做數據排序之後,大量的字段相鄰的值都是相同或者相似的。最終落地出來的文件大小會比沒排序之前小很多。

第二是文件過濾,這個主要得益於 Iceberg 在元數據的 My Manifest 文件裏面保存了每一個列值的上下界。在查詢時,可以將查詢的條件和每一列的上下界做對比。如果發現這個值不可能存在這個 Data 範圍中,那麼在文件掃碼階段就可以把這個文件過濾掉。

上圖是我們做的數據對比,左邊一列是使用 BinPack 策略合併小文件的結果,右邊是用 Sort 策略合併小文件的結果。對於同一份數據進行合併,合併之後的總文件數量大概是 70 多個,此時查詢 3 個用戶的數據,在使用 BinPack 策略合併後,需要掃描 77 個文件,也就是沒有做任何的文件過濾。但是在 Sort 排續策略之後,由於已經將其他部分的文件進行了過濾,用戶只需要掃描 3 個文件就可以。如果是查詢 1 個用戶的話,就只需要訪問一個文件,這個效果是比較明顯的。

04 問答環節

Q:數據在 Iceberg 中進行打寬或者聚合,用 SQL 方式多久調度一次?在涉及到 join 的場景下,如果是表 A 的增量去 join 表 B 的全量,那麼是表 B 的增量去校驗 A 表的全量嗎?

A:對於這兩個問題,首先調度的頻率是根據數據需求的實效性來決定的,如果這是一個離線的 T+1 的需求的話,就是天級別的調度,如果對實效性要求比較高,可以考慮用 Flink 去做分鐘級別的聚合。基本上實效性要求越高,需要的代價就越高。對於 Join 的場景,是這樣去做的。這裏主要考慮的是實時場景下的 Join,因爲在離線場景中 Join 是沒有什麼問題的,因爲它的實效性要求不高。如果是在實時場景下,我們需要做一些維表規劃,如果數據有很強的實效性要求,應該儘量避免出現 Join 的情況。

Q:Iceberg 數據湖底層存儲跟離線數倉底層存儲都是使用同一個集羣嗎?

A:可以是同一個集羣,但不是必須的。

Q:使用 Iceberg 代替 Hive 來進行建模,對於一些實時需求,更新頻率是怎麼樣的?每一層都需要更新是否影響性能?

A:對一些實時的需求,時效性要求比較高,那層次就不能太多,因爲每一層都會引入一些數據延時。對於需求場景,實時需求可能從 DWD 層就直接出這個指標。如果 DWD 層的讀取效率比較低的話,在下游的 ODS 層可以做一些聚合,然後再從 DWS 層輸出數據。總體而言,實時性要求越高的場景,模型的層次就不能太多。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/aEhZJUXOmNilHuImf7KB0A