B 站流式傳輸架構的前世今生

本期作者

王翔宇

嗶哩嗶哩資深開發工程師

2017 年加入 B 站,現服務於基礎架構實時團隊。先後負責 B 站日誌系統、實時流式傳輸工作。

魏澤豐

嗶哩嗶哩高級開發工程師

2021 年加入 B 站,現服務於基礎架構實時團隊,負責實時流式傳輸以及 Flink CDC 相關工作。

高瑞超

嗶哩嗶哩資深開發工程師

2021 年加入 B 站,現服務於基礎架構實時團隊,負責實時流式傳輸以及 Flink connector 相關工作。

01 背景

Lancer 是 B 站的實時流式傳輸平臺,承載全站服務端、客戶端的數據上報 / 採集、傳輸、集成工作,秒級延遲,作爲數倉入口是 B 站數據平臺的生命線。目前每日峯值 5000w/s rps, 3PB / 天, 4K + 條流的數據同步能力。

圖片

服務如此大的數據規模,對產品的可靠性、可擴展性和可維護性提出了很高的要求。流式傳輸的實現是一個很有挑戰的事情,聚焦快、準、穩的需求,  Lancer 整體演進經歷了大管道模型、BU 粒度管道模型、單流單作業模型三個階段的演進,下面我們娓娓道來。

02 關鍵詞說明

logid:每個業務方上報的數據流以 logid 進行標識,logid 是數據在傳輸 + 集成過程中的元信息標識。

數據源:數據進入到 lancer 的入口,例如:log-agent,bfe-agent,flink cdc

lancer-gateway(數據網關):接收數據上報的網關。

數據緩衝層:也叫做內部 kafka,用於解耦數據上報和數據分發。

lancer-collector(數據分發層):也叫做數據同步,可以根據實際場景完成不同端到端的數據同步。

03 技術演進

整個 B 站流式數據傳輸架構的演進大致經歷了三個階段。

3.1 架構 V1.0 - 基於 flume 的

大管道數據傳輸架構 (2019 之前)

B 站流式傳輸架構建立之初,數據流量和數據流條數相對較少,因此採用了全站的數據流混合在一個管道中進行處理,基於 flume 二次定製化的數據傳輸架構,架構如下:

圖片

v1.0 架構在使用中暴露出一些的痛點:

  1. 數據源端對於數據上報的可控性和容錯性較差,例如:
  1. 整體架構是一個大管道模型,資源的劃分和隔離不明確,整體維護成本高,自身故障隔離性差。

  2. 基於 flume 二次迭代的一些缺陷:

3.2 架構 V2.0-BU 粒度的

管道化架構 (2020-2021)

針對 v1.0 的缺陷,我們引入了架構 v2.0,架構如下:

圖片

此架構的關鍵細節如下:

  1. 強化了數據上報源端的邊緣可控能力
  1. 數據管道以 BU 爲粒度搭建,管道間資源隔離,每個管道包含整套獨立的完整數據傳輸鏈路,並且數據管道支持基於 airflow 快速搭建。故障隔離做到 BU 級別。

  2. 數據網關升級到自研 lancer-gateway2.0,邏輯精簡,支持流控反壓,並且適配 kafka failover, 基於 k8s 進行部署。

  3. hdfs 分發基於 flink jar 進行實現:支持 exactly once 語義保證。

V2.0 架構相對於 v1.0, 重點提升了數據上報邊緣的可控力、BU 粒度管道間的資源劃分和隔離性。但是隨着 B 站流式數據傳輸規模的快速增加,對數據傳輸的時效性、成本、質量也提出了越來越高的要求,V2.0 也逐漸暴露出了一些缺陷:

  1. logid 級別隔離性差:
  1. 網關是異步發送模型,極端情況下(組件崩潰),存在數據丟失風險。

  2. ods 層局部熱點 / 故障影響放大

  1. hdfs 小文件問題放大

