億級數據服務平臺:跟低效率、指標難統一的數倉說再見!

數據服務是數據中臺體系中的關鍵組成部分。作爲數倉對接上層應用的統一出入口,數據服務將數倉當作一個統一的 DB 來訪問,提供統一的 API 接口控制數據的流入及流出,能夠滿足用戶對不同類型數據的訪問需求。

電商平臺唯品會的數據服務自 2019 年開始建設,在公司內經歷了從無到有落地,再到爲超過 30 + 業務方提供 to B、to C 的數據服務的過程。本文主要介紹唯品會自研數據服務 Hera 的相關背景、架構設計和核心功能。

一、背景

在統一數倉數據服務之前,數倉提供的訪問接入方式往往存在效率問題低、數據指標難統一等問題,具體而言有以下幾個比較突出的情況:

圖 1. 在統一數倉數據服務之前,數據流入流出方式

爲解決以上問題,數據服務應運而生。目前數據服務的主要優勢有:屏蔽底層的存儲引擎、計算引擎,使用同一個 API(one service),數倉數據分層存儲,不同 engine 的 SQL 生成能力,自適應 SQL 執行以及統一緩存架構保障業務 SLA,支持數據註冊並授權給任何調用方進行使用,提高數據交付效率。

通過唯一的 ID 標識,數據產品可通過 ID 查閱數據,而非直接訪問對應的數倉表。一方面,指標服務統一了指標的口徑,同時也支持快速構建新的數據產品。

二、架構設計

數據服務能給業務帶來運營和商業價值,核心在於給用戶提供自助分析數據能力。Hera 整體架構基於典型的 Master/slave 模型,數據流與控制流單獨鏈路,從而保障系統的高可用性。數據服務系統主要分爲三層:

圖 2. 數據服務整體架構圖

調度系統的整體流程大致包含以下模塊:

圖 3. 數據服務調度流程圖

三、主要功能

Hera 數據服務的主要功能有:多隊列調度策略、多引擎查詢、多任務類型、文件導出、資源隔離、引擎參數動態組裝、自適應 Engine 執行和 SQL 構建。

1、多隊列調度策略

數據服務支持按照不同用戶、不同任務類型並根據權重劃分不同調度隊列,以滿足不同任務類型的 SLA。

2、多引擎查詢

數據服務支持目前公司內部所有 OLAP 和數據庫類型,包括 Spark、Presto、Clickhouse、Hive 、MySQL、Redis。會根據業務具體場景和要求,選擇當前最佳的查詢引擎。

3、多任務類型

數據服務支持的任務類型有:ETL、Adhoc、文件導出、數據導入。加上多引擎功能,實現多種功能組合,如 Spark adhoc 和 Presto adhoc。

4、文件導出

主要是支持大量的數據從數據倉庫中導出,便於業務分析和處理,比如供應商發券和信息推送等。

具體執行過程如下:

用戶提交需要導出數據的 SQL,通過分佈式 engine 執行完成後,落地文件到 hdfs/alluxio. 客戶端通過 TCP 拉取文件到本地。千萬億級的數據導出耗時最多 10min。數據導出在人羣數據導出上性能由原來的 30min+ ,提升到最多不超過 3min,性能提升 10~30 倍。具體流程如下:

圖 4. 數據服務文件下載流程圖

5、資源隔離(Worker 資源和計算資源)

業務一般分爲核心和非核心,在資源分配和調度上也不同。主要是從執行任務 Worker 和引擎資源,都可以實現物理級別的隔離,最大化減少不同業務之間相互影響。

6、引擎參數動態組裝

線上業務執行需要根據業務情況進行調優,動態限制用戶資源使用,集羣整體切換等操作,這個時候就需要對用戶作業參數動態修改,如 OLAP 引擎執行任務時,經常都要根據任務調優,設置不同參數。針對這類問題,數據服務提供了根據引擎類型自動組裝引擎參數,並且引擎參數支持動態調整,也可以針對特定任務、執行賬號、業務類型來設定 OLAP 引擎執行參數。

7、自適應 Engine 執行

業務方在查詢時,有可能因爲引擎資源不足或者查詢條件數據類型不匹配從而導致執行失敗。爲了提高查詢成功率和服務 SLA 保障,設計了 Ad Hoc 自適應引擎執行,當一個引擎執行報錯後,會切換到另外一個引擎繼續執行。具體自適應執行邏輯如下圖所示:

