Hive 千億級數據傾斜解決方案
數據傾斜問題剖析
數據傾斜是分佈式系統不可避免的問題,任何分佈式系統都有幾率發生數據傾斜,但有些小夥伴在平時工作中感知不是很明顯。這裏要注意本篇文章的標題—“千億級數據”,爲什麼說千億級,因爲如果一個任務的數據量只有幾百萬,它即使發生了數據傾斜,所有數據都跑到一臺機器去執行,對於幾百萬的數據量,一臺機器執行起來還是毫無壓力的,這時數據傾斜對我們感知不大,只有數據達到一個量級時,一臺機器應付不了這麼多數據,這時如果發生數據傾斜,最後就很難算出結果。
所以就需要我們對數據傾斜的問題進行優化,儘量避免或減輕數據傾斜帶來的影響。
在解決數據傾斜問題之前,還要再提一句:沒有瓶頸時談論優化,都是自尋煩惱。
大家想想,在 map 和 reduce 兩個階段中,最容易出現數據傾斜的就是 reduce 階段,因爲 map 到 reduce 會經過 shuffle 階段,在 shuffle 中默認會按照 key 進行 hash,如果相同的 key 過多,那麼 hash 的結果就是大量相同的 key 進入到同一個 reduce 中,導致數據傾斜。
那麼有沒有可能在 map 階段就發生數據傾斜呢,是有這種可能的。
一個任務中,數據文件在進入 map 階段之前會進行切分,默認是 128M 一個數據塊,但是如果當對文件使用 GZIP 壓縮等不支持文件分割操作的壓縮方式時,MR 任務讀取壓縮後的文件時,是對它切分不了的,該壓縮文件只會被一個任務所讀取,如果有一個超大的不可切分的壓縮文件被一個 map 讀取時,就會發生 map 階段的數據傾斜。
所以,從本質上來說,發生數據傾斜的原因有兩種:一是任務中需要處理大量相同的 key 的數據。二是任務讀取不可分割的大文件。
數據傾斜解決方案
MapReduce 和 Spark 中的數據傾斜解決方案原理都是類似的,以下討論 Hive 使用 MapReduce 引擎引發的數據傾斜,Spark 數據傾斜也可以此爲參照。
1. 空值引發的數據傾斜
實際業務中有些大量的 null 值或者一些無意義的數據參與到計算作業中,表中有大量的 null 值,如果表之間進行 join 操作,就會有 shuffle 產生,這樣所有的 null 值都會被分配到一個 reduce 中,必然產生數據傾斜。
之前有小夥伴問,如果 A、B 兩表 join 操作,假如 A 表中需要 join 的字段爲 null,但是 B 表中需要 join 的字段不爲 null,這兩個字段根本就 join 不上啊,爲什麼還會放到一個 reduce 中呢?
這裏我們需要明確一個概念,數據放到同一個 reduce 中的原因不是因爲字段能不能 join 上,而是因爲 shuffle 階段的 hash 操作,只要 key 的 hash 結果是一樣的,它們就會被拉到同一個 reduce 中。
解決方案:
第一種:可以直接不讓 null 值參與 join 操作,即不讓 null 值有 shuffle 階段
SELECT *
FROM log a
JOIN users b
ON a.user_id IS NOT NULL
AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;
第二種:因爲 null 值參與 shuffle 時的 hash 結果是一樣的,那麼我們可以給 null 值隨機賦值,這樣它們的 hash 結果就不一樣,就會進到不同的 reduce 中:
SELECT *
FROM log a
LEFT JOIN users b ON CASE
WHEN a.user_id IS NULL THEN concat('hive_', rand())
ELSE a.user_id
END = b.user_id;
2. 不同數據類型引發的數據傾斜
對於兩個表 join,表 a 中需要 join 的字段 key 爲 int,表 b 中 key 字段既有 string 類型也有 int 類型。當按照 key 進行兩個表的 join 操作時,默認的 Hash 操作會按 int 型的 id 來進行分配,這樣所有的 string 類型都被分配成同一個 id,結果就是所有的 string 類型的字段進入到一個 reduce 中,引發數據傾斜。
解決方案:
如果 key 字段既有 string 類型也有 int 類型,默認的 hash 就都會按 int 類型來分配,那我們直接把 int 類型都轉爲 string 就好了,這樣 key 字段都爲 string,hash 時就按照 string 類型分配了:
SELECT *
FROM users a
LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);
3. 不可拆分大文件引發的數據傾斜
當集羣的數據量增長到一定規模,有些數據需要歸檔或者轉儲,這時候往往會對數據進行壓縮;當對文件使用 GZIP 壓縮等不支持文件分割操作的壓縮方式,在日後有作業涉及讀取壓縮後的文件時,該壓縮文件只會被一個任務所讀取。如果該壓縮文件很大,則處理該文件的 Map 需要花費的時間會遠多於讀取普通文件的 Map 時間,該 Map 任務會成爲作業運行的瓶頸。這種情況也就是 Map 讀取文件的數據傾斜。
解決方案:
這種數據傾斜問題沒有什麼好的解決方案,只能將使用 GZIP 壓縮等不支持文件分割的文件轉爲 bzip 和 zip 等支持文件分割的壓縮方式。
所以,我們在對文件進行壓縮時,爲避免因不可拆分大文件而引發數據讀取的傾斜,在數據壓縮的時候可以採用 bzip2 和 Zip 等支持文件分割的壓縮算法。
4. 數據膨脹引發的數據傾斜
在多維聚合計算時,如果進行分組聚合的字段過多,如下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:對於最後的
with rollup
關鍵字不知道大家用過沒,with rollup 是用來在分組統計數據的基礎上再進行統計彙總,即用來得到 group by 的彙總信息。
如果上面的 log 表的數據量很大,並且 Map 端的聚合不能很好地起到數據壓縮的情況下,會導致 Map 端產出的數據急速膨脹,這種情況容易導致作業內存溢出的異常。如果 log 表含有數據傾斜 key,會加劇 Shuffle 過程的數據傾斜。
解決方案:
可以拆分上面的 sql,將with rollup
拆分成如下幾個 sql:
SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;
SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;
SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;
SELECT NULL, NULL, NULL, COUNT(1)
FROM log;
但是,上面這種方式不太好,因爲現在是對 3 個字段進行分組聚合,那如果是 5 個或者 10 個字段呢,那麼需要拆解的 SQL 語句會更多。
在 Hive 中可以通過參數 hive.new.job.grouping.set.cardinality
配置的方式自動控制作業的拆解,該參數默認值是 30。表示針對 grouping sets/rollups/cubes 這類多維聚合的操作,如果最後拆解的鍵組合大於該值,會啓用新的任務去處理大於該值之外的組合。如果在處理數據時,某個分組聚合的列有較大的傾斜,可以適當調小該值。
5. 表連接時引發的數據傾斜
兩表進行普通的 repartition join 時,如果表連接的鍵存在傾斜,那麼在 Shuffle 階段必然會引起數據傾斜。
解決方案:
通常做法是將傾斜的數據存到分佈式緩存中,分發到各個 Map 任務所在節點。在 Map 階段完成 join 操作,即 MapJoin,這避免了 Shuffle,從而避免了數據傾斜。
MapJoin 是 Hive 的一種優化操作,其適用於小表 JOIN 大表的場景,由於表的 JOIN 操作是在 Map 端且在內存進行的,所以其並不需要啓動 Reduce 任務也就不需要經過 shuffle 階段,從而能在一定程度上節省資源提高 JOIN 效率。
在 Hive 0.11 版本之前,如果想在 Map 階段完成 join 操作,必須使用 MAPJOIN 來標記顯示地啓動該優化操作,由於其需要將小表加載進內存所以要注意小表的大小。
如將 a 表放到 Map 端內存中執行,在 Hive 0.11 版本之前需要這樣寫:
select /* +mapjoin(a) */ a.id , a.name, b.age
from a join b
on a.id = b.id;
如果想將多個表放到 Map 端內存中,只需在 mapjoin() 中寫多個表名稱即可,用逗號分隔,如將 a 表和 c 表放到 Map 端內存中,則 /* +mapjoin(a,c) */
。
在 Hive 0.11 版本及之後,Hive 默認啓動該優化,也就是不在需要顯示的使用 MAPJOIN 標記,其會在必要的時候觸發該優化操作將普通 JOIN 轉換成 MapJoin,可以通過以下兩個屬性來設置該優化的觸發時機:
hive.auto.convert.join=true
默認值爲 true,自動開啓 MAPJOIN 優化。
hive.mapjoin.smalltable.filesize=2500000
默認值爲 2500000(25M),通過配置該屬性來確定使用該優化的表的大小,如果表的大小小於此值就會被加載進內存中。
注意:使用默認啓動該優化的方式如果出現莫名其妙的 BUG(比如 MAPJOIN 並不起作用),就將以下兩個屬性置爲 fase 手動使用 MAPJOIN 標記來啓動該優化:
hive.auto.convert.join=false
(關閉自動 MAPJOIN 轉換操作)
hive.ignore.mapjoin.hint=false
(不忽略 MAPJOIN 標記)
再提一句:將表放到 Map 端內存時,如果節點的內存很大,但還是出現內存溢出的情況,我們可以通過這個參數 mapreduce.map.memory.mb
調節 Map 端內存的大小。
6. 確實無法減少數據量引發的數據傾斜
在一些操作中,我們沒有辦法減少數據量,如在使用 collect_list 函數時:
select s_age,collect_list(s_score) list_score
from student
group by s_age
collect_list:將分組中的某列轉爲一個數組返回。
在上述 sql 中,s_age 如果存在數據傾斜,當數據量大到一定的數量,會導致處理傾斜的 reduce 任務產生內存溢出的異常。
注:collect_list 輸出一個數組,中間結果會放到內存中,所以如果 collect_list 聚合太多數據,會導致內存溢出。
有小夥伴說這是 group by 分組引起的數據傾斜,可以開啓hive.groupby.skewindata
參數來優化。我們接下來分析下:
開啓該配置會將作業拆解成兩個作業,第一個作業會盡可能將 Map 的數據平均分配到 Reduce 階段,並在這個階段實現數據的預聚合,以減少第二個作業處理的數據量;第二個作業在第一個作業處理的數據基礎上進行結果的聚合。
hive.groupby.skewindata
的核心作用在於生成的第一個作業能夠有效減少數量。但是對於 collect_list 這類要求全量操作所有數據的中間結果的函數來說,明顯起不到作用,反而因爲引入新的作業增加了磁盤和網絡 I/O 的負擔,而導致性能變得更爲低下。
解決方案:
這類問題最直接的方式就是調整 reduce 所執行的內存大小。
調整 reduce 的內存大小使用mapreduce.reduce.memory.mb
這個配置。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/hz_6io_ZybbOlmBQE4KSBQ