實時數據聚合怎麼破

作者簡介

數據猩猩,攜程數據分析總監,關注分佈式數據存儲和實時數據分析。

實時數據分析一直是個熱門話題,需要實時數據分析的場景也越來越多,如金融支付中的風控,基礎運維中的監控告警,實時大盤之外,AI 模型也需要消費更爲實時的聚合結果來達到很好的預測效果。

實時數據分析如果講的更加具體些,基本上會牽涉到數據聚合分析。

數據聚合分析在實時場景下,面臨的新問題是什麼,要解決的很好,大致有哪些方面的思路和框架可供使用,本文嘗試做一下分析和釐清。

在實時數據分析場景下,最大的制約因素是時間,時間一變動,所要處理的源頭數據會發生改變,處理的結果自然也會因此而不同。在此背景下,引申出來的三大子問題就是:

可以說,數據新鮮性和處理及時性是實時數據處理中的一對基本矛盾。

另外實時是一個相對的概念,在不同場景下對應的時延也差異很大,借用 Uber 給出的定義,大體來區分一下實時處理所能接受的時延範圍。

一、數據新鮮性

爲簡單起見,把數據分成兩大類,一類是關鍵的交易性數據,以存儲在關係型數據庫爲主,另一類是日誌型數據,以存儲在日誌型消息隊列(如 kafka)爲主。

第二類數據,消費端到感知到最新的變化數據,採用內嵌的 pull 機制,比較容易實現,同時日誌類數據,絕大部分是 append-only,不涉及到刪改,無論是採用 ClickHouse 還是使用 TimeScaleDB 都可以達到很好的實時聚合效果,這裏就不再贅述。

針對第一類存儲在數據庫中的數據,要想實時感知到變化的數據(這裏的變化包含有增 / 刪 / 改三種操作類型),有兩種打法。

**打法一:**基於時間戳方式的數據同步,假設在表設計時,每張表中都有 datachange_lasttime 字段表示最近一次操作發生的時間,同步程序會定期掃描目標表,把 datachange_lasttime 不小於上次同步時間的數據拉出進行同步。

這種處理方式的主要缺點是無法感知到數據刪除操作,爲了規避這個不足,可以採用邏輯刪除的表設計方式。數據刪除並不是採取物理刪除,只是修改表示數據已經刪除的列中的值標記爲刪除或無效。使用這種方法雖然讓同步程序可以感知到刪除操作,但額外的成本是讓應用程序在刪除和查詢時,操作語句和邏輯都變得複雜,降低了數據庫的可維護性。

打法一的變種是基於觸發器方式,把變化過的數據推送給同步程序。這種方式的成本,一方面是需要設計實現觸發器,另一方面是了降低了 insert/update/delete 操作的性能, 提升了時延,降低了吞吐量。

**打法二:**基於 CDC(Change Data Capture)的方式進行增量數據同步,這種方式對數據庫設計的侵入性最小,性能影響也最低,同時可以獲得豐富的開源組件支持,如 Cannal 對 MySQL 有很好支持,Debezium 對 PostgreSQL 有支持。利用這些同步組件,把變化數據寫入到 Kafka,然後供後續實時數據分析進一步處理。

二、數據關聯

新鮮數據在獲取到之後,第一步常見操作是進行數據補全(Data Enrichment), 數據補全自然涉及到多表之間的關聯。這裏有一個痛點,要關聯的數據並不一定也會在增量數據中,如機票訂單數據狀態發生變化,要找到變化過訂單涉及到的航段信息。由於訂單信息和航段信息是兩張不同的表維護,如果只是拿增量數據進行關聯,那麼有可能找不到航段信息。這是一個典型的實時數據和歷史數據關聯的例子。

解決實時數據和歷史數據關聯一種非常容易想到的思路就是當實時數據到達的時候,去和數據庫中的歷史數據進行關聯,這種做法一是加大了數據庫的訪問,導致數據庫負擔增加,另一方面是關聯的時延會大大加長。爲了讓歷史數據迅速可達,自然想到添加緩存,緩存的引入固然可以減少關聯處理時延,但容易引起緩存數據和數據庫中的數據不一致問題,另外緩存容量不易估算,成本增加。

