Hive SQL 遷移 Spark SQL 在滴滴的實踐
**桔妹導讀:**在滴滴 SQL 任務從 Hive 遷移到 Spark 後,Spark SQL 任務佔比提升至 85%,任務運行時間節省 40%,運行任務需要的計算資源節省 21%,內存資源節省 49%。在遷移過程中我們沉澱出一套遷移流程, 並且發現並解決了兩個引擎在語法,UDF,性能和功能方面的差異。
**1. **
遷移背景
Spark 自從 2010 年面世,到 2020 年已經經過十年的發展,現在已經發展爲大數據批計算的首選引擎,在滴滴 Spark 是在 2015 年便開始落地使用,不過主要使用的場景是更多在數據挖掘和機器學習方向,對於數倉 SQL 方向,主要仍以 Hive SQL 爲主。
下圖是當前滴滴內部 SQL 任務的架構圖,滴滴各個業務線的離線任務是通過一站式數據開發平臺 DataStudio 調度的,DataStudio 把 SQL 任務提交到 HiveServer2 或者 Spark 兩種計算引擎上。兩個計算引擎均依賴資源管理器 YARN 和文件系統 HDFS。
在遷移之前我們面臨的主要問題有:
-
**SQL 任務運行慢:**遷移前 SQL 任務運行的平均時間是 20 分鐘,主要原因是佔比高達 83% 的 Hive SQL 任務運行時間長,Hive 任務執行過程中會啓動多個 MR Job,Job 間的中間結果存儲在 HDFS,所以同一個 SQL, Hive 比 Spark 執行的時間更長;
-
**Hive SQL 穩定性差:**一個 HS2 會同時執行多個用戶的 Hive SQL 任務,當一個異常任務導致 HS2 進程響應慢甚至異常退出時,運行在同一個實例的 SQL 任務也會運行緩慢甚至失敗。而異常任務場景各異。我們曾經遇到的異常任務有多個大 SQL 加載過多的分區元數據導致 HS2 FullGC,加載 UDF 時導致 HS2 進程 core dump,UDF 訪問 HDFS 沒有關閉流導致 HS2 機器端口被打滿,這些沒有通用解法, 問題很難收斂;
-
**人力分散:**兩個引擎需要投入雙倍的人力,在人員有限的情況下,對引擎的掌控力會減弱;
所以爲了 SQL 任務運行更快,更穩,團隊人力聚焦,對引擎有更強的掌控力,我們決定把 Hive SQL 遷移到 Spark SQL。
**2. **
遷移方案概要設計
Hive SQL 遷移到 Spark SQL 後需滿足以下條件:
-
保證數據一致性,也就是相同的 SQL 使用 Spark 和 Hive 執行的結果應該是一樣的;
-
保證用戶有收益,也就是使用 Spark 執行 SQL 後應該節省資源,包括時間,cpu 和 memroy;
-
遷移過程對用戶透明;
爲了滿足以上三個條件, 一個很直觀的思路就是使用兩個引擎執行用戶 SQL,然後對比每個引擎的執行結果和資源消耗。
爲了不影響用戶線上數據,使用兩個引擎執行用戶 SQL 有兩個可選方案:
-
複用現有的 SQL 任務調度系統,再部署一套 SQL 任務調度系統用來遷移,這個系統與生產環境物理隔離;
-
開發一個 SQL 雙跑工具,可以支持使用兩個引擎執行同一個 SQL 任務;
下面詳細介紹這兩個方案:
▍1. 方案一:複用現有的 SQL 任務調度系統****
再部署一套 SQL 任務執行系統用來使用 Spark 執行所有的 SQL,包括 HDFS,HiveServer2&MetaStore 和 Spark,DataStudio。新部署的系統需要週期性從生產環境同步任務信息,元數據信息和 HDFS 數據,在這個新部署的系統中把 Hive SQL 任務改成 Spark SQL 類型任務,這樣一個用戶的 SQL 在原有系統中使用 Hive SQL 執行,在新部署的系統中使用 Spark 執行。如下圖所示,藍色的表示需要新部署的子系統。
▍2. 方案二:開發一個 SQL 雙跑工具****
SQL 雙跑工具,可以線下使用兩個引擎執行用戶的 SQL,具體流程如下:
-
**SQL 收集:**用戶的 SQL 是在 HS2 上執行的,所以理論上通過 HS2 可以收集到所有的 SQL;
-
**SQL 改寫:**執行用戶原始 SQL 會覆蓋線上數據,所以在執行前需要改寫 SQL,把 SQL 的輸出的庫表名替換爲用來遷移測試的的庫表名;
-
**SQL 雙跑:**分別使用 Hive 和 Spark 執行改寫後的 SQL;
▍3. 方案對比****
-
方案一
-
優勢
隔離性好, 單獨的 SQL 執行系統不會影響生產任務,也不會影響業務數據;
-
劣勢
-
需要的資源多:運行多個子系統需要較多物理資源;
-
部署複雜:部署多個子系統,需要多個不同的團隊相互配合;
-
容易出錯:子系統間需要週期性同步,任何一個子系統同步出問題,都可能導致執行 SQL 失敗;
-
方案二
-
優勢
非常輕量,不需要部署很多系統,而且對物理資源需要不高;
-
劣勢
-
與生產公共一套環境,回放時有影響用戶數據對風險;
-
需要開發 SQL 收集,SQL 改寫和 SQL 雙跑系統;
經過權衡, 我們決定採用方案二, 因爲:
-
通過 HiveServer 收集所有 SQL,SQL 改寫和 SQL 雙跑邏輯清晰,開發成本可控;
-
創建超讀帳號,對所有庫表有讀權限,但只對用戶遷移的測試庫有寫權限,可以避免影響用戶數據的風險;
**3. **
遷移方案詳細設計
▍1. Hive SQL 提取****
Hive SQL 提取包括以下步驟:
-
改造 HiveHistoryImpl,每個 session 內執行的所有 SQL 和 command 保存到 HiveServer2 的一個本地文件中,這些文件按天組織,每天一個目錄
-
定時將前一天的 History 目錄上傳到 hdfs
-
開發 HiveHistoryParser
HiveHistoryParser 的主要功能是:
-
每天從 HDFS 下載所有 HiveServer2 的 History 文件;
-
SQL 去重:DataStudio 上的一個 SQL 任務可能一天執行多次 (比如小時任務),任務執行一次會生成一個新的執行 Id,只保留一天中最大的執行 Id 的 SQL;
-
合併 SQL:一個 shell 任務可能建立多個 session 執行 SQL,爲了後面遷移 shell 任務,需要把多個 session 的 SQL 合併到一起;
-
輸出 Parse 結果:包括多個 SQL 文件和 meta 文件:
-
每個任務執行的 SQL 保存到一個文件中,文件名是任務名稱加執行 Id,我們稱作原始 SQL 文件;
-
meta 文件包含 SQL 文件路徑,任務名稱,項目名稱,用戶名;
▍2. SQL 改寫 & 雙跑****
SQL 改寫會對上一步生成的每個原始 SQL 文件執行以下步驟:
-
使用 Spark 的 SessionState 對 SQL 文件逐行分析,識別是否包含以下兩類子句:
-
insert overwrite into
-
create table as select
-
如果包含上面的兩類子句,則提取寫入的目標庫表名稱;
-
在測試庫中創建與目標庫表 schema 完全一致的兩個測試表;
-
分別使用上一步創建的測試庫表替換原始 SQL 文件中的庫表名生成用於回放的 SQL 文件,一個原始 SQL 文件改寫後會生成兩個 SQL 文件,用於後面兩個引擎分別執行;
SQL 雙跑步驟如下:
-
併發的使用 Spark 和 Hive 執行上一步生成的兩個 SQL 文件;
-
記錄使用兩種引擎執行 SQL 時啓動的 Application 和運行時間;
-
輸出回放結果到文件中,執行每個 SQL 文件對會生成一條結果記錄, 包括 Hive 和 Spark 執行 SQL 的時間,啓動的 Application 列表,和輸出的目標庫表名稱等, 如下圖所示:
▍3. 結果對比****
結果對比時會遍歷每個回放記錄,統計以下指標:
具體流程如下:
-
查詢 Spark SQL 和 Hive SQL 輸出的庫表的記錄數;
-
查詢兩種引擎輸出的 HDFS 文件個數和大小;
-
對比兩種引擎的輸出數據;
分別對 Spark 和 Hive 的產出表執行以下 SQL,獲取表的概要信息
比較兩張表的概要信息:
-
如果所有對應列的值相同則認爲結果一致;
-
如果存在不一致的列,如果該列是數值類型,則對該列計算最大精度差異, SQL 如下:
-
統計兩種引擎啓動的 Application 消耗的 vcore 和 memory 資源;
-
輸出對比結果, 包括運行時間, 消耗的 vcore 和 memory,是否一致,如果不一致輸出不一致的列名以及最大差異;
-
彙總數據結果,並對回放的 SQL 分爲以下幾類:
-
**可遷移:**數據完全一致, 並且使用 Spark SQL 執行使用更少資源,包括運行時間,vcore 和 memory 以及文件數;
-
**經驗可遷移:**在排查不一致時發現有些是邏輯正確的 (比如 collect_set 結果順序不一致),如果有些任務符合這些經驗,則認爲是經驗可遷移;
-
**數據不一致:**兩種引擎產出的結果存在不一致的列,而且沒有命中經驗;
-
**Time_High:**兩種引擎產出的結果完全一致,但是 Spark 執行 SQL 的運行時間大於 Hive 執行 SQL 的時間;
-
**Cpu_High:**兩種引擎產出的結果完全一致,但是 Spark 執行 SQL 消耗的 cpu 資源大於 Hive 執行 SQL 消耗的 cpu 資源;
-
**Memory_High:**兩種引擎產出的結果完全一致,但是 Spark 執行 SQL 消耗的 memory 資源大於 Hive 執行 SQL 消耗的 memory 資源;
-
**Files_High:**兩種引擎產出的結果完全一致,但是 Spark 執行 SQL 產生的文件數大於 Hive 執行 SQL 產生的文件數;
-
**語法不兼容:**在 SQL 改寫階段解析 SQL 時報語法錯誤;
-
**運行時異常:**在雙跑階段,Hive SQL 或者 Spark SQL 在運行過程中失敗;
▍4. 遷移****
遷移比較簡單, 步驟如下:
-
整理遷移任務列表以及對應的配置參數;
-
調用 DataStudio 接口把任務類型修改爲 SparkSQL 類型;
-
重跑任務;
▍5. 問題排查 & 修復****
如果 SQL 是 “可遷移” 或者“經驗可遷移”,可以執行遷移,其它的任務需要排查,這部分是最耗時耗力的,遷移過程中大部分時間都是在調查和修復這些問題。修復之後再執行從頭開始,提取最新任務的 SQL,然後 SQL 改寫和雙跑,結果對比,滿足遷移條件則說明修復了問題,可以遷移,否則繼續排查,因此遷移過程是一個循環往復的過程,直到 SQL 滿足遷移條件,整體過程如下圖所示:
**4. **
引擎差異
在遷移的過程中我們發現了很多兩種引擎不同的地方,主要包括語法差異,UDF 差異,功能差異和性能差異。
▍1. 語法差異****
有些 Hive SQL 使用 Spark SQL 執行在語法分析階段就會出錯,有些語法差異我們在內部版本已經修復,目前正在反饋社區,正在和社區討論,還有一些目前沒有修復。
1.1 用例設計
-
UDTF 新版 initialize 接口支持,對齊 Hive SQL [SPARK-33704]
-
Window Function 不支持沒有 order by 子句的場景
-
Join 子查詢支持 rand 隨機分佈條件,增強語法兼容
-
Orc/Orcfile 存儲類型創建語句屏蔽 ROW FORMAT DELIMITED 限制 [SPARK-33755]
-
DB.TB
識別支持,對齊 Hive SQL [SPARK-33686] -
支持 CREATE TEMPORARY TABLE
-
各類 Hive UDF 的支持調用,主要包括 get_json_object,datediff,unix_timestamp,to_date,collect_set,date_sub [SPARK-33721]
-
DROP 不存在的表和分區,Spark SQL 報錯,Hive SQL 正常 [SPARK-33637]
-
刪除分區時支持設置過濾條件 [SPARK-33691]
1.2 未修復
-
Map 類型字段不支持 GROUP BY 操作
-
Operation not allowed:ALTER TABLE CONCATENATE
▍2. ****UDF 差異
在排查數據不一致的 SQL 過程中,我們發現有些是因爲輸入數據的順序不同造成的, 這些差異邏輯上是正確的,而有些是 UDF 對異常值的處理方式不一致造成的,還有需要注意的是 UDF 執行環境不同造成的結果差異。
2.1 順序差異
這些因爲輸入數據的順序不同造成的結果差異邏輯上是一致的,對業務無影響,因此在遷移過程中可以忽略這些差異,這類差異的 SQL 任務屬於經驗可遷移。
2.1.1 collect_set
- 假設數據表如下:
- 執行如下 SQL:
- 執行結果:
- 差異說明:
collect_set 執行結果的順序取決於記錄被掃描的順序,Spark SQL 執行過程中是多個任務併發執行的,因此記錄被讀取的順序是無法保證的.
2.1.2 collect_list
- 假設數據表如下:
- 執行如下 SQL:
- 執行結果:
- 差異說明:
collect_list 執行結果的順序取決於記錄被掃描的順序,Spark SQL 執行過程中是多個任務併發執行的,因此記錄被讀取的順序是無法保證的。
2.1.3 row_number
- 假設數據表如下:
- 執行如下 SQL:
- 執行結果:
- 差異說明:
執行 row_number 時,在一個分區內部,可以保證 order by 字段是有序的,對於非分區非 order by 字段的順序是沒有保證的。
2.1.4 map 類型字段讀寫
- 數據表建表語句:
- 假設數據表如下:
- 執行如下 SQL:
- 執行結果:
- 差異說明:
Map 類型是無序的,同一份數據,在 query 時顯示的各個 key 的順序會有變化。
2.1.5 sum(double/float)
- 假設數據表如下:
- 執行如下 SQL:
- 執行結果:
- 差異說明:
這是由 float/double 類型的表示方式決定的,浮點數不能表示所有的實數,在執行運算過程中會有精度丟失,對於幾個浮點數,執行加法時的順序不同,結果有時就會不同。
2.1.6 順序差異解決方案
由以上 UDF 造成的差異可以忽略,相關任務如果在資源方面也有節省,那麼最終的狀態是經驗可遷移狀態,符合遷移條件。
2.2 非順序差異
下面幾個日期 / 時間相關函數,當有異常輸入是 Spark SQL 會返回 NULL,而 Hive SQL 會返回一個非 NULL 值。
2.2.1 datediff
對於異常日期,比如 0000-00-00 執行 datediff 兩者會存在差異。
2.2.2 unix_timestamp
對於 24 點 Spark 認爲是非法的返回 NULL,而 Hive 任務是正常的,下表時執行 unix_timestamp(concat('2020-06-01', '24:00:00')) 時的差異。
2.2.3 to_date
當月或者日是 00 時 Hive 仍然會返回一個日期,但是 Spark 會返回 NULL。
2.2.4 date_sub
當月或者日是 00 時 Hive 仍然會返回一個日期,但是 Spark 會返回 NULL。
2.2.5 date_add
當月或者日是 00 時 Hive 仍然會返回一個日期,但是 Spark 會返回 NULL。
2.2.6 非順序差異解決方案
這些差異是是因爲對異常 UDF 參數的處理邏輯不同造成的,雖然 Spark SQL 返回 NULL 更合理,但是現有的 Hive SQL 任務用戶適應了這種處理邏輯,所以爲了不影響現有 SQL 任務,我們對這類 UDF 做了兼容處理,用戶可以通過配置來決定使用 Hive 內置函數還是 Spark 的內置 UDF。
2.3 UDF 執行環境差異
2.3.1 差異說明
基於 MapReduce 的 Hive SQL 一個 Task 會啓動一個進程,進程中的主線程負責數據處理, 因此在 Hive SQL 中 UDF 只會在單程中執行。
而 Spark 一個 Executor 可能會啓動多個 Task,如下圖所示。因此在 Spark SQL 中自定義 UDF 時需要考慮線程安全問題。
2.3.2 差異解決方案
下面是一個非線程安全的示例,UDF 內部共享靜態變量,在執行 UDF 時會讀寫這個靜態變量。
解決方案也比較簡單,一種是加鎖,如下圖所示:
另一種是取消靜態成員,如下圖所示:
▍3. ****性能 & 功能差異
3.1 小文件合併
Hive SQL 可以通過設置以下配置合併小文件,MR Job 結束後,判斷生成文件的平均大小,如果小於閥值,就再啓動一個 Job 來合併文件。
目前 Spark SQL 不支持小文件合併,在遷移過程中,我們經常發現 Spark SQL 生成的文件數多於 Hive SQL,爲此我們參考 Hive SQL 的實現在 Spark SQL 中引入了小文件合併功能。
在 InsertIntoHiveTable 中判斷如果開啓小文件合併,並且文件的平均大小低於閾值則執行合併,合併之後再執行 loadTable 或者 loadPartition 操作。
3.2 Spark SQL 支持 Cluster 模式
Hive SQL 任務是 DataStudio 通過 beeline -f 執行的,客戶端只負責發送 SQL 語句給 HS2,已經獲取執行結果,因此是非常輕量的。而 Spark SQL 只支持 Client 模式,Driver 在 Client 進程中,因此 Client 模式執行 Spark SQL 時,有時會佔用很多的資源,DataStudio 無法感知 Spark Driver 的資源開銷,所以在 DataStudio 層面會帶來以下問題:
-
形成資源熱點,影響任務執行;
-
隨着遷移到 Spark SQL 的任務越來越多,DataStudio 需要越來越多的機器調度 SQL 任務;
-
Client 模式日誌保留在本地,排查問題時不方便看日誌;
所以我們開發了 Spark SQL 支持 Cluster 模式,該模式只支持非交互式方式執行 SQL,包括 spark-sql -e 和 spark-sql -f,不支持交互式模式。
3.3 分區剪裁優化
遷移過程中我們發現大部分任務的分區條件包括 concat, concat_ws, substr 等 UDF, HiveServer2 會調用 MetaStore 的 getPartitionsByExpr 方法返回符合分區條件的有效分區,避免無效的掃描, 但是 Spark SQL 的分區剪裁只支持由 Attribute 和 Literal 組成 key/value 結構的謂詞條件,這一方面導致無法有效分區剪裁,會查詢所有分區的數據, 造成讀取大量無效數據,另一方面查詢所有分區的元數據,導致 MetaStore 對 MySQL 查詢壓力激增,導致 mysql 進程把 cpu 打滿。我們在社區版本的基礎上迭代支持了多種場景的分區聯合剪裁,目前能夠覆蓋生產任務 90% 以上的場景。
- concat/concat_ws 聯合剪裁場景
- substr 聯合剪裁場景
- concat/concat_ws&substr 組合場景
目前已經反饋社區,正在討論中,具體可參考 [SPARK-33707][SQL] Support multiple types of function partition pruning on hive metastore
**5. **
遷移結果
經過 6 個多月的團隊的努力,我們遷移了 1 萬多個 Hive SQL 任務到 Spark SQL,在遷移過程中,隨着 spark SQL 任務的增加,SQL 任務的執行時間在逐漸減少,從最初的 1000 + 秒下降到 600 + 秒如下圖所示:
遷移後 Spark SQL 任務佔比 85%,SQL 任務運行時間節省 40%,計算資源節省 21%,內存資源節省 49%,遷移的收益是非常大的。
**6. **
下一步計劃
遷移之後 Spark 已經成爲 SQL 任務的主流引擎,但是還有大量的 shell 類型任務使用 Hive 執行 SQL,所以後續我們會遷移 shell 類型任務,把 shell 中的 Hive SQL 遷移到 Spark SQL。
在生產環境中,有些 shuffle 比較中的任務經常會因爲 shuffle fetch 重試甚至失敗,我們想優化 Spark External Shuffle Service。
社區推出 Spark 3.x 也半年多了,在功能和性能上有很大提升,所以我們也想和社區保持同步,升級 Spark 到 3.x 版本。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/Gb6s59jkZPUS9NjTTeHKXQ