圖 5. 自適應 Engine 執行

8、SQL 構建

數據服務 SQL 構建基於維度事實建模,支持單表模型、星型模型和雪花模型。

圖 6.SQL 維度模型

1)自定義語法(Lisp)描述指標的計算公式

Lisp 是一套自定義的語法,用戶可以使用 Lisp 來描述指標的計算公式。其目標是爲了構建統一的指標計算公式處理範式,屏蔽底層的執行引擎的語法細節,最大化優化業務配置和生成指標的效率。

Lisp 總體格式 (oprator param1 param2 ...) param 可以是一個參數,也可以是一個 Lisp 表達式。目前已經實現的功能:

①聚合表達式

(count x [y,z...]), count distinct x over (partition by y,z);

在 Presto 中的實現是 approx_distinct(x,e) over (partition by y,z),在 Spark 中的實現是 approx_count_distinct(x,e) over (partition by y,z)。y,z 只在開窗函數模式下才生效。目前也支持嵌套的聚合表達式 (sum (sum (max x)))。

②條件表達式

case when 實現 when1 爲條件 bool 或者被比較值 then1 爲對應輸出 elseX 爲最後的 else 輸出

eg:(case subject_id (int 2) (int 1)) ->  case subject_id when 2 then 1 end)

eg:(case (= subject_id (string goods_base)) (int 2) (int 1)) ->  case when subject_id = 'goods_base' then 2 else 1 end

③類型標識表達式

(int xx)        xx 標識成 數字型

(string xx)    xx 標識成 字符串類型

null             直接返回 null

④類型轉換表達式

(cast bigint xx)

(cast double xx)

(cast string xx)

⑤聚合通用表達式

(func a b c ...) 通用 Lisp 表達式  a 爲函數名 後續字段爲表達式元素  如 (func bar 1 2 3) 解析爲 bar(1, 2, 3)

⑥非聚合通用表達式

(func_none a b c ...) 通用 Lisp 表達式  a 爲函數名 後續字段爲表達式元素  如 (func_none bar 1 2 3) 解析爲 bar(1, 2, 3) ,設置 Lisp 對象的 aggregation 屬性爲 false

例如:(func_none json_extract_scalar 40079 '$.m_name')

2)Lisp 語法的解析

Lisp 的解析和翻譯是基於 antlr4 來實現的,處理流程如下:

圖 7. Lisp 處理流程圖

將 Lisp(count x y) 表達式通過 antlr 翻譯成語法樹,如下圖所示:

圖 8. 語法樹

通過自定義的 Listener 遍歷語法樹

在遍歷語法樹的過程中,結合指標的 query engine(presto/spark/clickhouse/mysql) 元數據生成對應的查詢引擎的 SQL 代碼 (approx_distinct(x,e) over (partition by y))

9、任務調度

基於 Netty 庫收發集羣消息,系統僅僅使用同一個線程池對象 EventLoopGroup 來收發消息,而用戶的業務邏輯,則交由一個單獨的線程池。

選用 Netty 的另外一個原因是 “零拷貝” 的能力,在大量數據返回時,通過文件的形式直接將結果送給調用者。

1)多隊列 + 多用戶調度

業務需求通常包含時間敏感與不敏感作業,爲了提高作業的穩定性和系統的可配置性,Hera 提供了多隊列作業調度的功能。

用戶在提交作業時可以顯式地指定一個作業隊列名,當這個作業在提交到集羣時,如果相應的隊列有空閒,則就會被添加進相應的隊列中,否則返回具體的錯誤給客戶端,如任務隊列滿、隊列名不存在、隊列已經關閉等,客戶端可以選擇 “是否重試提交”。

當一個作業被添加進隊列之後,Master 就會立即嘗試調度這個隊列中的作業,基於以下條件選擇合適的作業運行:

一個可用的計算作業評分模型如下:

隊列動態因子 = 隊列大小 / 隊列容量 * (1 - 作業運行數 / 隊列並行度)

這個等式表示的意義是:如果某個隊列正在等待的作業的佔比比較大,同時並行運行的作業數佔比也比較大時,這個隊列的作業就擁有一個更大的因子,也就意味着在隊列權重相同時,這個隊列中的作業應該被優先調度。