有沒有別的套路可以嘗試?這個必須要有。

可以在數據庫側先把數據進行補全,利用行轉列的方式,形成一張寬表,實現數據自完備,寬表的變化內容,利用 CDC 機制,讓外界實時感知。

三、計算及時性

在解決好數據變化實時感知和數據完備兩個問題之後,進入最關鍵一環,數據聚合分析。爲了達到結果準確和處理及時之間的平衡,有兩大解決方法:一爲全量,一爲增量。

3.1 全量計算(1m < 時延 < 5m)

全量計算以時間代價,對變化過的數據進行全量分析,分析結果有最高的準確性和可靠性。成本是花費較長的計算時間和消耗較多的計算資源。可以使用的分析引擎或計算框架有 Apache Spark 和 Apache Flink。

全量數據容量一般會比較大,爲了節約存儲,同時爲了方便數據過濾和減少不必要的網絡傳輸,大多會使用列式存儲, 列式存儲使用較多的當屬 Parquet 和 ORC。

列式存儲最大的不足是無法進行刪 / 改操作,爲了支持刪改,一般會把列式存儲和行式存儲相結合。最近時間內變化的數據採用行式存儲如 avro 格式,然後定期合併成列式存儲。非常成功和紅火的 Apache Hudi 和 Delta IO 就是基於這種思路。

3.2 增量計算

假設當前處理的時間窗口中有 10 萬條記錄,因爲其中不到 100 條的記錄發生變化,而對所有記錄的聚合指標進行計算重演,顯然不是非常合理,那麼有沒有可能只對增量數據導致的變化聚合指標進行重算。答案是肯定的,或者說在部分場景下,是可以實現的。

讓我們把增量計算分成幾種不同情況:

1)增量數據會添加新的聚合記錄,對原有計算結果無影響 

2)增量數據會添加新的聚合記錄,並導致原有計算結果部分失效 

3)增量數據不添加新的聚合記錄,但導致原有計算結果全部失效

第 1、2 兩種情況下,增量計算會帶來實時性上的收益,第三種不會,因爲所有指標均被破壞,都需要重演,已經褪化成全量計算。

增量處理模型除了 Apache Flink 之外,非常著名的還有 Microsoft 提出的 Naiad 模型,後者更爲高效。由於後者只提供了非常底層的調用 API,在生態建設方面遠不如 Apache Flink,但其思想深刻影響了 TensorFlow 等框架的設計和實現,等有時間再詳細介紹一下 Naiad。

上面討論的全量也好,增量也罷,都是把數據從數據庫拉出來再進行計算,那麼有沒有可能在數據庫內部實現增量計算的可能?

Oracle 在 12.x 版本中提供物理視圖(materialized view)的自動刷新機制,這意味着用戶可以把實時聚合邏輯定義在物理視圖中,然後每當有數據更新,視圖會被自動更新。既然 Oracle 有,那麼在開源的世界裏一定會有對應的東西出現,最起碼會有相應的影子在浮現,這個影子就是 PostgreSQL IVM。

PostgreSQL IVM 使用到 Transition Table 這個概念,在觸發器中,用戶可以看到變化前和變化後的數據,從而計算出變更的內容,利用這些 Delta 數據,進行刷新預先定義好的物理視圖。

四、計算觸發機制

計算成本比較

五、聚合結果實時可見

聚合結果的存儲要支持 upsert 語義,聚合結果的消費者實時感知到,同時聚合結果的存儲要有水平可擴性。結合這三個要求,比較推薦使用 NoSQL 來進行指標的存儲,具體可以使用 MongoDB。

六、小結

本文嘗試對實時數據聚合分析中涉及到的問題和常見思路進行梳理,文中定有不少疏漏,不足之處希望讀者批評指正。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/CvzfbPeOj5J34ylmKZpAew