針對上述痛點,最直接的解決思路就是整體架構做進一步的隔離,以單 logid 爲維度實現數據傳輸 + 分發。面臨的挑戰主要有以下幾個方面:

單流單作業數據集成方案

在 V3.0 架構中,我們對整體傳輸鏈路進行了單作業單數據流隔離改造,並且基於 Flink SQL 支撐數據分發場景。架構如下:

圖片

相比 v2.0, 資源池容量管理上依然以 BU 爲粒度,但是每個 logid 的傳輸和分發相互獨立,互不影響。具體邏輯如下 :

相較於之前的實現,v3.0 架構具有以下的優勢:

  1. 可靠性:
  1. 可維護性上:
  1. 可擴展性:

04 V3.0 架構具體實現

我們重點介紹下,當前 V3.0 結構各個分層的實現。

4.1 數據上報邊緣層

4.1.1 log-agent

基於 go 自研,插件化架構,部署於物理機,可靠、高效的支持服務端數據上報。

圖片

時間架構分爲收集、處理、發送三層,具有以下主要特性:

4.1.2 bfe-agent

基於 go 自研,部署於 cdn,用於承載公網數據上報。

圖片

邊緣 cdn 節點,cdn 服務器上部署 nginx 和 bfe-agent,bfe-agent 整體實現架構與 log-agent 類似,對於 web 和 app 端數據上報請求 QPS 高、突發性強的特點,主要強化了以下能力:

4.2 數據上報網關層

v3.0 方案中,數據數據網關的架構如下:

圖片

數據網關功能特性如下:

整個數據網關中的實現難點是:單 gateway 承載多 logid 處理的過程中如何保證隔離性和公平性,我們參考了 Golang 中 GMP 的機制,整體數據流程如下:

圖片

  1. 收到的請求,會把請求放到 logid 對應的請求隊列,如果隊列滿,直接拒絕請求

  2. 每個 kafka 集羣,會初始化一個 N 大小的 kafka producer pool,其中每個 producer 會遍歷所有的隊列,進行數據的發送。

  3. 對於每個 logid 的請求隊列,會從兩個維護限制資源的佔用,以保證公平性和隔離性

4.3 數據上報分發層

圖片

隨着 flink 在實時計算領域的成熟,其高性能、低延遲、exactly once 語義保證、批流一體、豐富的數據源支持、活躍的社區等優勢,促使我們選擇了以 flink sql 作爲數據分發層的解決方案。當前我們主要支持了 kafka→hive, kafka→kafka, cdc→kafka->hudi/hive 三種場景:

  1. kafka→hive
  1. kafka→kafka
  1. cdc→kafka->hudi/hive

圖片

在 Flink SQL 數據分發場景的支持中,針對我們遇到的實際需求,對社區原生 connector 進行了對應的優化,我們針對性的介紹下。

5.1 hive sink connector 優化

斷流空分區提交

背景:B 站離線作業的拉起依賴上游分區提交,HDFS 分區提交的判斷依賴於作業整體 watermark 的推進,但是某些 logid 在斷流的情況下,如何進行分區的提交呢

解決辦法:

圖片

如圖所示:當所有的 StreamFileWriter 連續兩次 checkpoint 內未處理任何數據的情況下,StreamingFileCommiter 會判定發生了斷流,按照當前時間提交分區。

支持下游增量數據同步

背景:傳統方式 ods 到 dwd 的數據同步只有當 ods 層分區 ready 之後纔會開始,時效性較差,如何加速數據的同步?

解決辦法:

圖片

圖片

orc+zstd

背景:相較於行式存儲,列式存儲在壓縮比上有着顯著的優勢。

解決辦法:支持 orc+zstd, 經過測試,相較於 text+lzo,空間節省在 40% 以上。

hdfs 異步 close

背景:snapshot 階段 flush 數據,close 文件經常因爲個別文件慢拖累整體吞吐。

解決辦法:

小文件合併

背景:rolling on checkpoint 的滾動策略,會導致文件數量的膨脹,對 namenode 產生較大的壓力。

解決辦法:

