Hive 千億級數據傾斜解決方案

數據傾斜問題剖析

數據傾斜是分佈式系統不可避免的問題,任何分佈式系統都有幾率發生數據傾斜,但有些小夥伴在平時工作中感知不是很明顯。這裏要注意本篇文章的標題—“千億級數據”,爲什麼說千億級,因爲如果一個任務的數據量只有幾百萬,它即使發生了數據傾斜,所有數據都跑到一臺機器去執行,對於幾百萬的數據量,一臺機器執行起來還是毫無壓力的,這時數據傾斜對我們感知不大,只有數據達到一個量級時,一臺機器應付不了這麼多數據,這時如果發生數據傾斜,最後就很難算出結果。

所以就需要我們對數據傾斜的問題進行優化,儘量避免或減輕數據傾斜帶來的影響。

在解決數據傾斜問題之前,還要再提一句:沒有瓶頸時談論優化,都是自尋煩惱。

大家想想,在 map 和 reduce 兩個階段中,最容易出現數據傾斜的就是 reduce 階段,因爲 map 到 reduce 會經過 shuffle 階段,在 shuffle 中默認會按照 key 進行 hash,如果相同的 key 過多,那麼 hash 的結果就是大量相同的 key 進入到同一個 reduce 中,導致數據傾斜。

那麼有沒有可能在 map 階段就發生數據傾斜呢,是有這種可能的。

一個任務中,數據文件在進入 map 階段之前會進行切分,默認是 128M 一個數據塊,但是如果當對文件使用 GZIP 壓縮等不支持文件分割操作的壓縮方式時,MR 任務讀取壓縮後的文件時,是對它切分不了的,該壓縮文件只會被一個任務所讀取,如果有一個超大的不可切分的壓縮文件被一個 map 讀取時,就會發生 map 階段的數據傾斜。

所以,從本質上來說,發生數據傾斜的原因有兩種:一是任務中需要處理大量相同的 key 的數據。二是任務讀取不可分割的大文件

數據傾斜解決方案

MapReduce 和 Spark 中的數據傾斜解決方案原理都是類似的,以下討論 Hive 使用 MapReduce 引擎引發的數據傾斜,Spark 數據傾斜也可以此爲參照。

1. 空值引發的數據傾斜

實際業務中有些大量的 null 值或者一些無意義的數據參與到計算作業中,表中有大量的 null 值,如果表之間進行 join 操作,就會有 shuffle 產生,這樣所有的 null 值都會被分配到一個 reduce 中,必然產生數據傾斜。

之前有小夥伴問,如果 A、B 兩表 join 操作,假如 A 表中需要 join 的字段爲 null,但是 B 表中需要 join 的字段不爲 null,這兩個字段根本就 join 不上啊,爲什麼還會放到一個 reduce 中呢?

這裏我們需要明確一個概念,數據放到同一個 reduce 中的原因不是因爲字段能不能 join 上,而是因爲 shuffle 階段的 hash 操作,只要 key 的 hash 結果是一樣的,它們就會被拉到同一個 reduce 中。

解決方案

第一種:可以直接不讓 null 值參與 join 操作,即不讓 null 值有 shuffle 階段

SELECT *
FROM log a
 JOIN users b
 ON a.user_id IS NOT NULL
  AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;

第二種:因爲 null 值參與 shuffle 時的 hash 結果是一樣的,那麼我們可以給 null 值隨機賦值,這樣它們的 hash 結果就不一樣,就會進到不同的 reduce 中:

SELECT *
FROM log a
 LEFT JOIN users b ON CASE 
   WHEN a.user_id IS NULL THEN concat('hive_', rand())
   ELSE a.user_id
  END = b.user_id;

2. 不同數據類型引發的數據傾斜

對於兩個表 join,表 a 中需要 join 的字段 key 爲 int,表 b 中 key 字段既有 string 類型也有 int 類型。當按照 key 進行兩個表的 join 操作時,默認的 Hash 操作會按 int 型的 id 來進行分配,這樣所有的 string 類型都被分配成同一個 id,結果就是所有的 string 類型的字段進入到一個 reduce 中,引發數據傾斜。

解決方案

如果 key 字段既有 string 類型也有 int 類型,默認的 hash 就都會按 int 類型來分配,那我們直接把 int 類型都轉爲 string 就好了,這樣 key 字段都爲 string,hash 時就按照 string 類型分配了:

SELECT *
FROM users a
 LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);

3. 不可拆分大文件引發的數據傾斜

當集羣的數據量增長到一定規模,有些數據需要歸檔或者轉儲,這時候往往會對數據進行壓縮;當對文件使用 GZIP 壓縮等不支持文件分割操作的壓縮方式,在日後有作業涉及讀取壓縮後的文件時,該壓縮文件只會被一個任務所讀取。如果該壓縮文件很大,則處理該文件的 Map 需要花費的時間會遠多於讀取普通文件的 Map 時間,該 Map 任務會成爲作業運行的瓶頸。這種情況也就是 Map 讀取文件的數據傾斜。

解決方案:

這種數據傾斜問題沒有什麼好的解決方案,只能將使用 GZIP 壓縮等不支持文件分割的文件轉爲 bzip 和 zip 等支持文件分割的壓縮方式。

所以,我們在對文件進行壓縮時,爲避免因不可拆分大文件而引發數據讀取的傾斜,在數據壓縮的時候可以採用 bzip2 和 Zip 等支持文件分割的壓縮算法

4. 數據膨脹引發的數據傾斜

