Hive SQL 遷移 Spark SQL 在滴滴的實踐

**桔妹導讀:**在滴滴 SQL 任務從 Hive 遷移到 Spark 後,Spark SQL 任務佔比提升至 85%,任務運行時間節省 40%,運行任務需要的計算資源節省 21%,內存資源節省 49%。在遷移過程中我們沉澱出一套遷移流程, 並且發現並解決了兩個引擎在語法,UDF,性能和功能方面的差異。

**1. **

遷移背景     

Spark 自從 2010 年面世,到 2020 年已經經過十年的發展,現在已經發展爲大數據批計算的首選引擎,在滴滴 Spark 是在 2015 年便開始落地使用,不過主要使用的場景是更多在數據挖掘和機器學習方向,對於數倉 SQL 方向,主要仍以 Hive SQL 爲主。

下圖是當前滴滴內部 SQL 任務的架構圖,滴滴各個業務線的離線任務是通過一站式數據開發平臺 DataStudio 調度的,DataStudio 把 SQL 任務提交到 HiveServer2 或者 Spark 兩種計算引擎上。兩個計算引擎均依賴資源管理器 YARN 和文件系統 HDFS。

在遷移之前我們面臨的主要問題有:

所以爲了 SQL 任務運行更快,更穩,團隊人力聚焦,對引擎有更強的掌控力,我們決定把 Hive SQL 遷移到 Spark SQL。

**2. **

遷移方案概要設計

Hive SQL 遷移到 Spark SQL 後需滿足以下條件:

爲了滿足以上三個條件, 一個很直觀的思路就是使用兩個引擎執行用戶 SQL,然後對比每個引擎的執行結果和資源消耗。 

爲了不影響用戶線上數據,使用兩個引擎執行用戶 SQL 有兩個可選方案:

下面詳細介紹這兩個方案:

1. 方案一:複用現有的 SQL 任務調度系統****

再部署一套 SQL 任務執行系統用來使用 Spark 執行所有的 SQL,包括 HDFS,HiveServer2&MetaStore 和 Spark,DataStudio。新部署的系統需要週期性從生產環境同步任務信息,元數據信息和 HDFS 數據,在這個新部署的系統中把 Hive SQL 任務改成 Spark SQL 類型任務,這樣一個用戶的 SQL 在原有系統中使用 Hive SQL 執行,在新部署的系統中使用 Spark 執行。如下圖所示,藍色的表示需要新部署的子系統。

2. 方案二:開發一個 SQL 雙跑工具****

SQL 雙跑工具,可以線下使用兩個引擎執行用戶的 SQL,具體流程如下:

3. 方案對比****

經過權衡, 我們決定採用方案二, 因爲:

**3. **

遷移方案詳細設計

1. Hive SQL 提取****

Hive SQL 提取包括以下步驟:

HiveHistoryParser 的主要功能是:

2. SQL 改寫 & 雙跑****

SQL 改寫會對上一步生成的每個原始 SQL 文件執行以下步驟:

SQL 雙跑步驟如下:

3. 結果對比****

結果對比時會遍歷每個回放記錄,統計以下指標:

具體流程如下:

分別對 Spark 和 Hive 的產出表執行以下 SQL,獲取表的概要信息

比較兩張表的概要信息:

4. 遷移****

遷移比較簡單, 步驟如下:

5. 問題排查 & 修復****

如果 SQL 是 “可遷移” 或者“經驗可遷移”,可以執行遷移,其它的任務需要排查,這部分是最耗時耗力的,遷移過程中大部分時間都是在調查和修復這些問題。修復之後再執行從頭開始,提取最新任務的 SQL,然後 SQL 改寫和雙跑,結果對比,滿足遷移條件則說明修復了問題,可以遷移,否則繼續排查,因此遷移過程是一個循環往復的過程,直到 SQL 滿足遷移條件,整體過程如下圖所示:

**4. **

引擎差異

在遷移的過程中我們發現了很多兩種引擎不同的地方,主要包括語法差異,UDF 差異,功能差異和性能差異。

1. 語法差異****

有些 Hive SQL 使用 Spark SQL 執行在語法分析階段就會出錯,有些語法差異我們在內部版本已經修復,目前正在反饋社區,正在和社區討論,還有一些目前沒有修復。

1.1 用例設計

1.2 未修復

2. ****UDF 差異

在排查數據不一致的 SQL 過程中,我們發現有些是因爲輸入數據的順序不同造成的, 這些差異邏輯上是正確的,而有些是 UDF 對異常值的處理方式不一致造成的,還有需要注意的是 UDF 執行環境不同造成的結果差異。

2.1 順序差異

這些因爲輸入數據的順序不同造成的結果差異邏輯上是一致的,對業務無影響,因此在遷移過程中可以忽略這些差異,這類差異的 SQL 任務屬於經驗可遷移。

2.1.1 collect_set

collect_set 執行結果的順序取決於記錄被掃描的順序,Spark SQL 執行過程中是多個任務併發執行的,因此記錄被讀取的順序是無法保證的.

2.1.2 collect_list

collect_list 執行結果的順序取決於記錄被掃描的順序,Spark SQL 執行過程中是多個任務併發執行的,因此記錄被讀取的順序是無法保證的。

2.1.3 row_number

執行 row_number 時,在一個分區內部,可以保證 order by 字段是有序的,對於非分區非 order by 字段的順序是沒有保證的。

2.1.4 map 類型字段讀寫

Map 類型是無序的,同一份數據,在 query 時顯示的各個 key 的順序會有變化。

2.1.5 sum(double/float)

這是由 float/double 類型的表示方式決定的,浮點數不能表示所有的實數,在執行運算過程中會有精度丟失,對於幾個浮點數,執行加法時的順序不同,結果有時就會不同。

