百萬 QPS,秒級延遲,攜程基於實時流的大數據基礎層建設

作者簡介

紀成,攜程數據開發總監,負責金融數據基礎組件及平臺開發、數倉建設與治理相關的工作。對大數據領域開源技術框架有濃厚興趣。

一、背景

2017 年 9 月攜程金融成立,本着踐行金融助力旅行的使命,開始全面開展集團風控和金融業務,需要在攜程 DC 構建統一的金融數據中心,實現多地多機房間的數據融合,滿足離線和在線需求;涉及數千張 mysql 表到離線數倉 、實時數倉、在線緩存的同步工作。由於跨地域、實時性、準確性、完整性要求高,集團內二次開發的 DataX(業界常用的離線同步方案)無法支持。以 mysql-hive 同步爲例,DataX 通過直連 MySQL 批量拉取數據,存在以下問題:

1)性能瓶頸:隨着業務規模的增長,離線批量拉取的數據規模越來越大,影響 mysql-hive 鏡像表的產出時間,進而影響數倉下游任務。對於一些需要 mysql-hive 小時級鏡像的場景更加捉襟見肘。

2)影響線上業務:離線批量拉取數據,可能引起慢查詢,影響業務庫的線上服務。

3)無法保證冪等:由於線上庫在實時更新,在批量拉取 SQL 不變的情況下,每次執行可能產生不一樣的結果。比如指定了 create_time 範圍,但一批記錄的部分字段(比如支付狀態)時刻在變化。也即無法產出一個明確的 mysql-hive 鏡像 , 對於一些對時點要求非常高的場景(比如離線對賬) 無法接受。

4)缺乏對 DELETE 的支持:業務庫做了 DELETE 操作後,只有整表全量拉取,才能在 Hive 鏡像裏體現。

二、方案概述

    

基於上述背景,我們設計了一套基於 binlog 實時流的數據基礎層構建方案,並取得了預期效果。架構如圖,各模塊簡介:

1)webUI 做 binlog 採集的配置,以及 mysql->hive,mysql→實時數倉,mysql→在線緩存的鏡像配置工作。

2)canal 負責 binlog 採集 ,寫入 kafka ;其中 kafka 在多地部署,並通過專線實現 topic 的實時同步。

3)spark-streaming 負責將 binlog 寫入 HDFS。

4)merge 離線調度的 ETL 作業,負責將 HDFS 增量和 snap 合併成新的 snap。

5)mirror 負責將 binlog 事件更新到實時數倉、在線緩存。

6)基礎服務:包括歷史數據的重放,數據校驗,全鏈路監控,明文檢測等功能。

三、詳細介紹

    

本章將以 mysql-hive 鏡像爲例,對技術方案做詳細介紹。

3.1.binlog 採集

canal 是阿里巴巴開源的 Mysql binlog 增量訂閱和消費組件,在業界有非常廣泛的應用,通過實時增量採集 binlog ,可以降低對 mysql 的壓力,細粒度的還原數據的變更過程,我們選型 canal 作爲 binlog 採集的基礎組件,根據應用場景做了二次開發,其中 raw binlog → simple binlog 的消息格式轉換是重點。

下面是 binlog 採集的架構圖:

canal 在 1.1.4 版本引入了 canal-admin 工程,支持面向 WebUI 的管理能力;我們採用原生的 canal-admin 對 binlog 採集進行管理 ,採集粒度是 mysql instance 級別。

Canal Server 會向 canalAdmin 拉取所屬集羣下的所有 mysql instance 列表,針對每個 mysql instance 採集任務,canal server 通過在 zookeeper 創建臨時節點的方式實現 HA,並通過 zookeeper 實現 binlog position 的共享。

canal 1.1.1 版本引入 MQProducer 原生支持 kafka 消息投遞 , 圖中 instance active 從 mysql 獲取實時的增量 raw binlog 數據,在 MQProducer 環節進行 raw binlog → simple binlog 的消息轉換,發送至 kafka。我們按照 instance 創建了對應的 kafka topic,而非每個 database 一個 topic , 主要考慮到同一個 mysql instance 下有多個 database,過多的 topic (partition) 導致 kafka 隨機 IO 增加,影響吞吐。發送 Kafka 時以 schemaName+tableName 作爲 partitionKey,結合 producer 的參數控制,保證同一個表的 binlog 消息按順序寫入 kafka。

參考 producer 參數控制:

max.in.flight.requests.per.connection=1
retries=0
acks=all

topic level 的配置:

topic partition 3副本, 且
min.insync.replicas=2

從保證數據的順序性、容災等方面考慮,我們設計了一個輕量級的 SimpleBinlog 消息格式:

金融當前部署了 4 組 canal 集羣,每組 2 個物理機節點,跨機房 DR 部署,承擔了數百個 mysql instance binlog 採集工作。Canal server 自帶的性能監控基於 Prometheus 實現,我們通過實現 PrometheusScraper 主動拉取核心指標,推送到集團內部的 Watcher 監控系統上,配置相關報警,其中各 mysql instance 的 binlog 採集延遲是全鏈路監控的重要指標。

