億級數據服務平臺:跟低效率、指標難統一的數倉說再見!
數據服務是數據中臺體系中的關鍵組成部分。作爲數倉對接上層應用的統一出入口,數據服務將數倉當作一個統一的 DB 來訪問,提供統一的 API 接口控制數據的流入及流出,能夠滿足用戶對不同類型數據的訪問需求。
電商平臺唯品會的數據服務自 2019 年開始建設,在公司內經歷了從無到有落地,再到爲超過 30 + 業務方提供 to B、to C 的數據服務的過程。本文主要介紹唯品會自研數據服務 Hera 的相關背景、架構設計和核心功能。
一、背景
在統一數倉數據服務之前,數倉提供的訪問接入方式往往存在效率問題低、數據指標難統一等問題,具體而言有以下幾個比較突出的情況:
-
廣告人羣 USP、DMP 系統每天需要通過 HiveServer 以流的方式從數倉導出數據到本地,每個人羣的數據量從幾十萬到幾個億,人羣數量 2w+,每個人羣運行時間在 30min +,部分大人羣的運行直接超過 1h,在資源緊張的情況下,人羣延遲情況嚴重。
-
數倉的數據在被數據產品使用時,需要爲每個表新生成一個單獨的接口,應用端需要爲每一種訪問方式(如 Presto、ClickHouse)區分使用不同的接口,導致數據產品接口暴漲,不方便維護,影響開發及維護效率。數據在不同的存儲時,需要包含 clickhouse-client,presto-client 等等第三方 jar 包。
-
不同數據產品中都需要使用一些常用的數據指標,如銷售額、訂單數、PV、UV 等,而這些數據在不同數據產品的實現口徑、實現方式都不一樣,無法形成數據共享,每個數據產品都重複進行相同的指標建設。因此,在不同數據產品查看相同指標卻發現數值不同的情況下,難以判斷哪個數據產品提供的數據是準確的。
圖 1. 在統一數倉數據服務之前,數據流入流出方式
爲解決以上問題,數據服務應運而生。目前數據服務的主要優勢有:屏蔽底層的存儲引擎、計算引擎,使用同一個 API(one service),數倉數據分層存儲,不同 engine 的 SQL 生成能力,自適應 SQL 執行以及統一緩存架構保障業務 SLA,支持數據註冊並授權給任何調用方進行使用,提高數據交付效率。
通過唯一的 ID 標識,數據產品可通過 ID 查閱數據,而非直接訪問對應的數倉表。一方面,指標服務統一了指標的口徑,同時也支持快速構建新的數據產品。
二、架構設計
數據服務能給業務帶來運營和商業價值,核心在於給用戶提供自助分析數據能力。Hera 整體架構基於典型的 Master/slave 模型,數據流與控制流單獨鏈路,從而保障系統的高可用性。數據服務系統主要分爲三層:
-
應用接入層:業務申請接入時,可以根據業務要求選擇數據服務 API(TCP Client), HTTP 以及 OSP 服務接口(公司內部 RPC 框架)。
-
數據服務層:主要執行業務提交的任務,並返回結果。主要功能點包括:路由策略,多引擎支持,引擎資源配置,引擎參數動態組裝,SQLLispengine 生成,SQL 自適應執行,統一數據查詢緩存,FreeMaker SQL 動態生成等功能。
-
數據層:業務查詢的數據無論在數倉、Clickhouse、MySQL 還是 Redis 中,都可以很好地得到支持,用戶都使用同一套 API。
圖 2. 數據服務整體架構圖
調度系統的整體流程大致包含以下模塊:
-
Master:負責管理所有的 Worker、TransferServer、AdhocWorker 節點,同時負責調度分發作業;
-
Worker:負責執行 ETL 和數據文件導出類型的作業,拉起 AdhocWorker 進程(Adhoc 任務在 AdhocWorker 進程中的線程池中執行),ETL 類型的作業通過子進程的方式完成;
-
Client:客戶端,用於編程式地提交 SQL 作業;
-
ConfigCenter:負責向集羣推送統一配置信息及其它運行時相關的配置和 SQLParser (根據給定的規則解析、替換、生成改寫 SQL 語句,以支持不同計算引擎的執行);
-
TransferServer:文件傳輸服務。
圖 3. 數據服務調度流程圖
三、主要功能
Hera 數據服務的主要功能有:多隊列調度策略、多引擎查詢、多任務類型、文件導出、資源隔離、引擎參數動態組裝、自適應 Engine 執行和 SQL 構建。
1、多隊列調度策略
數據服務支持按照不同用戶、不同任務類型並根據權重劃分不同調度隊列,以滿足不同任務類型的 SLA。
2、多引擎查詢
數據服務支持目前公司內部所有 OLAP 和數據庫類型,包括 Spark、Presto、Clickhouse、Hive 、MySQL、Redis。會根據業務具體場景和要求,選擇當前最佳的查詢引擎。
3、多任務類型
數據服務支持的任務類型有:ETL、Adhoc、文件導出、數據導入。加上多引擎功能,實現多種功能組合,如 Spark adhoc 和 Presto adhoc。
4、文件導出
主要是支持大量的數據從數據倉庫中導出,便於業務分析和處理,比如供應商發券和信息推送等。
具體執行過程如下:
用戶提交需要導出數據的 SQL,通過分佈式 engine 執行完成後,落地文件到 hdfs/alluxio. 客戶端通過 TCP 拉取文件到本地。千萬億級的數據導出耗時最多 10min。數據導出在人羣數據導出上性能由原來的 30min+ ,提升到最多不超過 3min,性能提升 10~30 倍。具體流程如下:
圖 4. 數據服務文件下載流程圖
5、資源隔離(Worker 資源和計算資源)
業務一般分爲核心和非核心,在資源分配和調度上也不同。主要是從執行任務 Worker 和引擎資源,都可以實現物理級別的隔離,最大化減少不同業務之間相互影響。
6、引擎參數動態組裝
線上業務執行需要根據業務情況進行調優,動態限制用戶資源使用,集羣整體切換等操作,這個時候就需要對用戶作業參數動態修改,如 OLAP 引擎執行任務時,經常都要根據任務調優,設置不同參數。針對這類問題,數據服務提供了根據引擎類型自動組裝引擎參數,並且引擎參數支持動態調整,也可以針對特定任務、執行賬號、業務類型來設定 OLAP 引擎執行參數。
7、自適應 Engine 執行
業務方在查詢時,有可能因爲引擎資源不足或者查詢條件數據類型不匹配從而導致執行失敗。爲了提高查詢成功率和服務 SLA 保障,設計了 Ad Hoc 自適應引擎執行,當一個引擎執行報錯後,會切換到另外一個引擎繼續執行。具體自適應執行邏輯如下圖所示:
圖 5. 自適應 Engine 執行
8、SQL 構建
數據服務 SQL 構建基於維度事實建模,支持單表模型、星型模型和雪花模型。
-
單表模型:一張事實表,一般爲 DWS 或者 ADS 的彙總事實表。
-
星型模型:1 張事實表(如 DWD 明細事實表)+ N 張維表,例如訂單明細表 (事實表 FK = 商品 ID) + 商品維表 (維度表 PK = 商品 ID) 。
-
雪花模型:1 張事實表(如 DWD 明細事實表)+ N 張維表 + M 張沒有直接連接到事實表的維表,例如訂單明細表 (事實表 FK = 商品 ID) + 商品維表 (維度表 PK = 商品 ID,FK = 品類 ID) + 品類維表 (維度表 PK = 品類 ID)。
圖 6.SQL 維度模型
1)自定義語法(Lisp)描述指標的計算公式
Lisp 是一套自定義的語法,用戶可以使用 Lisp 來描述指標的計算公式。其目標是爲了構建統一的指標計算公式處理範式,屏蔽底層的執行引擎的語法細節,最大化優化業務配置和生成指標的效率。
Lisp 總體格式 (oprator param1 param2 ...) param 可以是一個參數,也可以是一個 Lisp 表達式。目前已經實現的功能:
①聚合表達式
(count x [y,z...]), count distinct x over (partition by y,z);
在 Presto 中的實現是 approx_distinct(x,e) over (partition by y,z),在 Spark 中的實現是 approx_count_distinct(x,e) over (partition by y,z)。y,z 只在開窗函數模式下才生效。目前也支持嵌套的聚合表達式 (sum (sum (max x)))。
②條件表達式
case when 實現 when1 爲條件 bool 或者被比較值 then1 爲對應輸出 elseX 爲最後的 else 輸出
- 簡單模式 (case value val1 then1 [val2 then2] ... [elseVal])
eg:(case subject_id (int 2) (int 1)) -> case subject_id when 2 then 1 end)
- 查找模式 (case when1 then1 [when2 then2] ... [elseVal])
eg:(case (= subject_id (string goods_base)) (int 2) (int 1)) -> case when subject_id = 'goods_base' then 2 else 1 end
③類型標識表達式
(int xx) xx 標識成 數字型
(string xx) xx 標識成 字符串類型
null 直接返回 null
④類型轉換表達式
(cast bigint xx)
(cast double xx)
(cast string xx)
⑤聚合通用表達式
(func a b c ...) 通用 Lisp 表達式 a 爲函數名 後續字段爲表達式元素 如 (func bar 1 2 3) 解析爲 bar(1, 2, 3)
⑥非聚合通用表達式
(func_none a b c ...) 通用 Lisp 表達式 a 爲函數名 後續字段爲表達式元素 如 (func_none bar 1 2 3) 解析爲 bar(1, 2, 3) ,設置 Lisp 對象的 aggregation 屬性爲 false
例如:(func_none json_extract_scalar 40079 '$.m_name')
2)Lisp 語法的解析
Lisp 的解析和翻譯是基於 antlr4 來實現的,處理流程如下:
圖 7. Lisp 處理流程圖
將 Lisp(count x y) 表達式通過 antlr 翻譯成語法樹,如下圖所示:
圖 8. 語法樹
通過自定義的 Listener 遍歷語法樹
在遍歷語法樹的過程中,結合指標的 query engine(presto/spark/clickhouse/mysql) 元數據生成對應的查詢引擎的 SQL 代碼 (approx_distinct(x,e) over (partition by y))
9、任務調度
基於 Netty 庫收發集羣消息,系統僅僅使用同一個線程池對象 EventLoopGroup 來收發消息,而用戶的業務邏輯,則交由一個單獨的線程池。
選用 Netty 的另外一個原因是 “零拷貝” 的能力,在大量數據返回時,通過文件的形式直接將結果送給調用者。
1)多隊列 + 多用戶調度
業務需求通常包含時間敏感與不敏感作業,爲了提高作業的穩定性和系統的可配置性,Hera 提供了多隊列作業調度的功能。
用戶在提交作業時可以顯式地指定一個作業隊列名,當這個作業在提交到集羣時,如果相應的隊列有空閒,則就會被添加進相應的隊列中,否則返回具體的錯誤給客戶端,如任務隊列滿、隊列名不存在、隊列已經關閉等,客戶端可以選擇 “是否重試提交”。
當一個作業被添加進隊列之後,Master 就會立即嘗試調度這個隊列中的作業,基於以下條件選擇合適的作業運行:
-
每個隊列都有自己的權重,同時會設置佔用整個集羣的資源總量,如最多使用多少內存、最多運行的任務數量等。
-
隊列中的任務也有自己的權重,同時會記錄這個作業入隊的時間,在排序當前隊列的作業時,利用入隊的時間偏移量和總的超時時間,計算得到一個最終的評分。
-
除了調度系統本身的調度策略外,還需要考慮外部計算集羣的負載,在從某個隊列中拿出一個作業後,再進行一次過濾,或者是先過濾,再進行作業的評分計算。
一個可用的計算作業評分模型如下:
隊列動態因子 = 隊列大小 / 隊列容量 * (1 - 作業運行數 / 隊列並行度)
這個等式表示的意義是:如果某個隊列正在等待的作業的佔比比較大,同時並行運行的作業數佔比也比較大時,這個隊列的作業就擁有一個更大的因子,也就意味着在隊列權重相同時,這個隊列中的作業應該被優先調度。
作業權重 = 1 - (當前時間 - 入隊時間) / 超時時間
這個等式表示的意義是:在同一個隊列中,如果一個作業的剩餘超時時間越少,則意味着此作業將更快達到超時,因此它應該獲得更大的選擇機會。
score = 作業權重 + 隊列動態因子 + 隊列權重
這個等式表示的意義是:對於所有的隊列中的所有任務,首先決定一個作業是否優先被調度的因子是設置的隊列權重,例如權重爲 10 的隊列的作業,應該比權重爲 1 的隊列中的作業被優先調度,而不管作業本身的權重(是否會有很大的機率超時);其次影響作業調度優先級的因子是隊列動態因子,例如有兩個相同權重的隊列時,如果一個隊列的動態因子爲 0.5,另外一個隊列的動態因子是 0.3,那麼應該優先選擇動態因子爲 0.5 的隊列作業進行調度,而不管作業本身的權重;最後影響作業調度優先級的因子是作業權重,例如在同一個隊列中,有兩個權重分別爲 0.2 和 0.5 的作業,那麼爲了避免更多的作業超時,權重爲 0.2 的作業應該被優先調度。
簡單描述作業的排序過程就是,首先按隊列權重排序所有的隊列;對於有重複的隊列,則會計算每個隊列的動態因子,並按此因子排序;對於每一個隊列,作業的排序規則按作業的超時比率來排序;最終依次按序遍歷每一個隊列,嘗試從中選擇足夠多的作業運行,直到作業都被運行或是達到集羣限制條件。這裏說足夠多,是指每一個隊列都會有一個最大的並行度和最大資源佔比,這兩個限制隊列的參數組合,是爲了避免因某一個隊列的容量和並行度被設置的過大,可能超過了整個集羣,導致其它隊列被 “餓死” 的情況。
2)SQL 作業流程
用戶通過 Client 提交原始 SQL,這裏以 Presto SQL 爲例,Client 在提交作業時,指定了 SQL 路由,則會首先通過訪問 SQLParser 服務,在發送給 Master 之前,會首先提交 SQL 語句到 SQLParser 服務器,將 SQL 解析成後端計算集羣可以支持的 SQL 語句,如 Spark、Presto、ClickHouse 等,爲了能夠減少 RPC 交互次數,SQLParser 會一次返回所有可能被改寫的 SQL 語句。
在接收到 SQLParser 服務返回的多個可能 SQL 語句後,就會填充當前的作業對象,真正開始向 Master 提交運行。
Master 在收到用戶提交的作業後,會根據一定的調度策略,最終將任務分發到合適的 Worker 上,開始執行。Worker 會首先採用 SQL 作業默認的執行引擎,比如 Presto,提交到對應的計算集羣運行,但如果因爲某種原因不能得到結果,則會嘗試使用其它的計算引擎進行計算。當然這裏也可以同時向多個計算集羣提交作業,一旦某個集羣首先返回結果時,就取消所有其它的作業,不過這需要其它計算集羣的入口能夠支持取消操作。
當 SQL 作業完成後,將結果返回到 Worker 端,爲了能夠更加高效地將查詢結果返回給 Client 端,Worker 會從 Master 發送的任務對象中提取 Client 側信息,並將結果直接發送給 Client,直到收到確認信息,至此整個任務纔算執行完畢。
在整個作業的流轉過程中,會以任務的概念在調度系統中進行傳播,並經歷幾個狀態的更新,分別標識 new、waiting、running、succeed、failed 階段。
圖 9. SQL 作業處理流程
3)Metrics 採集
數據服務蒐集兩類 metrics,一類靜態的,用於描述 master/worker/client 的基本信息;一類是動態的,描述 master/worker 的運行時信息。這裏主要說明一下有關集羣動態信息的採集過程及作用。以 worker 爲例,當 worker 成功註冊到 master 時,就會開啓定時心跳彙報動作,並借道心跳請求,將自己的運行時信息彙報給 master。
這裏主要是內存使用情況,例如當前 worker 通過估算方法,統計目前運行的任務佔據了多少內存,以便 master 能夠在後續的任務分發過程中,能夠根據內存信息進行決策。master 會統計它所管理的集羣整個情況,例如每個任務隊列的快照信息、worker 的快照信息、集羣的運行時配置信息等,並通過參數控制是否打印這些信息,以便調試。
四、調用情況
目前數據服務每天調用量:
-
toC: 9000W+/ 每天。
-
toB:150W+ / 每天(透傳到執行 engine 端調用量)。
-
ETL 任務執行時間基本在 3 分鐘左右完成;
-
adhoc 查詢目前主要有 Spark Thrift Server,Presto,Clickhouse 3 種引擎,大部分 SQL 90% 2s 左右完成,Clickhouse 查詢 99% 在 1s 左右完成,Presto 調用量 50W+/ 日, Clickhouse 調用量 44W+/ 日。
五、解決的性能問題
數據服務主要解決 SLA 方面的問題。如人羣計算、數據無縫遷移、數據產品 SLA 等,這裏用人羣舉例說明如下:
1)人羣計算遇到的問題:
-
人羣計算任務的數據本地性不好;
-
HDFS 存在數據熱點問題;
-
HDFS 讀寫本身存在長尾現象。
2)數據服務改造新的架構方案:
-
計算與存儲同置,這樣數據就不需通過網絡反覆讀取,造成網絡流量浪費。
-
減少 HDFS 讀寫長尾對人羣計算造成的額外影響,同時減少人羣計算對於 HDFS 穩定性的影響。
-
廣告人羣計算介於線上生產任務跟離線任務之間的任務類型。這裏我們希望能保證這類應用的可靠性和穩定性,從而更好地爲公司業務賦能
-
通過數據服務執行人羣計算。
圖 10. Alluxio 和 Spark 集羣混部
1、基於 Alluxio 的緩存表同步
將 Hive 表的 location 從 HDFS 路徑替換爲 Alluxio 路徑,即表示該表的數據存儲於 Alluxio 中。我們使用的方案不是直接寫通過 ETL 任務寫 Alluxio 表的數據,而是由 Alluxio 主動去拉取同樣 Hive 表結構的 HDFS 中的數據,即我們創建了一個 HDFS 表的 Alluxio 緩存表。
基於 HDFS 的人羣計算底表的表結構如下:
CREATE TABLE hdfs.ads_tags_table
(
oaid_md5
string,
mid
string,
user_id
bigint,
.........
)
PARTITIONED BY (
dt
string)
LOCATION
'hdfs://xxxx/hdfs.db/ads_tags_table'
基於 Alluxio 的人羣計算底表的表結構如下:
CREATE TABLE alluxio.ads_tags_table
(
oaid_md5
string,
mid
string,
user_id
bigint,
.........
)
PARTITIONED BY (
dt
string COMMENT '????')
LOCATION
'alluxio://zk@IP1:2181,IP2:2181/alluxio.db/ads_tags_table'
兩個表結構的字段和分區定義完全相同。只有兩處不同點:通過不同的庫名區分了是 HDFS 的表還是 Alluxio 的表;location 具體確認了數據存儲的路徑是 HDFS 還是 Alluxio。
由於 Alluxio 不能感知到分區表的變化,我們開發了一個定時任務去自動感知源表的分區變化,使得 Hive 表的數據能夠同步到 Alluxio 中。
3)具體步驟如下:
-
定時任務發起輪詢,檢測源表是否有新增分區。
-
發起一個 SYN2ALLUXIO 的任務由數據服務執行。
-
任務執行腳本爲將 Alluxio 表添加與 HDFS 表相同的分區。
-
分區添加完成之後,Alluxio 會自動從 mount 的 HDFS 路徑完成數據同步。
圖 11. Alluxio 緩存表同步
2、人羣計算任務
上小節介紹瞭如何讓 Alluxio 和 HDFS 的 Hive 表保持數據同步,接下來需要做的就是讓任務計算的 Spark 任務跑在 Spark 與 Alluxio 混部的集羣上,充分利用數據的本地性以及計算資源的隔離性,提高人羣計算效率。
人羣標籤計算的 SQL 樣例如下:
INSERT INTO hive_advert.cd0000127760_full
SELECT result_id, '20210703'
FROM
(SELECT oaid_md5 AS result_id
FROM hdfs.ads_tags_table AS ta
WHERE ta.dt = '20210702' and xxxxxxx) AS t
上面是一個 Spark SQL 的 ETL,此處的 hdfs.ads_tags_table 即爲人羣計算依賴的底表,此表爲一個 HDFS location 的表。
人羣服務通過調用數據服務執行。數據服務根據底表分區是否同步到 Alluxio 決定是否需要下推是用 Alluxio 表來完成計算。如果底表數據已經同步到 Alluxio,則使用 Alluxio 表來做爲底表計算人羣。
下推邏輯是用 Alluxio 的表名替換原表,假設此處緩存的 Alluxio 表名爲 alluxio.ads_tags_table,那麼原 SQL 就會被改寫成如下:
INSERT INTO hive_advert.cd0000127760_full
SELECT result_id, '20210703'
FROM
(SELECT oaid_md5 AS result_id
FROM alluxio.ads_tags_table AS ta
WHERE ta.dt = '20210702' and xxxxxxx) AS t
依靠數據服務調度系統,通過用戶 SQL 改寫以及 Alluxio 和 Spark 計算結點混部模式,人羣計算任務提速了 10%~30%
六、小結
雖然截至今天,Hera 數據服務已經支持了很多生產業務,但目前仍有很多需要完善的地方:
-
不同 engine 存在同一個含義函數寫法不一致的情況。這種情況在 Presto 跟 ClickHouse 的函數比較時尤爲突出,如 Presto 的 strpos(string,substring)函數,在 Clickhouse 中爲 position(haystack, needle[, start_pos]),且這些函數的參數順序存在不一致的情況,如何更優雅地支持不同 engine 的差異情況還需要進一步思考。
-
人羣計算採用業界通用的 ClickHouse BitMap 解決方案落地,提升人羣的計算效率同時擴展數據服務的業務邊界。
-
數據服務支持調度的 HA 和災備完善,更好地在 K8s 上進行部署。
作者丨 唯品會數據中臺團隊
來源丨網址:https://www.infoq.cn/article/SNhV8IWXfl6j7O0GuVRB
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/321yJj4tP1Erp858R1M7JA