2.1.6 順序差異解決方案

由以上 UDF 造成的差異可以忽略,相關任務如果在資源方面也有節省,那麼最終的狀態是經驗可遷移狀態,符合遷移條件。

2.2 非順序差異

下面幾個日期 / 時間相關函數,當有異常輸入是 Spark SQL 會返回 NULL,而 Hive SQL 會返回一個非 NULL 值。

2.2.1 datediff

對於異常日期,比如 0000-00-00 執行 datediff 兩者會存在差異。

2.2.2 unix_timestamp

對於 24 點 Spark 認爲是非法的返回 NULL,而 Hive 任務是正常的,下表時執行 unix_timestamp(concat('2020-06-01', '24:00:00')) 時的差異。

2.2.3 to_date

當月或者日是 00 時 Hive 仍然會返回一個日期,但是 Spark 會返回 NULL。

2.2.4 date_sub

當月或者日是 00 時 Hive 仍然會返回一個日期,但是 Spark 會返回 NULL。

2.2.5 date_add

當月或者日是 00 時 Hive 仍然會返回一個日期,但是 Spark 會返回 NULL。

2.2.6 非順序差異解決方案

這些差異是是因爲對異常 UDF 參數的處理邏輯不同造成的,雖然 Spark SQL 返回 NULL 更合理,但是現有的 Hive SQL 任務用戶適應了這種處理邏輯,所以爲了不影響現有 SQL 任務,我們對這類 UDF 做了兼容處理,用戶可以通過配置來決定使用 Hive 內置函數還是 Spark 的內置 UDF。

2.3 UDF 執行環境差異

2.3.1 差異說明

基於 MapReduce 的 Hive SQL 一個 Task 會啓動一個進程,進程中的主線程負責數據處理, 因此在 Hive SQL 中 UDF 只會在單程中執行。

而 Spark 一個 Executor 可能會啓動多個 Task,如下圖所示。因此在 Spark SQL 中自定義 UDF 時需要考慮線程安全問題。

2.3.2 差異解決方案

下面是一個非線程安全的示例,UDF 內部共享靜態變量,在執行 UDF 時會讀寫這個靜態變量。

解決方案也比較簡單,一種是加鎖,如下圖所示:

另一種是取消靜態成員,如下圖所示:

3. ****性能 & 功能差異

3.1 小文件合併

Hive SQL 可以通過設置以下配置合併小文件,MR Job 結束後,判斷生成文件的平均大小,如果小於閥值,就再啓動一個 Job 來合併文件。

目前 Spark SQL 不支持小文件合併,在遷移過程中,我們經常發現 Spark SQL 生成的文件數多於 Hive SQL,爲此我們參考 Hive SQL 的實現在 Spark SQL 中引入了小文件合併功能。

在 InsertIntoHiveTable 中判斷如果開啓小文件合併,並且文件的平均大小低於閾值則執行合併,合併之後再執行 loadTable 或者 loadPartition 操作。

3.2 Spark SQL 支持 Cluster 模式

Hive SQL 任務是 DataStudio 通過 beeline -f 執行的,客戶端只負責發送 SQL 語句給 HS2,已經獲取執行結果,因此是非常輕量的。而 Spark SQL 只支持 Client 模式,Driver 在 Client 進程中,因此 Client 模式執行 Spark SQL 時,有時會佔用很多的資源,DataStudio 無法感知 Spark Driver 的資源開銷,所以在 DataStudio 層面會帶來以下問題:

所以我們開發了 Spark SQL 支持 Cluster 模式,該模式只支持非交互式方式執行 SQL,包括 spark-sql -e 和 spark-sql -f,不支持交互式模式。

3.3 分區剪裁優化

遷移過程中我們發現大部分任務的分區條件包括 concat, concat_ws, substr 等 UDF, HiveServer2 會調用 MetaStore 的 getPartitionsByExpr 方法返回符合分區條件的有效分區,避免無效的掃描, 但是 Spark SQL 的分區剪裁只支持由 Attribute 和 Literal 組成 key/value 結構的謂詞條件,這一方面導致無法有效分區剪裁,會查詢所有分區的數據, 造成讀取大量無效數據,另一方面查詢所有分區的元數據,導致 MetaStore 對 MySQL 查詢壓力激增,導致 mysql 進程把 cpu 打滿。我們在社區版本的基礎上迭代支持了多種場景的分區聯合剪裁,目前能夠覆蓋生產任務 90% 以上的場景。

目前已經反饋社區,正在討論中,具體可參考 [SPARK-33707][SQL] Support multiple types of function partition pruning on hive metastore

**5. **

遷移結果

經過 6 個多月的團隊的努力,我們遷移了 1 萬多個 Hive SQL 任務到 Spark SQL,在遷移過程中,隨着 spark SQL 任務的增加,SQL 任務的執行時間在逐漸減少,從最初的 1000 + 秒下降到 600 + 秒如下圖所示:

遷移後 Spark SQL 任務佔比 85%,SQL 任務運行時間節省 40%,計算資源節省 21%,內存資源節省 49%,遷移的收益是非常大的。

**6. **

下一步計劃

遷移之後 Spark 已經成爲 SQL 任務的主流引擎,但是還有大量的 shell 類型任務使用 Hive 執行 SQL,所以後續我們會遷移 shell 類型任務,把 shell 中的 Hive SQL 遷移到 Spark SQL。

在生產環境中,有些 shuffle 比較中的任務經常會因爲 shuffle fetch 重試甚至失敗,我們想優化 Spark External Shuffle Service。

社區推出 Spark 3.x 也半年多了,在功能和性能上有很大提升,所以我們也想和社區保持同步,升級 Spark 到 3.x 版本。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/Gb6s59jkZPUS9NjTTeHKXQ