系統上線初期遇到過 canal-server instance 腦裂的問題,具體場景是 active instance 所在的 canal-server ,因網絡問題與 zookeeper 的鏈接超時,這時候 standby instance 會搶佔創建臨時節點,成爲新的 active;也就出現了 2 個 active 同時採集並推送 binlog 的情況。解決辦法是 active instance 與 zookeeper 鏈接超時後,立即自 kill,再次發起下一輪搶佔。

3.2 歷史數據重放

有兩個場景需要我們採集歷史數據:

1)首次做 mysql-hive 鏡像 ,需要從 mysql 加載歷史數據;

2)系統故障(丟數等極端情況),需要從 mysql 恢復數據。

有兩種方案:

1)從 mysql 批量拉取歷史數據,上傳到 HDFS 。需要考慮批量拉取的數據與 binlog 採集產出的 mysql-hive 鏡像的格式差異,比如去重主鍵的選擇,排序字段的選擇等問題。

2)流式方式, 批量從 mysql 拉取歷史數據,轉換爲 simple binlog 消息流寫入 kafka,同實時採集的 simple binlog 流複用後續的處理流程。在合併產生 mysql-hive 鏡像表時,需要確保這部分數據不會覆蓋實時採集的 simple binlog 數據。

我們選用了更簡單易維護的方案 2,並開發了一個 binlog-mock 服務,可以根據用戶給出的庫、表(前綴)以及條件,按批次(比如每次 select 10000 行)從 mysql 查詢數據,組裝成 simple_binlog 消息發送 kafka。

對於 mock 的歷史數據,需要注意:

1)保證不覆蓋後續實時採集的 binlog:simple binlog 消息裏 binlogOffset 字段用於全局排序,它由 ${timestamp}+${seq} 組成,mock 的這部分數據 timestamp 爲發起 SQL 查詢的時間戳向前移 5 分鐘,seq 爲 000000;  

 2)落到哪個分區:我們根據 binlog 事件時間 (executeTime) 判斷數據所屬哪個 dt 分區,mock 的這部分數據 executeTime 爲用戶指定的一個值,默認爲 ${yesterday}。

**3.3 Write2HDFS **

我們採用 spark-streaming 將 kafka 消息持久化到 HDFS,每 5 分鐘一個批次,一個批次的數據處理完成(持久化到 HDFS)後再提交 consumer offset,保證消息被 at-least-once 處理;同時也考慮了分庫分表問題、數據傾斜問題:

屏蔽分庫分表:以訂單表爲例,mysql 數據存儲在 ordercenter_00 ... ordercenter_99 100 個庫,每個庫下面又有 orderinfo_00...orderinfo_99 100 張表,庫前綴 schemaNamePrefix=ordercenter, 表前綴 tableNamePrefix=orderinfo,統一映射到 tableName=${schemaNamePrefix}${tableNamePrefix} 裏; 根據 binlog executeTime 字段生成對應的分區 dt,確保同一個庫表同一天的數據落到同一個分區目錄裏:  base_path/ods_binlog_source.db/${database_prefix}${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}

防止數據傾斜:  系統上線初期經常出現數據傾斜問題,排查發現某些時間段個別表由於業務跑批等產生的 binlog 量特別大,一張表一個批次的數據需要寫入同一個 HDFS 文件,單個 HDFS 文件的寫入速度成爲瓶頸。因此增加了一個環節(Step2),過濾出當前批次裏的 “大表 ",將這些大表的數據分散寫入多個 HDFS 文件裏。 

base_path/ods_binlog_source.db/${database_prefix}_${table_prefix}/dt={binlogDt}/binlog-{timestamp}-{rdd.id}-[${randomInt}]

3.4 生成鏡像

3.4.1  數據就緒檢查

spark-streaming 作業每 5 分鐘一個批次將 kafka simple_binlog 消息持久化到 HDFS,merge 任務是每天執行一次。每天 0 點 15 分,開始進行數據就緒檢查。我們對消息的全鏈路進行了監控,包括 binlog 採集延遲 t1 、kafka 同步延遲 t2 、spark-streaming consumer 延遲 t3。假設當前時間爲凌晨 0 點 30 分,設爲 t4,若 t4>(t1+t2+t3) 說明 T-1 日數據已全部落入 HDFS,即可執行下游的 ETL 作業(merge)。

3.4.2  Merge

HDFS 上的 simple binlog 數據就緒後,下一步就是對相應 MySQL 業務表數據進行還原。以下是 Merge 的執行流程,步驟如下:

1)加載 T-1 分區的 simple binlog 數據

數據就緒檢查通過後,通過 MSCK REPAIR PARTITION 加載 T-1 分區的 simple_binlog 數據,注意:這個表是原始的 simple binlog 數據,並未平鋪具體 mysql 表的字段。如果是首次做 mysql-hive 鏡像,歷史數據重放的 simple binlog 也會落入 T-1 分區。

2)檢查 Schema ,並抽取 T-1 增量