在多維聚合計算時,如果進行分組聚合的字段過多,如下:

select a,b,c,count(1)from log group by a,b,c with rollup;

注:對於最後的with rollup關鍵字不知道大家用過沒,with rollup 是用來在分組統計數據的基礎上再進行統計彙總,即用來得到 group by 的彙總信息。

如果上面的 log 表的數據量很大,並且 Map 端的聚合不能很好地起到數據壓縮的情況下,會導致 Map 端產出的數據急速膨脹,這種情況容易導致作業內存溢出的異常。如果 log 表含有數據傾斜 key,會加劇 Shuffle 過程的數據傾斜。

解決方案

可以拆分上面的 sql,將with rollup拆分成如下幾個 sql:

SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;

SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;

SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;

SELECT NULL, NULL, NULL, COUNT(1)
FROM log;

但是,上面這種方式不太好,因爲現在是對 3 個字段進行分組聚合,那如果是 5 個或者 10 個字段呢,那麼需要拆解的 SQL 語句會更多。

在 Hive 中可以通過參數 hive.new.job.grouping.set.cardinality 配置的方式自動控制作業的拆解,該參數默認值是 30。表示針對 grouping sets/rollups/cubes 這類多維聚合的操作,如果最後拆解的鍵組合大於該值,會啓用新的任務去處理大於該值之外的組合。如果在處理數據時,某個分組聚合的列有較大的傾斜,可以適當調小該值。

5. 表連接時引發的數據傾斜

兩表進行普通的 repartition join 時,如果表連接的鍵存在傾斜,那麼在 Shuffle 階段必然會引起數據傾斜。

解決方案

通常做法是將傾斜的數據存到分佈式緩存中,分發到各個 Map 任務所在節點。在 Map 階段完成 join 操作,即 MapJoin,這避免了 Shuffle,從而避免了數據傾斜。

MapJoin 是 Hive 的一種優化操作,其適用於小表 JOIN 大表的場景,由於表的 JOIN 操作是在 Map 端且在內存進行的,所以其並不需要啓動 Reduce 任務也就不需要經過 shuffle 階段,從而能在一定程度上節省資源提高 JOIN 效率。

在 Hive 0.11 版本之前,如果想在 Map 階段完成 join 操作,必須使用 MAPJOIN 來標記顯示地啓動該優化操作,由於其需要將小表加載進內存所以要注意小表的大小

如將 a 表放到 Map 端內存中執行,在 Hive 0.11 版本之前需要這樣寫:

select /* +mapjoin(a) */ a.id , a.name, b.age 
from a join b 
on a.id = b.id;

如果想將多個表放到 Map 端內存中,只需在 mapjoin() 中寫多個表名稱即可,用逗號分隔,如將 a 表和 c 表放到 Map 端內存中,則 /* +mapjoin(a,c) */

在 Hive 0.11 版本及之後,Hive 默認啓動該優化,也就是不在需要顯示的使用 MAPJOIN 標記,其會在必要的時候觸發該優化操作將普通 JOIN 轉換成 MapJoin,可以通過以下兩個屬性來設置該優化的觸發時機:

hive.auto.convert.join=true 默認值爲 true,自動開啓 MAPJOIN 優化。

hive.mapjoin.smalltable.filesize=2500000 默認值爲 2500000(25M),通過配置該屬性來確定使用該優化的表的大小,如果表的大小小於此值就會被加載進內存中。

注意:使用默認啓動該優化的方式如果出現莫名其妙的 BUG(比如 MAPJOIN 並不起作用),就將以下兩個屬性置爲 fase 手動使用 MAPJOIN 標記來啓動該優化:

hive.auto.convert.join=false (關閉自動 MAPJOIN 轉換操作)

hive.ignore.mapjoin.hint=false (不忽略 MAPJOIN 標記)

再提一句:將表放到 Map 端內存時,如果節點的內存很大,但還是出現內存溢出的情況,我們可以通過這個參數 mapreduce.map.memory.mb 調節 Map 端內存的大小。

6. 確實無法減少數據量引發的數據傾斜

在一些操作中,我們沒有辦法減少數據量,如在使用 collect_list 函數時:

select s_age,collect_list(s_score) list_score
from student
group by s_age

collect_list:將分組中的某列轉爲一個數組返回。

在上述 sql 中,s_age 如果存在數據傾斜,當數據量大到一定的數量,會導致處理傾斜的 reduce 任務產生內存溢出的異常。

注:collect_list 輸出一個數組,中間結果會放到內存中,所以如果 collect_list 聚合太多數據,會導致內存溢出。

有小夥伴說這是 group by 分組引起的數據傾斜,可以開啓hive.groupby.skewindata參數來優化。我們接下來分析下:

開啓該配置會將作業拆解成兩個作業,第一個作業會盡可能將 Map 的數據平均分配到 Reduce 階段,並在這個階段實現數據的預聚合,以減少第二個作業處理的數據量;第二個作業在第一個作業處理的數據基礎上進行結果的聚合。

hive.groupby.skewindata的核心作用在於生成的第一個作業能夠有效減少數量。但是對於 collect_list 這類要求全量操作所有數據的中間結果的函數來說,明顯起不到作用,反而因爲引入新的作業增加了磁盤和網絡 I/O 的負擔,而導致性能變得更爲低下。

解決方案

這類問題最直接的方式就是調整 reduce 所執行的內存大小。

調整 reduce 的內存大小使用mapreduce.reduce.memory.mb這個配置。


本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hz_6io_ZybbOlmBQE4KSBQ