Flink 在唯品會的實踐

**摘要:**唯品會自 2017 年開始基於 k8s 深入打造高性能、穩定、可靠、易用的實時計算平臺,支持唯品會內部業務在平時以及大促的平穩運行。現平臺支持 Flink、Spark、Storm 等主流框架。本文主要分享 Flink 的容器化實踐應用以及產品化經驗。內容包括:

  1. 發展概覽

  2. Flink 容器化實踐

  3. Flink SQL 平臺化建設

  4. 應用案例

  5. 未來規劃

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

一 、發展概覽

平臺支持公司內部所有部門的實時計算應用。主要的業務包括實時大屏、推薦、實驗平臺、實時監控和實時數據清洗等。


1.1 集羣規模

平臺現有異地雙機房雙集羣,具有 2000 多的物理機節點,利用 k8s 的 namespaces,labels 和 taints 等,實現業務隔離以及初步的計算負載隔離。目前線上實時應用有大概 1000 個,平臺最近主要支持 Flink SQL 任務的上線。

1.2 平臺架構

上圖是唯品會實時計算平臺的整體架構。

二、Flink 容器化實踐

**2.1 容器化實踐
**

上圖是實時平臺 Flink 容器化的架構。Flink 容器化是基於 standalone 模式部署的。

在實踐過程中,結合具體場景以及易用性考慮,做了平臺化工作。

在應用部署和運行過程中,不可避免的會出現異常。以下是平臺保證任務在出現異常狀況後的穩定性做的策略。

  1. Flink 原生的 restart 策略和 failover 機制,作爲第一層的保證。

  2. 在 client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到自己的緩存中,並彙報到平臺,固化到 MySQL 中。當 Flink 無法再重啓時,由 client 重新從最新的成功 checkpoint 提交任務。作爲第二層保證。這一層將 checkpoint 固化到 MySQL 中後,就不再使用 Flink HA 機制了,少了 zk 的組件依賴。

  3. 當前兩層無法重啓時或集羣出現異常時,由平臺自動從固化到 MySQL 中的最新 chekcpoint 重新拉起一個集羣,提交任務,作爲第三層保證。

2.2 kafka 監控方案

kafka 監控是我們的任務監控裏相對重要的一部分,整體監控流程如下所示。

平臺提供監控 kafka 堆積,消費 message 等配置信息,從 MySQL 中將用戶 kafka 監控配置提取後,通過 jmx 監控 kafka,寫入下游 kafka,再通過另一個 Flink 任務實時監控,同時將這些數據寫入 ck,從而展示給用戶。

三、Flink SQL 平臺化建設

基於 k8s 的 Flink 容器化實現以後,方便了 Flink api 應用的發佈,但是對於 Flink SQL 的任務仍然不夠便捷。於是平臺提供了更加方便的在線編輯發佈、SQL 管理等一棧式開發平臺。


平臺的 Flink SQL 方案如上圖所示,任務發佈系統與元數據管理系統完全解耦。

在實踐過程中,結合易用性考慮,做了平臺化工作,主操作界面如下圖所示:

■ 元數據管理

平臺在 1.11 之前通過構建自己的元數據管理系統 UDM,MySQL 存儲 kafka,redis 等 schema,通過自定義 catalog 打通 Flink 與 UDM,從而實現元數據管理。1.11 之後,Flink 集成 hive 逐漸完善,平臺重構了 FlinkSQL 框架,通過部署一個 SQL - gateway service 服務,中間調用自己維護的 SQL - client jar 包,從而與離線元數據打通,實現了實時離線元數據統一,爲之後的流批一體做好工作。在元數據管理系統創建的 Flink 表操作界面如下所示,創建 Flink 表的元數據,持久化到 hive 裏,Flink SQL 啓動時從 hive 裏讀取對應表的 table schema 信息。

平臺對於官方原生支持或者不支持的 connector 進行整合和開發,鏡像和  connector,format 等相關依賴進行解耦,可以快捷的進行更新與迭代。

■ 拓撲圖執行計劃修改

針對現階段 SQL 生成的 stream graph 並行度無法修改等問題,平臺提供可修改的拓撲預覽修改相關參數。平臺會將解析後的 FlinkSQL 的 excution plan json 提供給用戶,利用 uid 保證算子的唯一性,修改每個算子的並行度,chain 策略等,也爲用戶解決反壓問題提供方法。例如針對 clickhouse sink 小併發大批次的場景,我們支持修改 clickhouse sink 並行度,source 並行度 = 72,sink 並行度 = 24,提高  clickhouse sink tps。

■ 維表關聯 keyBy 優化 cache

針對維表關聯的情況,爲了降低 IO 請求次數,降低維表數據庫讀壓力,從而降低延遲,提高吞吐,有以下幾種措施:

優化之前維表關聯 LookupJoin 算子和正常算子 chain 在一起。