請求 mirror 後臺,獲取最新的 mysql schema,如果發生了變更則更新 mysql-hive 鏡像表 (snap),讓下游無感知;同時根據 mysql schema 的 field 列表 、以及 "hive 主鍵" 等配置信息,從上述 simple_binlog 分區抽取出 mysql 表的 T-1 日明細數據 (delta)。

3)判斷業務庫是否發生了歸檔操作,以決定後續合併時是否忽略 DELETE 事件。

業務 DELETE 數據有 2 種情況:業務修單等引起的正常 DELETE,需要同步變更到 Hive;業務庫歸檔歷史數據產生的 DELETE,這類 DELETE 操作需要忽略掉。

系統上線初期,我們等待業務或 DBA 通知,然後手工處理,比較繁瑣,很多時候會有通知不到位的情況,導致 Hive 數據缺失歷史數據。爲了解決這個問題,在 Merge 之前進行程自動判斷,參考規則如下:

a)業務歸檔通常是大批量的 DELETE(百萬 +),因此可以設置一個閾值,比如 500W 或日增量的 7 倍。 

 b)業務歸檔的時間段通常比較久,比如設置閾值爲 30 天。如果滿足了條件 1,且刪除的這些數據在 30 天以前,則屬於歸檔產生的 DELETE。

4)對增量數據(delta)和當前快照(snap T-2)進行合併去重,得到最新 snap T-1。

下面通過一個例子說明 merge 的過程,假設訂單 order 表共有 id,order_no,amount 三個字段,id 是全局唯一建;  snap 表 t3 是 mysql-hive 鏡像,merge 過程如圖展示。

1)加載目標(dt=T-1)分區裏的 simple binlog 數據,表格式如 t1;

2)請求 mirror 後臺獲取 mysql 的最新 schema,從 t1 抽取數據到臨時表 t2; 

3)snap 表 t3 與 mysql schema 進行適配(本例無變更);   

4)對增量表 t2、存量 snap t3 進行 union(對 t3 自動增加 type 列,值爲 INSERT),得到臨時表 t4;

5)對 t4 表按唯一鍵 id 進行 row_number,分組按 binlogOffset 降序排序,序號爲 1 的即爲最新數據。

3.4.3  check

在數據 merge 完成後,爲了保證 mysql-hive 鏡像表中數據準確性,會對 hive 表和 mysql 表進行字段和數據量對比,做好最後一道防線。我們在配置 mysql-hive 鏡像時,會指定一個檢查條件,通常是按 createTime 字段對比 7 天的數據;mirror 後臺每天凌晨會預先從 mysql 統計出過去 7 日增量,離線任務通過腳本(http)獲取上述數據,和 snap 表進行校驗。實踐中遇到一些問題:

1)T-1 的 binlog 落在 T 分區的情況

check 服務根據 createTime 生成查詢條件去 check mysql 和 Hive 數據,由於業務 sql 裏的 createTime 和 binlog executeTime 不一致,分別爲凌晨時刻的前後 1 秒,會導致 Hive 裏漏掉這條數據,這種情況可以通過一起加載 T 日分區的 binlog 數據,重新 merge。

2)業務表遷移,原錶停止更新,雖然 mysql 和 hive 數據量一致,但已經不符合要求了,這種情況可以通過波動率發現。

3.5 其他

在實踐中,可根據需要在 binlog 採集以及後續的消息流裏引入一些數據治理工作。比如:

1)明文檢測:binlog 採集環節對核心庫表數據做實時明文檢測,可以避免敏感數據流入數倉;

2)標準化:一些字段的標準化操作,比如 id 映射、不同密文的映射;

3)元數據:mysql→hive 鏡像是數倉 ODS 的核心,可以根據採集配置信息,實現二者映射關係的雙向檢索,便於數倉溯源。這塊是金融元數據管理的重要組成部分。

通過消費 binlog 實現 mysql 到實時數倉(kudu、es)、在線緩存(redis)的鏡像邏輯相對簡單,限於篇幅,本文不再贅述。

四、總結與展望

金融基於 binlog 的數據基礎層構建方案,順利完成了預期目標:

1)金融數據中心建設(ODS 層):數千張 mysql 表到攜程 DC 的鏡像, 全部 T+1 1:30 產出;

2)金融實時數倉建設:金融核心 mysql 表到 kudu 的鏡像,支持實時分析、分表合併查詢等偏實時的運營場景;

3)金融在線緩存服務:異地多活,緩存近 1000G 業務數據;支撐整個消金入口、風控業務近 100W/min 的請求。

該方案已經成爲金融在線和離線服務的基石,並在持續擴充使用場景。未來會在自動化配置(整合 mirror-admin 和 canal-admin,實現一鍵構造)、智能運維(數據 check 異常的識別與恢復)、元數據管理方面做更多的投入。

本文介紹了攜程金融構建大數據基礎層的技術方案,着重介紹了 binlog 採集和 mysql-hive 鏡像的設計,以及實踐中遇到的一些問題及解決辦法。希望能給大家帶來一些參考價值,也歡迎大家一起來交流。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/15aQ5oeV-hLeTUnBMzXiog