Flink 在唯品會的實踐
**摘要:**唯品會自 2017 年開始基於 k8s 深入打造高性能、穩定、可靠、易用的實時計算平臺,支持唯品會內部業務在平時以及大促的平穩運行。現平臺支持 Flink、Spark、Storm 等主流框架。本文主要分享 Flink 的容器化實踐應用以及產品化經驗。內容包括:
-
發展概覽
-
Flink 容器化實踐
-
Flink SQL 平臺化建設
-
應用案例
-
未來規劃
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一 、發展概覽
平臺支持公司內部所有部門的實時計算應用。主要的業務包括實時大屏、推薦、實驗平臺、實時監控和實時數據清洗等。
1.1 集羣規模
平臺現有異地雙機房雙集羣,具有 2000 多的物理機節點,利用 k8s 的 namespaces,labels 和 taints 等,實現業務隔離以及初步的計算負載隔離。目前線上實時應用有大概 1000 個,平臺最近主要支持 Flink SQL 任務的上線。
1.2 平臺架構
上圖是唯品會實時計算平臺的整體架構。
-
最底層是計算任務節點的資源調度層,實際是以 deployment 的模式運行在 k8s 上,平臺雖然支持 yarn 調度,但是 yarn 調度是與批任務共享資源,所以主流任務還是運行在 k8s 上。
-
存儲層這一層,支持公司內部基於 kafka 實時數據 vms,基於 binlog 的 vdp 數據和原生 kafka 作爲消息總線,狀態存儲在 hdfs 上,數據主要存入 redis,mysql,hbase,kudu,clickhouse 等。
-
計算引擎層,平臺支持 Flink,Spark,Storm 主流框架容器化,提供了一些框架的封裝和組件等。每個框架會都會支持幾個版本的鏡像滿足不同的業務需求。
-
平臺層提供作業配置、調度、版本管理、容器監控、job 監控、告警、日誌等功能,提供多租戶的資源管理(quota,label 管理),提供 kafka 監控。在 Flink 1.11 版本之前,平臺自建元數據管理系統爲 Flink SQL 管理 schema,1.11 版本開始,通過 hive metastore 與公司元數據管理系統融合。
-
最上層就是各個業務的應用層。
二、Flink 容器化實踐
**2.1 容器化實踐
**
上圖是實時平臺 Flink 容器化的架構。Flink 容器化是基於 standalone 模式部署的。
-
部署模式共有 client,jobmanager 和 taskmanager 三個角色,每一個角色都由一個 deployment 控制。
-
用戶通過平臺上傳任務 jar 包,配置等,存儲於 hdfs 上。同時由平臺維護的配置,依賴等也存儲在 hdfs 上,當 pod 啓動時,會進行拉取等初始化操作。
-
client 中主進程是一個由 go 開發的 agent,當 client 啓動時,會首先檢查集羣狀態,當集羣 ready 後,從 hdfs 上拉取 jar 包向 Flink 集羣提交任務。同時,client 的主要功能還有監控任務狀態,做 savepoint 等操作。
-
通過部署在每臺物理機上的 smart - agent 採集容器的指標寫入 m3,以及通過 Flink 暴漏的接口將 metrics 寫入 prometheus,結合 grafana 展示。同樣通過部署在每臺物理機上的 vfilebeat 採集掛載出來的相關日誌寫入 es,在 dragonfly 可以實現日誌檢索。
■ Flink 平臺化
在實踐過程中,結合具體場景以及易用性考慮,做了平臺化工作。
-
平臺的任務配置與鏡像,Flink 配置,自定義組件等解耦合,現階段平臺支持 1.7、1.9、1.11、1.12 等版本。
-
平臺支持流水線編譯或上傳 jar、作業配置、告警配置、生命週期管理等,從而減少用戶的開發成本。
-
平臺開發了容器級別的如火焰圖等調優診斷的頁面化功能,以及登陸容器的功能,支持用戶進行作業診斷。
■ Flink 穩定性
在應用部署和運行過程中,不可避免的會出現異常。以下是平臺保證任務在出現異常狀況後的穩定性做的策略。
-
pod 的健康和可用,由 livenessProbe 和 readinessProbe 檢測,同時指定 pod 的重啓策略。
-
Flink 任務異常時:
-
Flink 原生的 restart 策略和 failover 機制,作爲第一層的保證。
-
在 client 中會定時監控 Flink 狀態,同時將最新的 checkpoint 地址更新到自己的緩存中,並彙報到平臺,固化到 MySQL 中。當 Flink 無法再重啓時,由 client 重新從最新的成功 checkpoint 提交任務。作爲第二層保證。這一層將 checkpoint 固化到 MySQL 中後,就不再使用 Flink HA 機制了,少了 zk 的組件依賴。
-
當前兩層無法重啓時或集羣出現異常時,由平臺自動從固化到 MySQL 中的最新 chekcpoint 重新拉起一個集羣,提交任務,作爲第三層保證。
-
機房容災:
-
用戶的 jar 包,checkpoint 都做了異地雙 HDFS 存儲
-
異地雙機房雙集羣
2.2 kafka 監控方案
kafka 監控是我們的任務監控裏相對重要的一部分,整體監控流程如下所示。
平臺提供監控 kafka 堆積,消費 message 等配置信息,從 MySQL 中將用戶 kafka 監控配置提取後,通過 jmx 監控 kafka,寫入下游 kafka,再通過另一個 Flink 任務實時監控,同時將這些數據寫入 ck,從而展示給用戶。
三、Flink SQL 平臺化建設
基於 k8s 的 Flink 容器化實現以後,方便了 Flink api 應用的發佈,但是對於 Flink SQL 的任務仍然不夠便捷。於是平臺提供了更加方便的在線編輯發佈、SQL 管理等一棧式開發平臺。
3.1 Flink SQL 方案
平臺的 Flink SQL 方案如上圖所示,任務發佈系統與元數據管理系統完全解耦。
■ Flink SQL 任務發佈平臺化
在實踐過程中,結合易用性考慮,做了平臺化工作,主操作界面如下圖所示:
-
Flink SQL 的版本管理,語法校驗,拓撲圖管理等;
-
UDF 通用和任務級別的管理,支持用戶自定義 UDF;
-
提供參數化的配置界面,方便用戶上線任務。
■ 元數據管理
平臺在 1.11 之前通過構建自己的元數據管理系統 UDM,MySQL 存儲 kafka,redis 等 schema,通過自定義 catalog 打通 Flink 與 UDM,從而實現元數據管理。1.11 之後,Flink 集成 hive 逐漸完善,平臺重構了 FlinkSQL 框架,通過部署一個 SQL - gateway service 服務,中間調用自己維護的 SQL - client jar 包,從而與離線元數據打通,實現了實時離線元數據統一,爲之後的流批一體做好工作。在元數據管理系統創建的 Flink 表操作界面如下所示,創建 Flink 表的元數據,持久化到 hive 裏,Flink SQL 啓動時從 hive 裏讀取對應表的 table schema 信息。
3.2 Flink SQL 相關實踐
平臺對於官方原生支持或者不支持的 connector 進行整合和開發,鏡像和 connector,format 等相關依賴進行解耦,可以快捷的進行更新與迭代。
■ FLINK SQL 相關實踐
-
connector 層,現階段平臺支持官方支持的 connector,並且構建了 redis,kudu,clickhouse,vms,vdp 等平臺內部的 connector。平臺構建了內部的 pb format,支持 protobuf 實時清洗數據的讀取。平臺構建了 kudu,vdp 等內部 catalog,支持直接讀取相關的 schema,不用再創建 ddl。
-
平臺層主要是在 UDF、常用運行參數調整、以及升級 hadoop3。
-
runntime 層主要是支持拓撲圖執行計劃修改、維表關聯 keyBy cache 優化等
■ 拓撲圖執行計劃修改
針對現階段 SQL 生成的 stream graph 並行度無法修改等問題,平臺提供可修改的拓撲預覽修改相關參數。平臺會將解析後的 FlinkSQL 的 excution plan json 提供給用戶,利用 uid 保證算子的唯一性,修改每個算子的並行度,chain 策略等,也爲用戶解決反壓問題提供方法。例如針對 clickhouse sink 小併發大批次的場景,我們支持修改 clickhouse sink 並行度,source 並行度 = 72,sink 並行度 = 24,提高 clickhouse sink tps。
■ 維表關聯 keyBy 優化 cache
針對維表關聯的情況,爲了降低 IO 請求次數,降低維表數據庫讀壓力,從而降低延遲,提高吞吐,有以下幾種措施:
-
當維表數據量不大時,通過全量維表數據緩存在本地,同時 ttl 控制緩存刷新的時候,這可以極大的降低 IO 請求次數,但會要求更多但內存空間。
-
當維表數據量很大時,通過 async 和 LRU cache 策略,同時 ttl 和 size 來控制緩存數據的失效時間和緩存大小,可以提高吞吐率並降低數據庫的讀壓力。
-
當維表數據量很大同時主流 qps 很高時,可以開啓把維表 join 的 key 作爲 hash 的條件,將數據進行分區,即在 calc 節點的分區策略是 hash,這樣下游算子的 subtask 的維表數據是獨立的,不僅可以提高命中率,也可降低內存使用空間。
優化之前維表關聯 LookupJoin 算子和正常算子 chain 在一起。
優化之間維表關聯 LookupJoin 算子和正常算子不 chain 在一起,將 join key 作爲 hash 策略的 key。採用這種方式優化之後,例如原先 3000W 數據量的維表,10 個 TM 節點,每個節點都要緩存 3000W 的數據,總共需要緩存 3000W * 10 = 3 億的量。而經過 keyBy 優化之後,每個 TM 節點只需要緩存 3000W / 10 = 300W 的數據量,總共緩存的數據量只有 3000W,大大減少緩存數據量。
■ 維表關聯延遲 join
維表關聯中,有很多業務場景,在維表數據新增數據之前,主流數據已經發生 join 操作,會出現關聯不上的情況。因此,爲了保證數據的正確,將關聯不上的數據進行緩存,進行延遲 join。
-
最簡單的做法是,在維表關聯的 function 裏設置重試次數和重試間隔,這個方法會增大整個流的延遲,但主流 qps 不高的情況下,可以解決問題。
-
增加延遲 join 的算子,當 join 維表未關聯時,先緩存起來,根據設置重試次數和重試間隔從而進行延遲的 join。
四、應用案例
**4.1 實時數倉
**
■ 實時數據入倉
-
流量數據一級 kafka 通過實時清洗之後,寫到二級清洗 kafka,主要是 protobuf 格式,再通過 Flink SQL 寫入 hive 5min 表,以便做後續的準實時 ETL,加速 ods 層數據源的準備時間。
-
MySQL 業務庫的數據,通過 VDP 解析形成 binlog cdc 消息流,再通過 Flink SQL 寫入 hive 5min 表。
-
業務系統通過 VMS API 產生業務 kafka 消息流,通過 Flink SQL 解析之後寫入 hive 5min 表。支持 string、json、csv 等消息格式。
-
使用 Flink SQL 做流式數據入倉,非常的方便,而且 1.12 版本已經支持了小文件的自動合併,解決了小文件的痛點。
-
我們自定義分區提交策略,當前分區 ready 時候會調一下實時平臺的分區提交 api,在離線調度定時調度通過這個 api 檢查分區是否 ready。
採用 Flink SQL 統一入倉方案以後,我們可以獲得的收益:可解決以前 Flume 方案不穩定的問題,而且用戶可自助入倉,大大降低入倉任務的維護成本。提升了離線數倉的時效性,從小時級降低至 5min 粒度入倉。
■ 實時指標計算
-
實時應用消費清洗後 kafka,通過 redis 維表、api 等方式關聯,再通過 Flink window 增量計算 UV,持久化寫到 Hbase 裏。
-
實時應用消費 VDP 消息流之後,通過 redis 維表、api 等方式關聯,再通過 Flink SQL 計算出銷售額等相關指標,增量 upsert 到 kudu 裏,方便根據 range 分區批量查詢,最終通過數據服務對實時大屏提供最終服務。
以往指標計算通常採用 Storm 方式,需要通過 api 定製化開發,採用這樣 Flink 方案以後,我們可以獲得的收益:將計算邏輯切到 Flink SQL 上,降低計算任務口徑變化快,修改上線週期慢等問題。切換至 Flink SQL 可以做到快速修改,快速上線,降低維護成本。
■ 實時離線一體化 ETL 數據集成
-
Flink SQL 在最近的版本中持續強化了維表 join 的能力,不僅可以實時關聯數據庫中的維表數據,現在還能關聯 Hive 和 Kafka 中的維表數據,能靈活滿足不同工作負載和時效性的需求。
-
基於 Flink 強大的流式 ETL 的能力,我們可以統一在實時層做數據接入和數據轉換,然後將明細層的數據迴流到離線數倉中。
-
我們通過將 presto 內部使用的 HyperLogLog (後面簡稱 HLL) 實現引入到 Spark UDAF 函數里,打通 HLL 對象在 Spark SQL 與 presto 引擎之間的互通,如 Spark SQL 通過 prepare 函數生成的 HLL 對象,不僅可以在 Spark SQL 裏 merge 查詢而且可以在 presto 裏進行 merge 查詢。具體流程如下:
UV 近似計算示例:
Step 1: Spark SQL 生成 HLL 對象
insert overwrite dws_goods_uv partition (dt='${dt}',hm='${hm}') AS select goods_id, estimate_prepare(mid) as pre_hll from dwd_table_goods group by goods_id where dt = ${dt} and hm = ${hm}
Step 2: Spark SQL 通過 goods_id 維度的 HLL 對象 merge 成品牌維度
insert overwrite dws_brand_uv partition (dt='${dt}',hm='${hm}') AS select b.brand_id, estimate_merge(pre_hll) as merge_hll from dws_table_brand A left join dim_table_brand_goods B on A.goods_id = B.goods_id where dt = ${dt} and hm = ${hm}
Step 3: Spark SQL 查詢品牌維度的 UV
select brand_id, estimate_compute(merge_hll) as uv from dws_brand_uv where dt = ${dt}
Step 4: presto merge 查詢 park 生成的 HLL 對象
select brand_id,cardinality(merge(cast(merge_hll AS HyperLogLog))) uv from dws_brand_uv group by brand_id
所以基於實時離線一體化 ETL 數據集成的架構,我們能獲得的收益:
-
統一了基礎公共數據源;
-
提升了離線數倉的時效性;
-
減少了組件和鏈路的維護成本。
4.2 實驗平臺(Flink 實時數據入 OLAP)
唯品會實驗平臺是通過配置多維度分析和下鑽分析,提供海量數據的 A / B - test 實驗效果分析的一體化平臺。一個實驗是由一股流量(比如用戶請求)和在這股流量上進行的相對對比實驗的修改組成。實驗平臺對於海量數據查詢有着低延遲、低響應、超大規模數據 (百億級) 的需求。整體數據架構如下:
通過 Flink SQL 將 kafka 裏的數據清洗解析展開等操作之後,通過 redis 維表關聯商品屬性,通過分佈式表寫入到 clickhouse,然後通過數據服務 adhoc 查詢。業務數據流如下:
我們通過 Flink SQL redis connector,支持 redis 的 sink 、source 維表關聯等操作,可以很方便的讀寫 redis,實現維表關聯,維表關聯內可配置 cache ,極大提高應用的 TPS。通過 Flink SQL 實現實時數據流的 pipeline,最終將大寬表 sink 到 CK 裏,並按照某個字段粒度做 murmurHash3_64 存儲,保證相同用戶的數據都存在同一 shard 節點組內,從而使得 ck 大表之間的 join 變成 local 本地表之間的 join, 減少數據 shuffle 操作,提升 join 查詢效率。
五、未來規劃
**5.1 提高 Flink SQL 易用性
**
當前我們的 Flink SQL 調試起來很有很多不方便的地方,對於做離線 hive 用戶來說還有一定的使用門檻,例如手動配置 kafka 監控、任務的壓測調優,如何能讓用戶的使用門檻降低至最低,是一個比較大的挑戰。將來我們考慮做一些智能監控告訴用戶當前任務存在的問題,儘可能自動化並給用戶一些優化建議。
5.2 數據湖 CDC 分析方案落地
目前我們的 VDP binlog 消息流,通過 Flink SQL 寫入到 hive ods 層,以加速 ods 層數據源的準備時間,但是會產生大量重複消息去重合並。我們會考慮 Flink + 數據湖的 cdc 入倉方案來做增量入倉。此外,像訂單打寬之後的 kafka 消息流、以及聚合結果都需要非常強的實時 upsert 能力,目前我們主要是用 kudu,但是 kudu 集羣,比較獨立小衆,維護成本高,我們會調研數據湖的增量 upsert 能力來替換 kudu 增量 upsert 場景。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/G_GZQEksBDec53KZyPC_dw