優化之間維表關聯 LookupJoin 算子和正常算子不 chain 在一起,將 join key 作爲  hash 策略的 key。採用這種方式優化之後,例如原先 3000W 數據量的維表,10 個 TM 節點,每個節點都要緩存 3000W 的數據,總共需要緩存 3000W * 10 = 3 億的量。而經過 keyBy 優化之後,每個 TM 節點只需要緩存 3000W / 10 = 300W 的數據量,總共緩存的數據量只有 3000W,大大減少緩存數據量。

■ 維表關聯延遲 join

維表關聯中,有很多業務場景,在維表數據新增數據之前,主流數據已經發生 join 操作,會出現關聯不上的情況。因此,爲了保證數據的正確,將關聯不上的數據進行緩存,進行延遲 join。

四、應用案例

**4.1 實時數倉
**

實時數據入倉

採用 Flink SQL 統一入倉方案以後,我們可以獲得的收益:可解決以前 Flume 方案不穩定的問題,而且用戶可自助入倉,大大降低入倉任務的維護成本。提升了離線數倉的時效性,從小時級降低至 5min 粒度入倉。

■ 實時指標計算

以往指標計算通常採用 Storm 方式,需要通過 api 定製化開發,採用這樣 Flink 方案以後,我們可以獲得的收益:將計算邏輯切到 Flink SQL 上,降低計算任務口徑變化快,修改上線週期慢等問題。切換至 Flink SQL 可以做到快速修改,快速上線,降低維護成本。

■ 實時離線一體化 ETL 數據集成

UV 近似計算示例:

Step 1: Spark SQL 生成 HLL 對象

insert overwrite dws_goods_uv partition (dt='${dt}',hm='${hm}') AS select goods_id, estimate_prepare(mid) as pre_hll from dwd_table_goods group by goods_id where dt = ${dt} and hm = ${hm}

Step 2: Spark SQL 通過 goods_id 維度的 HLL 對象 merge 成品牌維度

insert overwrite dws_brand_uv partition (dt='${dt}',hm='${hm}') AS select b.brand_id, estimate_merge(pre_hll) as merge_hll from dws_table_brand A left join dim_table_brand_goods B on A.goods_id = B.goods_id where dt = ${dt} and hm = ${hm}  

Step 3: Spark SQL 查詢品牌維度的 UV

select brand_id, estimate_compute(merge_hll) as uv from dws_brand_uv where dt = ${dt}

Step 4: presto merge 查詢 park 生成的 HLL 對象

select brand_id,cardinality(merge(cast(merge_hll AS HyperLogLog))) uv from dws_brand_uv group by brand_id

所以基於實時離線一體化 ETL 數據集成的架構,我們能獲得的收益:

唯品會實驗平臺是通過配置多維度分析和下鑽分析,提供海量數據的 A / B - test 實驗效果分析的一體化平臺。一個實驗是由一股流量(比如用戶請求)和在這股流量上進行的相對對比實驗的修改組成。實驗平臺對於海量數據查詢有着低延遲、低響應、超大規模數據 (百億級) 的需求。整體數據架構如下:

通過 Flink SQL 將 kafka 裏的數據清洗解析展開等操作之後,通過 redis 維表關聯商品屬性,通過分佈式表寫入到 clickhouse,然後通過數據服務 adhoc 查詢。業務數據流如下:

我們通過 Flink SQL redis connector,支持 redis 的 sink 、source 維表關聯等操作,可以很方便的讀寫 redis,實現維表關聯,維表關聯內可配置 cache ,極大提高應用的 TPS。通過 Flink SQL 實現實時數據流的 pipeline,最終將大寬表 sink 到 CK 裏,並按照某個字段粒度做 murmurHash3_64  存儲,保證相同用戶的數據都存在同一 shard 節點組內,從而使得 ck 大表之間的 join 變成 local 本地表之間的 join, 減少數據 shuffle 操作,提升 join 查詢效率。

五、未來規劃

**5.1 提高 Flink SQL 易用性
**

當前我們的 Flink SQL 調試起來很有很多不方便的地方,對於做離線 hive 用戶來說還有一定的使用門檻,例如手動配置 kafka 監控、任務的壓測調優,如何能讓用戶的使用門檻降低至最低,是一個比較大的挑戰。將來我們考慮做一些智能監控告訴用戶當前任務存在的問題,儘可能自動化並給用戶一些優化建議。

5.2 數據湖 CDC 分析方案落地

目前我們的 VDP binlog 消息流,通過 Flink SQL 寫入到 hive ods 層,以加速 ods 層數據源的準備時間,但是會產生大量重複消息去重合並。我們會考慮 Flink + 數據湖的 cdc 入倉方案來做增量入倉。此外,像訂單打寬之後的 kafka 消息流、以及聚合結果都需要非常強的實時 upsert 能力,目前我們主要是用 kudu,但是 kudu 集羣,比較獨立小衆,維護成本高,我們會調研數據湖的增量 upsert 能力來替換 kudu 增量 upsert 場景。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/G_GZQEksBDec53KZyPC_dw