作業權重 = 1 - (當前時間 - 入隊時間) / 超時時間

這個等式表示的意義是:在同一個隊列中,如果一個作業的剩餘超時時間越少,則意味着此作業將更快達到超時,因此它應該獲得更大的選擇機會。

score = 作業權重 + 隊列動態因子 + 隊列權重

這個等式表示的意義是:對於所有的隊列中的所有任務,首先決定一個作業是否優先被調度的因子是設置的隊列權重,例如權重爲 10 的隊列的作業,應該比權重爲 1 的隊列中的作業被優先調度,而不管作業本身的權重(是否會有很大的機率超時);其次影響作業調度優先級的因子是隊列動態因子,例如有兩個相同權重的隊列時,如果一個隊列的動態因子爲 0.5,另外一個隊列的動態因子是 0.3,那麼應該優先選擇動態因子爲 0.5 的隊列作業進行調度,而不管作業本身的權重;最後影響作業調度優先級的因子是作業權重,例如在同一個隊列中,有兩個權重分別爲 0.2 和 0.5 的作業,那麼爲了避免更多的作業超時,權重爲 0.2 的作業應該被優先調度。

簡單描述作業的排序過程就是,首先按隊列權重排序所有的隊列;對於有重複的隊列,則會計算每個隊列的動態因子,並按此因子排序;對於每一個隊列,作業的排序規則按作業的超時比率來排序;最終依次按序遍歷每一個隊列,嘗試從中選擇足夠多的作業運行,直到作業都被運行或是達到集羣限制條件。這裏說足夠多,是指每一個隊列都會有一個最大的並行度和最大資源佔比,這兩個限制隊列的參數組合,是爲了避免因某一個隊列的容量和並行度被設置的過大,可能超過了整個集羣,導致其它隊列被 “餓死” 的情況。

2)SQL 作業流程

用戶通過 Client 提交原始 SQL,這裏以 Presto SQL 爲例,Client 在提交作業時,指定了 SQL 路由,則會首先通過訪問 SQLParser 服務,在發送給 Master 之前,會首先提交 SQL 語句到 SQLParser 服務器,將 SQL 解析成後端計算集羣可以支持的 SQL 語句,如 Spark、Presto、ClickHouse 等,爲了能夠減少 RPC 交互次數,SQLParser 會一次返回所有可能被改寫的 SQL 語句。

在接收到 SQLParser 服務返回的多個可能 SQL 語句後,就會填充當前的作業對象,真正開始向 Master 提交運行。

Master 在收到用戶提交的作業後,會根據一定的調度策略,最終將任務分發到合適的 Worker 上,開始執行。Worker 會首先採用 SQL 作業默認的執行引擎,比如 Presto,提交到對應的計算集羣運行,但如果因爲某種原因不能得到結果,則會嘗試使用其它的計算引擎進行計算。當然這裏也可以同時向多個計算集羣提交作業,一旦某個集羣首先返回結果時,就取消所有其它的作業,不過這需要其它計算集羣的入口能夠支持取消操作。

當 SQL 作業完成後,將結果返回到 Worker 端,爲了能夠更加高效地將查詢結果返回給 Client 端,Worker 會從 Master 發送的任務對象中提取 Client 側信息,並將結果直接發送給 Client,直到收到確認信息,至此整個任務纔算執行完畢。

在整個作業的流轉過程中,會以任務的概念在調度系統中進行傳播,並經歷幾個狀態的更新,分別標識 new、waiting、running、succeed、failed 階段。

圖 9. SQL 作業處理流程

3)Metrics 採集

數據服務蒐集兩類 metrics,一類靜態的,用於描述 master/worker/client 的基本信息;一類是動態的,描述 master/worker 的運行時信息。這裏主要說明一下有關集羣動態信息的採集過程及作用。以 worker 爲例,當 worker 成功註冊到 master 時,就會開啓定時心跳彙報動作,並借道心跳請求,將自己的運行時信息彙報給 master。

這裏主要是內存使用情況,例如當前 worker 通過估算方法,統計目前運行的任務佔據了多少內存,以便 master 能夠在後續的任務分發過程中,能夠根據內存信息進行決策。master 會統計它所管理的集羣整個情況,例如每個任務隊列的快照信息、worker 的快照信息、集羣的運行時配置信息等,並通過參數控制是否打印這些信息,以便調試。

