流批一體的近實時數倉的思考與設計
摘要: 基於對數據時間旅行的思考,引出了對目前三種數倉形態和兩種數倉架構的思考。結合數據湖在 Flink 的應用和數據湖元數據類型的思考,探索了基於數據湖的 Flink SQL 流批一體的實踐,在流批一體 SQL 表達一致、結果一致性、流批任務分離、混合調度依賴等進行了設計和探索。同時,歡迎大家多分享具體實踐,一起共築新的數據實踐方式。
01 數據的時間旅行和業務對數據的本質要求
大規模的數據處理興起於 Hadoop 生態的發展,關鍵在於分佈式儲存和分佈式計算的發展,造就瞭如今近百種有關大數據的生態技術。數倉理論和建模理論基於大數據技術體系得以快速發展,其中離線數倉的標準化建設得到了廣泛應用。數據的本質是一種行爲的具象,業務在對數據的需求,核心在於對行爲的可探索和可觀察。基於此,我們需要明確一點,大數據技術是否完全滿足了業務對數據需求在時間維度上的確定性了呢,這點是值得思考的。那麼我們先來看一下數據的時間旅行。
業務期望的數據:用戶空間下的時間數據,t1 時間數據,用戶自然時間點或自然時間段的明細或者統計數據。
傳輸延遲:App 用戶,數據發送到網關或者日誌服務系統,或者 Server 日誌落文件系統所產生的延遲。Event 進入到存儲空間,可以代表數據已經是確定的,基本可觀察,一般情況下,這個延遲很小。但是,在某些情況,比如 APP 的日誌產生之後,但是因爲網絡等問題一直沒有發送,或者 Server 宕機,導致延遲發送或者最終丟失。總體而言,傳輸延遲屬於不可控延遲,暫時沒有什麼好的技術方案來解決。
存儲空間:數據承載於實際的存儲中,離線數倉承載於具體的分佈式文件系統,實時數倉基於 Kafka 的消息隊列系統,近實時數倉承載於數據湖存儲中。這裏可以抽象來看離線數倉,Event 承載於分佈式文件系統,以小時分區爲例,某個小時的分區本質是自然時間產生的文件的集合,時間精度退化爲小時級別。
計算延遲:數據進入存儲之後,與進入計算空間的時間差,t3-t2。實時數倉中,計算延遲是數據的 ProcessTime-IngestTime。離線數倉中,計算延遲是調度產生實例運行時間 - 數據進入存儲空間的時間差。本質離線數倉和實時數倉的計算延遲在抽象上看是一致的。計算延遲在不同的數倉體系下,產生的時效不同,我們會劃分爲三種主流的數倉體系,秒級的實時數倉,分鐘級的近實時數倉,小時級的離線數倉。可以看出,數倉的時效性差異,因爲傳輸延遲的不可控,退化爲計算延遲的差異。
02 離線、近實時、實時三種數倉在時間維度下的成因
在離線數倉和實時數倉,常常會提到數據的有界和無界,認爲離線數倉的數據是有界的,實時數倉的消息流是無界的。準確與否在於數據的確定性考量。
離線數倉的確定性,在於文件自然生成時間的確定性和不可更改性,某個小時的自然文件生成,近似等於事件時間在自然時間的確定性,反例就是我們能看到數據漂移的情況,事件時間會或多或少落入上個小時或者下個小時的自然文件生成時間。那麼離線數倉的確定性,實質是數據的 IngestTime 的確定性,具有天然的文件屬性,易於分割。當我們說離線數倉計算的數據是準確的時候,默認了傳輸延遲帶來的影響很小或者默認了當前小時的數據指標的標準是文件的自然形成時間。
實時數倉,常常會提及不確定性或者說 Lambda 架構實際是對實時數倉的不確定性的替代方案。這種不確定性的原因是什麼呢?這裏分爲四類情況說明,一是 ETL 的處理,從窗口上來說,是單條數據即爲一個窗口,窗口的產生和銷燬在一個 Event 中完成,y=window(data)。二是基於 EventTime 的時間窗口,如果再定義延遲時間,y=window(datas, datas.EventTime, delay),第三種和第四種分別就是 IngestTime 和 ProcessTime 的時間窗口函數。對比離線數倉,可以看出,基於 IngestTime 的時間窗口和離線數倉的時間語義最爲一致。離線數倉在時間窗口上,可以看做爲數據進入文件的自然時間所對應的小時窗口,數據所承載的文件的確定性,保證了小時窗口的數據確定性,y=window(files)。
近實時數倉,比如基於 Iceberg 的數據湖建立的近實時數倉,在於離線數倉對比中,實際是將基於小時文件細分到分鐘級別的快照文件上來,y=window(snapshots)。對比實時數倉,因爲 Kafka 的 IngestTime 目前在精確性上是不精確的,基於快照的文件劃分,在精確性上有一定的保證,同時在降低時效程度,從秒退化爲分鐘,很多場景是可以容忍的。
三種在時間維度對比上看,一是在某個時間,統計的本質對業務的需求都是近似的,這個本質是傳輸延遲所帶來的,但是這個在實踐中,不影響數據的可用性和統計學意義。二是不同數倉的劃分,是存儲和計算技術發展所帶來的。三是離線數倉的確定性模糊了傳輸延遲,實時數倉的不確定性,是對傳輸延遲的一種取捨,人爲的限定了 EventTime 的最大延遲時間,從而保證了指標的時效性,都是具有實踐的意義所在。
03 Lambda 和 Kappa 架構在時間維度下的取捨
當離線數倉剛剛發展的時候,只有一種數倉架構,也是基於大數據分佈式處理剛剛發展的原因。隨着實時技術的發展,大家在時效性上有了更多要求,但是同離線數倉對比的時候,在數據的準確性上,因爲統計的窗口不同,必然會導致某個時刻的指標結果的不嚴格一致。
爲了解決這種不嚴格一致的情況,Lambda 架構(由 Storm 的作者 Nathan Marz 提出的)產生的,實時確保時效,離線確保準確。最終會以確保離線三個時間窗口的統計一個事件時間窗口的結果,來回補實時數倉以爲 EventTime 窗口,因爲時效性丟棄的延遲數據的結果,從而保證業務上對 EventTime 窗口的要求,或者默認爲離線的 IngestTime 所產生的文件分區近似認爲 EventTime 的時間窗口。這種帶來的弊端,維護兩套數據路線,而大家總在想辦法解決。
Kappa 架構的提出,得益於實時計算的效率提升,但是因爲在批處理技術短板,生產實踐推廣受限。Kappa 架構是基於實時 EventTime 的一種數據窗口處理,因爲 Kafka 的 IngestTime 不精確和爲了同離線數倉對比而權衡考慮,EventTime 在傳輸延遲上的不可控,導致 Kappa 架構的準確性就會出現折扣。雖然是業務上最準確的時間範圍,可行性上確不佳。
近些年來,不斷髮展的 MPP 架構的 OLAP 查詢引擎,並不會涉及到時間窗口的計算取捨,OLAP 引擎本質是基於 ProcessTime 來加速查詢的一種技術手段,是數倉不可分割的一部分,但是傳輸延遲的不可控沒有解決,但是將計算延遲下推到了查詢時,通過快速查詢來解決儘可能減少計算延遲,同時保證了查詢的靈活性,自助分析探索上有着廣泛的應用。
從數倉架構的發展上看,不斷在圍繞結果的確定性,技術的可行性,數據的時效性,查詢的靈活性上,不斷的權衡,各個組件也是依據實際需求而發展起來的。
04 數倉一體的可行性思考
基於三種數倉體系和兩種架構的思考,每個設計都是兼顧一種或多種考量,那麼能不能實現一種機制,能夠較好的滿足數倉需求體系建設呢?從目前的技術發展上看,是有一定的可能性的。架構體系的發展一是基於技術基礎,二是不斷吸收組件的優點,做加法。
除去實時、近實時、離線數倉的劃分,從技術的視角去看數倉建設的可行性。那麼我們就要選取一些重要的點,取捨掉一些不可能的實現。
第一點是結果的確定性,這點是基於離線數倉發展的思考。不確定性帶來的問題是信息的不對稱,確定性的結果是可以模糊一定的指標含義的。
第二點是數據的時效性,高時效必然能夠滿足低時效,反之不然。另外數據的時效性,本身是基礎組件的技術發展所限制的。
第三點是開發的便利性,排在時效性後面的考慮是,便利性是基於應用層面建設的,難度一般是弱於基礎組件的,可以通過不斷實踐優化,達到一個良好的使用體驗。
第四點是查詢的靈活性和高響應,OLAP 的基礎設計保證了查詢速度,那麼 OLAP 的技術架構體現是可以複用或者拓展的。
那麼基於上面四點考慮,可以在實時數倉的基礎上,優先解決掉確定性問題。這個是很重要的一個命題,要保證計算結果同離線數倉的一致性。這一點的實現方面,可以參考離線數倉,模糊 EventTime 和 IngestTime,用文件的 start 和 end 作爲確定性的依據,文件的中間實時計算,確保時效性。那麼基於 Flink,就需要實現一種基於文件自然分割的 Watermark 機制,作爲計算窗口劃分的依據。
在確定性問題之後,需要解決計算的成本和使用的成本,這裏比較重要的是存儲層,實時數倉依賴 Kafka,Kafka 發展不具備數倉一些重要的點,成本是一個方面,查詢是一個方面,Kafka 無法架構在各種 OLAP 引擎或者計算引擎上面。這裏,近實時數倉的依賴,比如數據湖或者 Paimon,數據湖分鐘級的時效。不過,從發展的角度上看,是一種可行的解決方案。數據湖兼顧了流計算和批計算,同時,如果未來 OLAP 引擎如果能夠在數據湖上實現類似 MPP 架構的查詢效率,這也是有可能的,比如短期可以用數據冗餘,將數據湖格式的數據轉換一份到 OLAP 對應的引擎上實現加速查詢。
第三個方面,流式計算的管理和依賴機制,借鑑於離線數倉的管理方式,需要一套完備的數據依賴管理,任務容錯回跑機制。實時數倉一般是基於單個任務式的管理,離線數倉是基於任務流的管理,那麼實時數倉的發展,也必然要實現任務流的管理方式,覆蓋整個開發鏈路。
爲了實現一種統計的數倉架構,那麼需要的發展工作如下:一是着重發展存儲層,比如數據湖,既要比較好的適應流和批引擎,又要能夠高度適應 OLAP 查詢引擎。二是在實時數倉或者近實時數倉,引入類似離線數倉的調度依賴管理和補數和容錯回跑機制,或者在離線調度上兼容流任務依賴調度,實現任務流級別的管理和流批一體的數倉實現。三是在引擎層着重發展 Flink 批處理能力。
最終的任務運行方式同時包含三種:實時模式、離線模式、業務模式,分別對應着不同的數據準確性級別。也可以任選其一或者其二作爲運行方式。
05 基於 Flink 和數據湖的流批一體近實時數倉設計示例
數倉任務在離線調度和實時任務的簡單抽象示例:
數據源 => 同步任務 / 實時任務 =>
stg_table(partition=hour) => 計算任務 (insert overwrite partition=hour)=>
dwd_table(partition=hour)=> 計算任務 (insert overwrite partition=hour)=>
dws_table(partition=hour)=> 同步任務 =>OLAP 加速 => 數據服務
如果存儲層是基於數據湖(以 Paimon 爲例):
離線調度產生的表的版本信息,commit_kind: insert overwrite 類型的。同時離線任務的驅動,是基於調度依賴的驅動,one by one 的調度。
如果是基於流式計算,比如分鐘級生成 snapshot 那麼會演變爲:
數據源 => 同步任務 / 實時任務 =>
stg_table(version=snapshot_id) => 計算任務 (insert into version=snapshot_id)=>
dwd_table(version=snapshot_id)=> 計算任務 (insert into version=snapshot_id)=>
dws_table(version=snapshot_id)=> 同步任務 =>OLAP 加速 => 數據服務
那麼啓動多個任務,任務是持續的運行。commit_kind: insert into 類型的。
那麼要想實現流批一體的近實時數倉,需要解決如下問題:
- Flink 任務支持批量計算能力要持續不斷的加強
從 Flink 1.16/1.17 的版本發佈情況,在批處理能力上有比較大的提升,同時,社區也在持續不斷的加強批處理能力以及同 hive 的兼容能力。
- 如何使用同一份 Flink SQL,既可以用於批任務調度,又可以用於流任務運行呢
兩張表:dwd_partition_word_count,dws_partition_word_count,計算 word count
CREATE TABLE tablestore.tablestore_test.dwd_partition_word_count (
logdate String,
user_id bigint
) PARTITIONED BY (logdate)
WITH (
'bucket' = '3'
);
CREATE TABLE tablestore.tablestore_test.dws_partition_word_count (
logdate String,
user_id bigint,
cnt BIGINT,
PRIMARY KEY (logdate,user_id) NOT ENFORCED
) PARTITIONED BY (logdate)
WITH (
'bucket' = '3'
);
批任務的 Flink SQL:
insert overwrite tablestore.tablestore_test.dws_partition_word_count PARTITION(logdate=${start_date})
select user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate=${start_date} group by user_id;
-- 或者
insert overwrite tablestore.tablestore_test.dws_partition_word_count
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate=${start_date} group by logdate,user_id;
流任務的 Flink SQL:
insert into tablestore.tablestore_test.dws_partition_word_count
select logdate,user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count group by logdate,user_id;
如何用一個 Flink SQL 來實現流批模型下的不同呢?
不同點:Insert into 和 Insert overwrite 的問題,這個通過在提交運行模式的時候,如果是批任務,則是 Insert Overwrite,如果是流任務,則轉爲 Insert into,這個在技術上沒有什麼難點。
不同點:Where 條件的數據範圍問題。抽象來看,流任務和批任務的時間範圍在表達上是可以統一的
insert overwrite tablestore.tablestore_test.dws_partition_word_count
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate>=${start_date} and logdate<=${end_date} group by logdate,user_id;
比如跑 4 月 22 號一天的數據,執行的批 SQL 爲:
insert overwrite tablestore.tablestore_test.dws_partition_word_count
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate>='20230422' and logdate<='20230422' group by logdate,user_id;
如果用流模式跑,執行的 SQL 可以爲:
select logdate, user_id,count(1) as cnt from tablestore.tablestore_test.dwd_partition_word_count where logdate>='19700101' and logdate<='99990101' group by logdate,user_id;
insert overwrite/into 和時間範圍,可以由平臺執行的時候自動轉換和參數輸入。
- 批任務的調度和流任務的計算如何分離
任務完成開發,在批模式下,用調度任務驗證了邏輯無誤,那麼之後可以用流模式,一直持續不斷的運行。一是計算邏輯變更或者歷史數據修復怎麼辦,二是可不可以支持流批雙跑。其實本質是一個問題。如果計算邏輯變更,那麼可以修改流批一體的 SQL 邏輯,然後流任務重啓應用新的計算邏輯。同時,流批一體的 SQL,在調度上回跑歷史數據,重新刷寫數據。
重刷歷史數據的時候,流任務會不會讀取到重刷的歷史數據進行計算。
這個問題主要是通過上述說的數據湖版本 commit kind 解決。批任務只應用 insert overwrite,流任務應用 insert into. 如果流任務檢測到 insert overwrite 的版本提交,直接跳過,不做實際的數據讀取和處理。只處理 insert into 的數據。實際批任務的執行,對流任務不會產生影響。
目前在數據湖流式讀取上,只需要加個開關選項就可以實現。
- 流任務的 Insert into 如何實現主鍵寫入
如果流任務的 Insert into 不能實現主鍵寫入,那麼分區數據的重複性無法解決,那麼就只能流批雙跑來解決數據的重複性問題。也就是,下游如果是主鍵冪等寫入,insert into 和 insert overwrite 語義等同。
這個可以通過數據湖主鍵表 (比如 Paimon 的主鍵表) 實現。Paimon 的主鍵表已初步具備生產可用性。
- 流批任務的調度依賴
如果一個流任務,下游接的是批任務調度,如果實現調度依賴呢?
比較優雅的實現可以是,在流任務寫入下游表的時候,假如數據的 Watermark 寫入到下游表的屬性中,如果最晚的數據已經是當前小時的 05 分,那麼當前小時的下游調度任務,通過檢查表的屬性時間,就可以判斷批任務的調度實例是否應該拉起。或者也可以基於流任務的運行延遲做檢查依賴。
基於上述的實現和解決,我們基本就可以實現流批一體的 Flink SQL 在批模式和流模式下運行,如果調度依賴做的比較完善的情況下,可以實現流批混跑。同時補數或者雙跑對流任務的穩定性不會產生影響。
實際開發,就可以用批任務先開發驗證,然後用流模式拉起,數據產出基本是分鐘級別的。出問題可以用批任務修正。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Pn_f01__R9IoqnkR_C59ow