圖片

5.2 kafka connector 優化

支持 protobuf format

背景:用戶有處理 protobuf 格式數據的需求

解決辦法:

kafka sink 支持自定義分流

背景:用戶希望在一個 sql 作業中根據需要,靈活定製將消息發送到指定 kafka 集羣和 topic。

解決辦法:

CREATE TABLE sink_test (
  broker_name_arg varchar,
  topic_name_arg varchar,
  message string,
  t1 string
) WITH(
'bootstrapServers' = 'BrokerUdf(broker_name_arg)', // 根據broker_name_arg作爲udf參數計算brokers
'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf', // 獲取brokers Udf
'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)', // 根據broker_name_arg和topic_name_arg作爲udf參數計算topic
'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf', // 計算topoc Udf
'udf.cache.min' = '1', // 緩存時間
'exclude.udf.field' = 'false', // udf的相關字段是否輸出
'connector' = 'kafka-diversion'
);

5.3 cdc connector 優化

sql 場景下多庫多表場景支持

背景:原生的 flink cdc source 在單個 sql 任務中,只能同步相同 DDL 定義的表,如果需要同步異構 DDL,不得不啓動多個獨立的 job 進行同步。這樣會存在資源的額外開銷。

解決辦法:

圖片

斷流場景分區提交支持

背景:由於整個 cdc 方案存在上游和下游兩個獨立的 job,並且都是基於 event time 推進 watermark 做分區的提交,下游 watermark 的推進受阻可能受到數據正常斷流或者上游作業異常兩種原因的影響,如果正確判斷呢?

解決辦法:

圖片

CREATE TABLE mysql_binlog (
  host_name STRING METADATA              FROM 'host_name' VIRTUAL,
  db_name STRING METADATA                FROM 'database_name' VIRTUAL,
  table_name STRING METADATA             FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP(3) METADATA     FROM 'op_ts' VIRTUAL,
  sequence BIGINT METADATA               FROM 'sequence' VIRTUAL,  // sequence嚴格單調遞增
  heartbeat BOOLEAN METADATA             FROM 'heartbeat'VIRTUAL, // 對於心跳信息標識爲true
  mtime TIMESTAMP(3) METADATA            FROM 'mtime'VIRTUAL, // 提取mtime,用於下游推進watermark
  id BIGINT NOT NULL,
  filed_list BYTES NOT NULL,  // 去DDL,在source內部數據全部按照changelog-json格式進行序列化、
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xxxx',
  'port' = '3552',
  'username' = 'datacenter_cdc',
  'password' = 'xxx',
  'database-name' = 'xxx',
  'debezium.format' = 'bytes',
  'table-name' = 'xxx',
  'server-time-zone' = 'Asia/Shanghai',
  'heartbeat.enable'='true',  
  'scan.incremental.snapshot.chunk.size' = '80960'
);

06 架構穩定性優化

爲了保障流式傳輸穩定和高效運行,我們在以下幾個方面做了一些優化,分別介紹下:

6.1 管道熱點優化

作業在正常運行的過程中,經常遇到局部熱點問題,例如 kafka/hdfs io 熱點導致局部並行度消費速度下降或者寫入受阻、yarn 隊列機器 load 不均勻導致作業部分並行度消費能力不如,雖然原因多種多樣,但是本質看,這些問題的一個共性就是由於局部熱點導致局部數據延遲。針對這個問題,我們分別從局部流量調度和全局流量調度兩個維度進行優化。

局部流量調度

局部流量調度的優化思路是在單個 producer 和 task 內部,分區之間進行流量的重分配。目前在兩個點就行了優化:

圖片

全局流量調度

全局流量調度的優化思路是整個傳輸鏈路層級之間的流量調配,目前我們將生產者 (lancer-gateway) 與消費者 (flink sql kafka source) 進行聯動,當消費者出現 tp 消費 lag 的情況,通過註冊黑名單(lag partition)到 zookeeper,上游生產者感知黑名單,停止向高 lag partition 中繼續發送數據。

