B 站流式傳輸架構的前世今生
本期作者
王翔宇
嗶哩嗶哩資深開發工程師
2017 年加入 B 站,現服務於基礎架構實時團隊。先後負責 B 站日誌系統、實時流式傳輸工作。
魏澤豐
嗶哩嗶哩高級開發工程師
2021 年加入 B 站,現服務於基礎架構實時團隊,負責實時流式傳輸以及 Flink CDC 相關工作。
高瑞超
嗶哩嗶哩資深開發工程師
2021 年加入 B 站,現服務於基礎架構實時團隊,負責實時流式傳輸以及 Flink connector 相關工作。
01 背景
Lancer 是 B 站的實時流式傳輸平臺,承載全站服務端、客戶端的數據上報 / 採集、傳輸、集成工作,秒級延遲,作爲數倉入口是 B 站數據平臺的生命線。目前每日峯值 5000w/s rps, 3PB / 天, 4K + 條流的數據同步能力。
服務如此大的數據規模,對產品的可靠性、可擴展性和可維護性提出了很高的要求。流式傳輸的實現是一個很有挑戰的事情,聚焦快、準、穩的需求, Lancer 整體演進經歷了大管道模型、BU 粒度管道模型、單流單作業模型三個階段的演進,下面我們娓娓道來。
02 關鍵詞說明
logid:每個業務方上報的數據流以 logid 進行標識,logid 是數據在傳輸 + 集成過程中的元信息標識。
數據源:數據進入到 lancer 的入口,例如:log-agent,bfe-agent,flink cdc
lancer-gateway(數據網關):接收數據上報的網關。
數據緩衝層:也叫做內部 kafka,用於解耦數據上報和數據分發。
lancer-collector(數據分發層):也叫做數據同步,可以根據實際場景完成不同端到端的數據同步。
03 技術演進
整個 B 站流式數據傳輸架構的演進大致經歷了三個階段。
3.1 架構 V1.0 - 基於 flume 的
大管道數據傳輸架構 (2019 之前)
B 站流式傳輸架構建立之初,數據流量和數據流條數相對較少,因此採用了全站的數據流混合在一個管道中進行處理,基於 flume 二次定製化的數據傳輸架構,架構如下:
-
整個架構從數據生成到落地分爲:數據源、數據網關、數據緩衝、數據分發層。
-
數據上報端基本採用 sdk 的方式直接發送 http 和 grpc 請求上報。
-
數據網關 lancer-gateway 是基於 flume 二次迭代的數據網關,用於承載數據的上報,支持兩種協議:http 用於承載公網數據上報(web/app),grpc 用於承載 IDC 內服務端數據上報。
-
數據緩衝層使用 kafka 實現,用於解耦數據上報和數據分發。
-
數據分發層 lancer-collector 同樣是基於 flume 二次迭代的數據分發層,用於將數據從緩衝層同步到 ODS。
v1.0 架構在使用中暴露出一些的痛點:
- 數據源端對於數據上報的可控性和容錯性較差,例如:
-
數據網關故障情況下,數據源端缺少緩存能力,不能直接反壓,存在數據丟失隱患。
-
重 SDK:SDK 中需要添加各種適配邏輯以應對上報異常情況
-
整體架構是一個大管道模型,資源的劃分和隔離不明確,整體維護成本高,自身故障隔離性差。
-
基於 flume 二次迭代的一些缺陷:
-
邏輯複雜,性能差,我們需要的功能相對單一
-
hdfs 分發場景,不支持 exactly once 語義,每次重啓,會導致數據大量重複
3.2 架構 V2.0-BU 粒度的
管道化架構 (2020-2021)
針對 v1.0 的缺陷,我們引入了架構 v2.0,架構如下:
此架構的關鍵細節如下:
- 強化了數據上報源端的邊緣可控能力
-
服務器上部署 log-agent 承載服務端數據上報。
-
cdn 上部署 bfe-agent 用於承載公網(web 端、app 端)數據上報。
-
log-agent/bfe-agent 中集成數據緩衝、預聚合、流控、重試、降級等能力,數據上報 sdk 只需專注數據的生成和上報邏輯。
-
agent 端基於 logid 的 BU 屬性,將數據路由到不同的管道。
-
數據管道以 BU 爲粒度搭建,管道間資源隔離,每個管道包含整套獨立的完整數據傳輸鏈路,並且數據管道支持基於 airflow 快速搭建。故障隔離做到 BU 級別。
-
數據網關升級到自研 lancer-gateway2.0,邏輯精簡,支持流控反壓,並且適配 kafka failover, 基於 k8s 進行部署。
-
hdfs 分發基於 flink jar 進行實現:支持 exactly once 語義保證。
V2.0 架構相對於 v1.0, 重點提升了數據上報邊緣的可控力、BU 粒度管道間的資源劃分和隔離性。但是隨着 B 站流式數據傳輸規模的快速增加,對數據傳輸的時效性、成本、質量也提出了越來越高的要求,V2.0 也逐漸暴露出了一些缺陷:
- logid 級別隔離性差:
-
單個管道內部某個 logid 流量陡增,幾倍甚至幾十倍,依然會造成整個管道的數據分發延遲,
-
單個管道內分發層組件故障重啓,例如:hdfs 分發對應的 flink jar 作業掛掉重啓,從 checkpoint 恢復,此管道內所有的 logid 的 hdfs 分發都會存在歸檔延遲隱患。
-
網關是異步發送模型,極端情況下(組件崩潰),存在數據丟失風險。
-
ods 層局部熱點 / 故障影響放大
-
由於分發層一個作業同時分發多個 logid,這種大作業模型更易受到 ods 層局部熱點的影響,例如:hdfs 某個 datanode 熱點,會導致某個分發作業整體寫阻塞,進而影響到此分發作業的其他 logid, kafka 分發同理。
-
hdfs 單個文件塊的所有副本失效,會導致對應分發任務整體掛掉重啓。
- hdfs 小文件問題放大
- hdfs 分發對應的 flink jar 作業爲了保證吞吐,整體設置的併發度相對較大。因此對於管道內的所有 logid,同一時刻都會打開併發度大小的文件數,對於流量低的 logid,就會造成小文件數量變大的問題。
針對上述痛點,最直接的解決思路就是整體架構做進一步的隔離,以單 logid 爲維度實現數據傳輸 + 分發。面臨的挑戰主要有以下幾個方面:
-
如何保證全鏈路以 logid 爲單位進行隔離,如何在資源使用可控的情況下合理控流並且保證數據流之間的隔離性
-
需要與外部系統進行大量的交互,如何適配外部系統的各種問題:局部熱點、故障
-
集成作業的數量指數級增加,如何保障高性能、穩定性的同時並且高效的進行管理、運維、質量監控。
3.3 架構 V3.0 - 基於 Flink SQL 的
單流單作業數據集成方案
在 V3.0 架構中,我們對整體傳輸鏈路進行了單作業單數據流隔離改造,並且基於 Flink SQL 支撐數據分發場景。架構如下:
相比 v2.0, 資源池容量管理上依然以 BU 爲粒度,但是每個 logid 的傳輸和分發相互獨立,互不影響。具體邏輯如下 :
-
agent:整體上報 SDK 和 agent 接收 + 發送邏輯按照 logid 進行隔離改造,logid 間採集發送相互隔離。
-
lancer-gateway3.0:logid 的請求處理之間相互隔離,當 kafka 發送受阻,直接反壓給 agent 端,下面詳細介紹。
-
數據緩衝層:每個 logid 對應一個獨立的內部 kafka topic,實現數據的緩衝。
-
數據分發層:分發層對每個 logid 的啓動獨立的 flink sql 作業進行數據的分發,單個 logid 處理受阻,只會導致當個 logid 的數據堆積。
相較於之前的實現,v3.0 架構具有以下的優勢:
- 可靠性:
- 功能質量上整理鏈路可以保證數據不丟失,網關層以同步方式發送數據,可以保證數據被持久化到內部 kafka;flink 支持狀態恢復和 exactly once 的語義,同樣保證數據不丟。
- 可維護性上:
-
隔離性上 logid 之間相互隔離,一個 logid 出現問題,其他 logid 不受影響。
-
資源分配以 logid 爲最小單位,可以精確控制單個 logid 的資源使用。
- 可擴展性:
- 可以以單個 logid 爲單位靈活管控:靈活的擴縮資源
04 V3.0 架構具體實現
我們重點介紹下,當前 V3.0 結構各個分層的實現。
4.1 數據上報邊緣層
4.1.1 log-agent
基於 go 自研,插件化架構,部署於物理機,可靠、高效的支持服務端數據上報。
時間架構分爲收集、處理、發送三層,具有以下主要特性:
-
支持文件採集和 unix sock 兩種數據上報方式
-
與網關 GRPC 通信:ACK + 退避重試 + 流控
-
整體上報 SDK 和 agent 接收 + 發送邏輯按照 logid 進行隔離改造,單 logid 處理相互隔離:每個 logid 啓動獨立的 pipeline 進行採集、解析、發送。
-
網關基於服務發現,自適應網關的調整
-
發送受阻情況下,基於磁盤進行本地堆積
-
logid 粒度的埋點監控,實時監控數據的處理狀態
-
CGroup 資源限制:CPU + 內存
-
數據聚合發送,提升傳輸效率
-
支持物理機和容器日誌此採集,配置隨應用發佈,自適應配置的增、刪、改。
4.1.2 bfe-agent
基於 go 自研,部署於 cdn,用於承載公網數據上報。
邊緣 cdn 節點,cdn 服務器上部署 nginx 和 bfe-agent,bfe-agent 整體實現架構與 log-agent 類似,對於 web 和 app 端數據上報請求 QPS 高、突發性強的特點,主要強化了以下能力:
-
應對流量陡增:基於邊緣節點的本地緩衝起到削峯作用
-
策略(降級、流控)前置,增強可控力
-
logid 級別分流隔離, 支持等級劃分
-
聚合壓縮回傳以提升數據傳輸效率、降低成本,回源 QPS 降低 90% 以上。
4.2 數據上報網關層
v3.0 方案中,數據數據網關的架構如下:
數據網關功能特性如下:
-
kafka 的通用代理層:支持 grpc /http 協議
-
基於 kafka send callback 實現了同步發送模型,保證數據不丟:數據寫入 kafka 後,再對請求返回 ack
-
請求不拆分:基於 agent 的聚合機制,只支持單次請求單條記錄,因此一條記錄對應一條緩存層 kakfa 的消息
-
lancer-gateway3.0 根據請求的 topic 信息,發送請求到對應的 kafka 集羣
-
lancer-gateway3.0 適配 kafka 集羣的局部熱點:支持 partition 動態剔除
-
logid 與 topic 一一對應,處理流程中相互隔離:一個 topic 發送受阻,不影響其他的 topic
整個數據網關中的實現難點是:單 gateway 承載多 logid 處理的過程中如何保證隔離性和公平性,我們參考了 Golang 中 GMP 的機制,整體數據流程如下:
-
收到的請求,會把請求放到 logid 對應的請求隊列,如果隊列滿,直接拒絕請求
-
每個 kafka 集羣,會初始化一個 N 大小的 kafka producer pool,其中每個 producer 會遍歷所有的隊列,進行數據的發送。
-
對於每個 logid 的請求隊列,會從兩個維護限制資源的佔用,以保證公平性和隔離性
-
限制當個 logid 隊列綁定的 producer 數量
-
基於時間片限定當個 producer 服務於單個隊列的時間長度
4.3 數據上報分發層
隨着 flink 在實時計算領域的成熟,其高性能、低延遲、exactly once 語義保證、批流一體、豐富的數據源支持、活躍的社區等優勢,促使我們選擇了以 flink sql 作爲數據分發層的解決方案。當前我們主要支持了 kafka→hive, kafka→kafka, cdc→kafka->hudi/hive 三種場景:
- kafka→hive
-
以流式方式,實時導入數據到 hive。
-
file rolling on check,保證 exactly once。
-
按照 event time 寫入分區和歸檔,歸檔延遲小於 15min
-
支持 text+lzo(行存) 和 orc+zstd(列存)兩種存儲格式。
-
支持下游作業增量同步。
- kafka→kafka
-
以流式方式,支持數據的實時同步
-
支持 kafka header metadata 信息的透傳
- cdc→kafka->hudi/hive
-
以實時流的方式同步全量和增量數據,整個 cdc 的使用場景分爲兩個環節
-
cdc → kafka
-
基於 cdc 2.1,同步 mysql 的全量和增量 binlog 同步
-
單 sql 作業支持分庫分表、多庫多表的同步。
-
支持根據 db 和 table 自定義策略分流到不同的數據緩衝層 kafka topic
-
kafka→hudi/hive
-
消費單 topic 同步到單張 hudi/hive 表,支持 event_time 落分區。
-
保證數據最終一致性
05 Flink connector 功能迭代
在 Flink SQL 數據分發場景的支持中,針對我們遇到的實際需求,對社區原生 connector 進行了對應的優化,我們針對性的介紹下。
5.1 hive sink connector 優化
斷流空分區提交
背景:B 站離線作業的拉起依賴上游分區提交,HDFS 分區提交的判斷依賴於作業整體 watermark 的推進,但是某些 logid 在斷流的情況下,如何進行分區的提交呢
解決辦法:
如圖所示:當所有的 StreamFileWriter 連續兩次 checkpoint 內未處理任何數據的情況下,StreamingFileCommiter 會判定發生了斷流,按照當前時間提交分區。
支持下游增量數據同步
背景:傳統方式 ods 到 dwd 的數據同步只有當 ods 層分區 ready 之後纔會開始,時效性較差,如何加速數據的同步?
解決辦法:
-
不依賴 ods 層分區 ready,當 ods 目錄中文件生成後,即可開始數據的處理,以增量的方式讀取數據文件。
-
通過 HDFS 的 list 操作來獲取需要讀取的文件,對 NameNode 壓力較大,爲此我們提供了文件 list 列表索引(包括文件名和數據條數),下游只需要讀取索引,即可獲取增量文件列表。
-
實現中索引文件狀態被持久化到 state 中,snapshot 中生成. inflight 狀態臨時文件,notifyCheckpointComplete 中將文件 rename 成 commit 正式文件, 提供 exactly once 語義保證。
-
下游作業讀取文件索引,支持 ods 到 dwd 的增量數據同步。
orc+zstd
背景:相較於行式存儲,列式存儲在壓縮比上有着顯著的優勢。
解決辦法:支持 orc+zstd, 經過測試,相較於 text+lzo,空間節省在 40% 以上。
hdfs 異步 close
背景:snapshot 階段 flush 數據,close 文件經常因爲個別文件慢拖累整體吞吐。
解決辦法:
-
將 close 超時的文件扔到異步隊列中。也就是 close 的動作不會去堵塞整個主鏈路的處理,提升 hdfs 局部熱點情況下的吞吐。異步 close 文件列表保存到 pendingPartsForCurrentCheckpoint,並且持久化到 state 當中。故障恢復時,也能繼續對文件進行關閉。
-
異步 close 的引入,會引入分區提前創建的隱患,爲此引入了對於 bucket 狀態的判斷。對於某分區,只有當隸屬於此分區的所有 bucket 中的 pendingPartsForCurrentCheckpoint 爲空(所有文件都進行了關閉),纔在 commit 算子中進行分區的提交。
小文件合併
背景:rolling on checkpoint 的滾動策略,會導致文件數量的膨脹,對 namenode 產生較大的壓力。
解決辦法:
-
引入了小文件合併功能,在 checkpoint 完成後,由 Streaming writer 的 notifyCheckpointComplete 方法觸發合併操作,向下遊發送 EndCheckpoint 信號。
-
coordinator 收到每個 writer 的 EndCheckpoint 後,開始進行文件的分組,封裝成一個個 compactunit 廣播下游,全部 unit 發送完之後,再廣播 EndCompaction。
-
compact operator 找到屬於自己的任務後開始處理,當收到 EndCompaction 後,往下游發送分區提交信息。
5.2 kafka connector 優化
支持 protobuf format
背景:用戶有處理 protobuf 格式數據的需求
解決辦法:
-
使用 protoc 生成 java 類,打包 jar,上傳到實時計算平臺。
-
實現對應的 DeserializationSchema 和 SerializationSchema,動態加載 pb 類並通過反射調用方法,完成 pb bytes 與 RowData 的互轉。
kafka sink 支持自定義分流
背景:用戶希望在一個 sql 作業中根據需要,靈活定製將消息發送到指定 kafka 集羣和 topic。
解決辦法:
-
支持用戶自定義 udf,靈活選擇 sql 中的字段作爲 udf 的入參,在 udf 內部,用戶根據業務場景定製邏輯,返回 topic 或者 broker list。最終 sink 內部發送到對應的 kafka 集羣和 topic。
-
kakfa sink 內部動態加載 udf,通過反射機制實時獲取對應的 broker 和 topic,同時支持結果的緩存。
-
例子:
CREATE TABLE sink_test (
broker_name_arg varchar,
topic_name_arg varchar,
message string,
t1 string
) WITH(
'bootstrapServers' = 'BrokerUdf(broker_name_arg)', // 根據broker_name_arg作爲udf參數計算brokers
'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf', // 獲取brokers Udf
'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)', // 根據broker_name_arg和topic_name_arg作爲udf參數計算topic
'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf', // 計算topoc Udf
'udf.cache.min' = '1', // 緩存時間
'exclude.udf.field' = 'false', // udf的相關字段是否輸出
'connector' = 'kafka-diversion'
);
5.3 cdc connector 優化
sql 場景下多庫多表場景支持
背景:原生的 flink cdc source 在單個 sql 任務中,只能同步相同 DDL 定義的表,如果需要同步異構 DDL,不得不啓動多個獨立的 job 進行同步。這樣會存在資源的額外開銷。
解決辦法:
-
sql 定義去 DDL:
原生 flink cdc source 會對所有監聽到的數據在反序列化時根據 sql ddl 定義做 column 轉換和解析,以 RowData 的形式傳給下游。我們在 cdc-source 中新增了一種的 format 方式:changelog bytes 序列化方式。該 format 在將數據反序列化時在不再進行 column 轉換和解析,而是將所有 column 直接轉換爲 changelog-json 二進制傳輸,外層將該二進制數據直接封裝成 RowData 再傳給下游。對下游透明,下游在消費 kafka 數據的時候可以直接通過 changelog-json 反序列化進行數據解析。並且由於該改動減少了一次 column 的轉換和解析工作,通過實際測試下來發現除自動感知 schema 變更外還能提升 1 倍的吞。在 kafka sink connector 中,根據 db 和 table 進行分流,可以支持發送到不同的 topic。
-
擴展 metadata,添加 sequence:
將增量數據同步到 kafka 中,由於 kafka 存在多分區,因此必然會導致消息亂序問題。因此需要提供一個單任務內嚴格單調遞增的 sequence,用於下游消費者進行排序,保證數據的最終一致性。最終我們提取 binlog 中的 gtid 作爲 binlog 消息的 sequence id,通過 metadata 的方式暴露處理來,寫入 kafka record 的 header 中,對於全量數據,sequence 設置爲 0。
斷流場景分區提交支持
背景:由於整個 cdc 方案存在上游和下游兩個獨立的 job,並且都是基於 event time 推進 watermark 做分區的提交,下游 watermark 的推進受阻可能受到數據正常斷流或者上游作業異常兩種原因的影響,如果正確判斷呢?
解決辦法:
- 在 cdc source connector 內定義一種新類型的 record HeartbeatRecord,此 record 時間爲當前時間。當發現某張表數據停止發送時,定期 mock 心跳數據進行發送。正常斷流情況下,下游作業可以根據心跳信息正常推進 watermark,並且可以過濾丟棄此信息。
- 最終 cdc connector sql 樣例:
CREATE TABLE mysql_binlog (
host_name STRING METADATA FROM 'host_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP(3) METADATA FROM 'op_ts' VIRTUAL,
sequence BIGINT METADATA FROM 'sequence' VIRTUAL, // sequence嚴格單調遞增
heartbeat BOOLEAN METADATA FROM 'heartbeat'VIRTUAL, // 對於心跳信息標識爲true
mtime TIMESTAMP(3) METADATA FROM 'mtime'VIRTUAL, // 提取mtime,用於下游推進watermark
id BIGINT NOT NULL,
filed_list BYTES NOT NULL, // 去DDL,在source內部數據全部按照changelog-json格式進行序列化、
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxx',
'port' = '3552',
'username' = 'datacenter_cdc',
'password' = 'xxx',
'database-name' = 'xxx',
'debezium.format' = 'bytes',
'table-name' = 'xxx',
'server-time-zone' = 'Asia/Shanghai',
'heartbeat.enable'='true',
'scan.incremental.snapshot.chunk.size' = '80960'
);
06 架構穩定性優化
爲了保障流式傳輸穩定和高效運行,我們在以下幾個方面做了一些優化,分別介紹下:
6.1 管道熱點優化
作業在正常運行的過程中,經常遇到局部熱點問題,例如 kafka/hdfs io 熱點導致局部並行度消費速度下降或者寫入受阻、yarn 隊列機器 load 不均勻導致作業部分並行度消費能力不如,雖然原因多種多樣,但是本質看,這些問題的一個共性就是由於局部熱點導致局部數據延遲。針對這個問題,我們分別從局部流量調度和全局流量調度兩個維度進行優化。
局部流量調度
局部流量調度的優化思路是在單個 producer 和 task 內部,分區之間進行流量的重分配。目前在兩個點就行了優化:
-
bsql Task manager 內部 subtask 上下游通信優化:
集成作業並沒有 keyby 的需求,基於 Flink Credit-based Flow Control 反壓機制,可以通過 Backlog Size 判斷下游任務的處理負載,那麼我們就可以將 Round-robin 發送的方式修改爲根據 Channel 的 Backlog Size 信息選擇負載更低的下游 Channel 發送的方式。注意:此種策略只有 source 和 sink 端之間是 rebalance/rescale 時,纔有效果。會造成一定的序列化開銷,但是測試下來可以接受。
-
kafka producer partition 自動剔除機制:
kafka producer 在發送數據 callback 異常(絕大多數是 timeout)超出一定的閾值,會將對應 tp 從 available partition list 中進行剔除,後續 record 將不再發送到剔除的 tp。同時,被剔除 tp 後續將進行恢復性測試,如果數據可以正常發送,將重新放入到 available partition list 中。目前此機制在 flink kafka sink connector 和標準 kafka client 都進行了實現。
全局流量調度
全局流量調度的優化思路是整個傳輸鏈路層級之間的流量調配,目前我們將生產者 (lancer-gateway) 與消費者 (flink sql kafka source) 進行聯動,當消費者出現 tp 消費 lag 的情況,通過註冊黑名單(lag partition)到 zookeeper,上游生產者感知黑名單,停止向高 lag partition 中繼續發送數據。
Flink kafka source 中基於 flink AggregateFunction 機制,kafka source subtask 上報 lag 到 job manager,job manager 基於全局 lag 判斷註冊黑名單到 zookeeper
黑名單判斷邏輯:當單 tp lag > min(全局 lag 平均值,全局 lag 中位數)* 倍數 && 單 tp lag 大於 lag 絕對值, 其中 "單 tp lag 大於 lag 絕對值" 是爲了規避此機制過於敏感,"單 tp lag> min(全局 lag 平均值,全局 lag 中位數)* 倍數" 用於篩選出頭部的 lag tp。爲了防止黑名單比例過大,黑名單剔除的 tp 數量上限不得大於全部 tp 數量的一定比例。
局部流量調度和全局流量調度在管道熱點優化效果上存在一定的互補性,但是也各有優勢。
6.2 全鏈路埋點質量監控
數據質量是重要一環,通常數據質量包含完整性、時效性、準確性、一致性、唯一性等方面,對於數據傳輸場景,當面我們重點關注完整性和時效性兩個方面
整體質量方案大致包含監控數據採集和規則配置兩個大的方向,整體架構如下:
監控數據採集
我們自研了 trace 系統:以 logid 爲單位,我們在數據處理流程中的每一層都進行了監控埋點
-
每層埋點包含三個方面:接收、發送、內部錯誤。所有埋點數據以數據創建時間(ctime)進行窗口對齊,並且通過更新 utime 以統計層間和層內的處理耗時。
-
通過監控埋點可以實時統計出:端到端、層級間、層級內部的數據處理耗時、完整性、錯誤數。
-
當前方案缺陷:flink sql 掛掉從 ck 恢復,監控數據不能保證冪等,後續需要進一步改進。
監控報警規則
我們針對數據流進行了分級,每個等級指定了不同的保障級別(SLA),SLA 破線,報警通知 oncall 同學處理。
延遲歸檔報警:hdfs 分區提交延遲,觸發報警。
實時完整性監控:基於 trace 數據,實時監控端到端的完整性,接收條數 / 落地條數
離線數據完整性:hdfs 分區 ready 後,觸發 dqc 規則運行,對比接收條數(trace 數據)/ 落地條數(hive 查詢條數)
傳輸延遲監控:基於 trace 數據,計算端到端數據傳輸延遲的分位數。
DQC 阻塞:離線數據完整性異常後,阻塞下游作業的調度。
6.3 kafka 同步斷流重複優化
相對比 2.0 方案中 flume 方案,基於 flink sql 的 kafka 到 kafka 的實現方案明顯的一個變化就是作業的重啓、故障恢復會導致整體的斷流和一定比例的數據重複(從 checkpoint 恢復),因此如何降低用戶對此類問題的感知,至關重要。
首先梳理下可能造成問題出現的原因:1)作業升級重啓 2)task manager 故障 3)job manager 故障 4)checkpoint 連續失敗,同時根據 flink job 整體提交流程,影響作業恢復速度的關鍵環節是資源的申請。根據上述分析和針對性測試,針對 kafka 同步場景的斷流重複採用瞭如下優化方式:
-
checkpoint interval 設置成 10s:降低從 checkpoint 恢復導致的數據重複比例
-
基於 session 模式提交作業:作業重啓無需重複申請資源
-
jobmanager.execution.failover-strategy=region,單個 tm 掛掉後,只恢復對應的 region,不用恢復整個作業。集成作業 DAG 相對簡單,可以儘量規避 rebalance 的出現,降低恢復的比例。
-
使用小資源粒度 task manager(2core cpu,8GB memory,2 slot):同等資源規模下,tm 數量變多,單 tm 掛掉影響程度明顯變低。
-
針對高優作業冗餘 task manager:冗餘一個 tm,當單個 tm 掛掉情況下,流量幾乎沒受影響
-
基於 zookeeper 實現 job manager ha:在開啓 jm ha 後,jm 掛掉任務未斷流
-
針對 checkpoint 連續失敗的場景,我們引入了 regional checkpoint,以 region(而不是整個 topology)作爲 checkpoint 管理的單位,防止個別 task 的 ck 失敗造成整個作業的失敗,可以有效防止在個別 task 的 ck 連續失敗的情況下需要回溯的數據量,減小集羣波動(網絡,HDFS IO 等)對 checkpoint 的影響
經過上述優化,經過測試一個(50core,400GB memory,50 slot)規模的作業,優化效果如下:
6.4 kafka 流量動態 failover 能力
爲了保證數據及時上報,Lancer 對於數據緩衝層的 kafka 的發送成功率依賴性很高,經常遇到的 case 是高峯期或者流量抖動導致的 kafka 寫入瓶頸。參考 Netflix Hystrix 熔斷原理,我們在網關層實現了一種動態 kafka failover 機制:網關可以根據實時的數據發送情況計算熔斷率,根據熔斷率將流量在 normal kafka 和 failover kafka 之間動態調節。
- 基於滑動時間窗口計算熔斷比例:滑動窗口的大小爲 10,每個窗口中統計 1s 內成功和失敗的次數。
-
熔斷器狀態:關閉 / 打開 / 半開,熔斷率 = fail_total/sum_total , 爲避免極端情況流量全切到 failover,熔斷率需要有一個上限配置。熔斷後的降級策略:normal kafka 熔斷後嘗試切 failover,failover kafka 如果也熔斷的話就切回 normal
-
判斷邏輯:
6.5 全鏈路流控、反壓、降級
從端上上報到數據落地的整個流程中,爲了保證穩定性和可控性,除了前述手段,我們還引入了整體流控、反壓、降級等技術手段,下面綜合介紹下。
從後向前,分爲幾個環節:
- 數據分發層:
-
如果出現消費延遲,數據反壓到數據緩衝層 kafka
-
單作業內部通過 backlog 反壓做 subtask 之間的流量均衡
- 數據網關層:
-
如果寫入 kafka 延遲,直接返回流控碼(429)給數據上報端
-
數據網關層和數據分發層之間通過 kafka tp 級別流控調度適配局部 tp 處理延遲。
- 數據上報層:
-
適配數據網關的流控返回:做退避重試
-
基於本地磁盤進行數據的堆積
-
配置動態推送生效主動採樣 / 降級堆積
6.6 開發階段質量驗證
爲了在開發階段保證整體服務的正確性和穩定性,開發階段我們設計了一套完整的測試框架。
-
新版本上線之前,我們會同時雙跑新舊兩條作業鏈路,將數據分別落入兩張 hive 表,並且進行全分區的條數和內容 md5 校驗,校驗結果以小時級別 / 天級別報表的形式發出。此測試框架保證了版本迭代的過程中,端到端的正確性。
-
同時爲了保證異常極端情況下數據的準確性,我們也引入了混沌測試,主動注入一些異常。異常包括:job manager 掛掉,taskmanager 掛掉、作業隨機重啓、局部熱點、髒數據等等。
07 未來展望
-
鏈路架構升級,接入公司級的數據網關(Databus),架構統一併且可以涵蓋更多的數據上報場景。
-
雲原生,擁抱 K8S,面向用戶 quota 管理,並且實現自動資源 AutoScale。
-
擁抱批流一體,強化增量化集成,覆蓋離線批集成場景,打造統一基於 Flink 的統一化集成框架。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/NawxeiP-_DFpyoekRrzlLQ