四、調用情況

目前數據服務每天調用量:

五、解決的性能問題

數據服務主要解決 SLA 方面的問題。如人羣計算、數據無縫遷移、數據產品 SLA 等,這裏用人羣舉例說明如下:

1)人羣計算遇到的問題:

2)數據服務改造新的架構方案:

圖 10. Alluxio 和 Spark 集羣混部

1、基於 Alluxio 的緩存表同步

將 Hive 表的 location 從 HDFS 路徑替換爲 Alluxio 路徑,即表示該表的數據存儲於 Alluxio 中。我們使用的方案不是直接寫通過 ETL 任務寫 Alluxio 表的數據,而是由 Alluxio 主動去拉取同樣 Hive 表結構的 HDFS 中的數據,即我們創建了一個 HDFS 表的 Alluxio 緩存表。

基於 HDFS 的人羣計算底表的表結構如下:

CREATE TABLE hdfs.ads_tags_table(

  oaid_md5 string, 

  mid string, 

  user_id bigint, 

   .........

  )

PARTITIONED BY ( 

  dt string)

LOCATION

  'hdfs://xxxx/hdfs.db/ads_tags_table'

基於 Alluxio 的人羣計算底表的表結構如下:

CREATE TABLE alluxio.ads_tags_table(

  oaid_md5 string, 

  mid string, 

  user_id bigint, 

   .........

  )

PARTITIONED BY ( 

  dt string COMMENT '????')

LOCATION

  'alluxio://zk@IP1:2181,IP2:2181/alluxio.db/ads_tags_table'

兩個表結構的字段和分區定義完全相同。只有兩處不同點:通過不同的庫名區分了是 HDFS 的表還是 Alluxio 的表;location 具體確認了數據存儲的路徑是 HDFS 還是 Alluxio。

由於 Alluxio 不能感知到分區表的變化,我們開發了一個定時任務去自動感知源表的分區變化,使得 Hive 表的數據能夠同步到 Alluxio 中。

3)具體步驟如下:

圖 11. Alluxio 緩存表同步

2、人羣計算任務

上小節介紹瞭如何讓 Alluxio 和 HDFS 的 Hive 表保持數據同步,接下來需要做的就是讓任務計算的 Spark 任務跑在 Spark 與 Alluxio 混部的集羣上,充分利用數據的本地性以及計算資源的隔離性,提高人羣計算效率。

人羣標籤計算的 SQL 樣例如下:

INSERT INTO hive_advert.cd0000127760_full 

SELECT result_id, '20210703'

FROM 

       (SELECT oaid_md5 AS result_id

       FROM hdfs.ads_tags_table AS ta

       WHERE ta.dt = '20210702' and xxxxxxx) AS t

上面是一個 Spark SQL 的 ETL,此處的 hdfs.ads_tags_table 即爲人羣計算依賴的底表,此表爲一個 HDFS location 的表。

人羣服務通過調用數據服務執行。數據服務根據底表分區是否同步到 Alluxio 決定是否需要下推是用 Alluxio 表來完成計算。如果底表數據已經同步到 Alluxio,則使用 Alluxio 表來做爲底表計算人羣。

下推邏輯是用 Alluxio 的表名替換原表,假設此處緩存的 Alluxio 表名爲 alluxio.ads_tags_table,那麼原 SQL 就會被改寫成如下:

INSERT INTO hive_advert.cd0000127760_full 

SELECT result_id, '20210703'

FROM 

       (SELECT oaid_md5 AS result_id

       FROM alluxio.ads_tags_table AS ta

       WHERE ta.dt = '20210702' and xxxxxxx) AS t

依靠數據服務調度系統,通過用戶 SQL 改寫以及 Alluxio 和 Spark 計算結點混部模式,人羣計算任務提速了 10%~30%

六、小結

雖然截至今天,Hera 數據服務已經支持了很多生產業務,但目前仍有很多需要完善的地方:

作者丨 唯品會數據中臺團隊

來源丨網址:https://www.infoq.cn/article/SNhV8IWXfl6j7O0GuVRB

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/321yJj4tP1Erp858R1M7JA