圖片

Flink kafka source 中基於 flink AggregateFunction 機制,kafka source subtask 上報 lag 到 job manager,job manager 基於全局 lag 判斷註冊黑名單到 zookeeper

黑名單判斷邏輯:當單 tp lag > min(全局 lag 平均值,全局 lag 中位數)* 倍數 && 單 tp lag 大於 lag 絕對值, 其中 "單 tp lag 大於 lag 絕對值" 是爲了規避此機制過於敏感,"單 tp lag> min(全局 lag 平均值,全局 lag 中位數)* 倍數" 用於篩選出頭部的 lag tp。爲了防止黑名單比例過大,黑名單剔除的 tp 數量上限不得大於全部 tp 數量的一定比例。

局部流量調度和全局流量調度在管道熱點優化效果上存在一定的互補性,但是也各有優勢。

圖片

6.2 全鏈路埋點質量監控

數據質量是重要一環,通常數據質量包含完整性、時效性、準確性、一致性、唯一性等方面,對於數據傳輸場景,當面我們重點關注完整性和時效性兩個方面

圖片

整體質量方案大致包含監控數據採集和規則配置兩個大的方向,整體架構如下:

圖片

監控數據採集

我們自研了 trace 系統:以 logid 爲單位,我們在數據處理流程中的每一層都進行了監控埋點

圖片

監控報警規則

我們針對數據流進行了分級,每個等級指定了不同的保障級別(SLA),SLA 破線,報警通知 oncall 同學處理。

延遲歸檔報警:hdfs 分區提交延遲,觸發報警。

實時完整性監控:基於 trace 數據,實時監控端到端的完整性,接收條數 / 落地條數

離線數據完整性:hdfs 分區 ready 後,觸發 dqc 規則運行,對比接收條數(trace 數據)/ 落地條數(hive 查詢條數)

傳輸延遲監控:基於 trace 數據,計算端到端數據傳輸延遲的分位數。

DQC 阻塞:離線數據完整性異常後,阻塞下游作業的調度。

6.3 kafka 同步斷流重複優化

相對比 2.0 方案中 flume 方案,基於 flink sql 的 kafka 到 kafka 的實現方案明顯的一個變化就是作業的重啓、故障恢復會導致整體的斷流和一定比例的數據重複(從 checkpoint 恢復),因此如何降低用戶對此類問題的感知,至關重要。

首先梳理下可能造成問題出現的原因:1)作業升級重啓 2)task manager 故障 3)job manager 故障 4)checkpoint 連續失敗,同時根據 flink job 整體提交流程,影響作業恢復速度的關鍵環節是資源的申請。根據上述分析和針對性測試,針對 kafka 同步場景的斷流重複採用瞭如下優化方式:

經過上述優化,經過測試一個(50core,400GB memory,50 slot)規模的作業,優化效果如下:

圖片

6.4 kafka 流量動態 failover 能力

爲了保證數據及時上報,Lancer 對於數據緩衝層的 kafka 的發送成功率依賴性很高,經常遇到的 case 是高峯期或者流量抖動導致的 kafka 寫入瓶頸。參考 Netflix Hystrix 熔斷原理,我們在網關層實現了一種動態 kafka failover 機制:網關可以根據實時的數據發送情況計算熔斷率,根據熔斷率將流量在 normal kafka 和 failover kafka 之間動態調節。

圖片

圖片

6.5 全鏈路流控、反壓、降級

從端上上報到數據落地的整個流程中,爲了保證穩定性和可控性,除了前述手段,我們還引入了整體流控、反壓、降級等技術手段,下面綜合介紹下。

圖片

從後向前,分爲幾個環節:

  1. 數據分發層: 
  1. 數據網關層:
  1. 數據上報層:

6.6 開發階段質量驗證

爲了在開發階段保證整體服務的正確性和穩定性,開發階段我們設計了一套完整的測試框架。

07 未來展望

圖片

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/NawxeiP-_DFpyoekRrzlLQ