Spark 性能優化指南——高級篇

繼基礎篇講解了每個 Spark 開發人員都必須熟知的開發調優與資源調優之後,本文作爲《Spark 性能優化指南》的高級篇,將深入分析數據傾斜調優與 shuffle 調優,以解決更加棘手的性能問題。

調優概述

有的時候,我們可能會遇到大數據計算中一個最棘手的問題——數據傾斜,此時 Spark 作業的性能會比期望差很多。數據傾斜調優,就是使用各種技術方案解決不同類型的數據傾斜問題,以保證 Spark 作業的性能。

數據傾斜發生時的現象

數據傾斜發生的原理

數據傾斜的原理很簡單:在進行 shuffle 的時候,必須將各個節點上相同的 key 拉取到某個節點上的一個 task 來進行處理,比如按照 key 進行聚合或 join 等操作。此時如果某個 key 對應的數據量特別大的話,就會發生數據傾斜。比如大部分 key 對應 10 條數據,但是個別 key 卻對應了 100 萬條數據,那麼大部分 task 可能就只會分配到 10 條數據,然後 1 秒鐘就運行完了;但是個別 task 可能分配到了 100 萬數據,要運行一兩個小時。因此,整個 Spark 作業的運行進度是由運行時間最長的那個 task 決定的。

因此出現數據傾斜的時候,Spark 作業看起來會運行得非常緩慢,甚至可能因爲某個 task 處理的數據量過大導致內存溢出。

下圖就是一個很清晰的例子:hello 這個 key,在三個節點上對應了總共 7 條數據,這些數據都會被拉取到同一個 task 中進行處理;而 world 和 you 這兩個 key 分別纔對應 1 條數據,所以另外兩個 task 只要分別處理 1 條數據即可。此時第一個 task 的運行時間可能是另外兩個 task 的 7 倍,而整個 stage 的運行速度也由運行最慢的那個 task 所決定。

如何定位導致數據傾斜的代碼

數據傾斜只會發生在 shuffle 過程中。這裏給大家羅列一些常用的並且可能會觸發 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出現數據傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導致的。

某個 task 執行特別慢的情況

首先要看的,就是數據傾斜發生在第幾個 stage 中。

如果是用 yarn-client 模式提交,那麼本地是直接可以看到 log 的,可以在 log 中找到當前運行到了第幾個 stage;如果是用 yarn-cluster 模式提交,則可以通過 Spark Web UI 來查看當前運行到了第幾個 stage。此外,無論是使用 yarn-client 模式還是 yarn-cluster 模式,我們都可以在 Spark Web UI 上深入看一下當前這個 stage 各個 task 分配的數據量,從而進一步確定是不是 task 分配的數據不均勻導致了數據傾斜。

比如下圖中,倒數第三列顯示了每個 task 的運行時間。明顯可以看到,有的 task 運行特別快,只需要幾秒鐘就可以運行完;而有的 task 運行特別慢,需要幾分鐘才能運行完,此時單從運行時間上看就已經能夠確定發生數據傾斜了。此外,倒數第一列顯示了每個 task 處理的數據量,明顯可以看到,運行時間特別短的 task 只需要處理幾百 KB 的數據即可,而運行時間特別長的 task 需要處理幾千 KB 的數據,處理的數據量差了 10 倍。此時更加能夠確定是發生了數據傾斜。

知道數據傾斜發生在哪一個 stage 之後,接着我們就需要根據 stage 劃分原理,推算出來發生傾斜的那個 stage 對應代碼中的哪一部分,這部分代碼中肯定會有一個 shuffle 類算子。精準推算 stage 與代碼的對應關係,需要對 Spark 的源碼有深入的理解,這裏我們可以介紹一個相對簡單實用的推算方法:只要看到 Spark 代碼中出現了一個 shuffle 類算子或者是 Spark SQL 的 SQL 語句中出現了會導致 shuffle 的語句(比如 group by 語句),那麼就可以判定,以那個地方爲界限劃分出了前後兩個 stage。

這裏我們就以 Spark 最基礎的入門程序——單詞計數來舉例,如何用最簡單的方法大致推算出一個 stage 對應的代碼。如下示例,在整個代碼中,只有一個 reduceByKey 是會發生 shuffle 的算子,因此就可以認爲,以這個算子爲界限,會劃分出前後兩個 stage。* stage0,主要是執行從 textFile 到 map 操作,以及執行 shuffle write 操作。shuffle write 操作,我們可以簡單理解爲對 pairs RDD 中的數據進行分區操作,每個 task 處理的數據中,相同的 key 會寫入同一個磁盤文件內。* stage1,主要是執行從 reduceByKey 到 collect 操作,stage1 的各個 task 一開始運行,就會首先執行 shuffle read 操作。執行 shuffle read 操作的 task,會從 stage0 的各個 task 所在節點拉取屬於自己處理的那些 key,然後對同一個 key 進行全局性的聚合或 join 等操作,在這裏就是對 key 的 value 值進行累加。stage1 在執行完 reduceByKey 算子之後,就計算出了最終的 wordCounts RDD,然後會執行 collect 算子,將所有數據拉取到 Driver 上,供我們遍歷和打印輸出。

val conf = new SparkConf()
val sc = new SparkContext(conf)
 
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
 
wordCounts.collect().foreach(println(_))

通過對單詞計數程序的分析,希望能夠讓大家瞭解最基本的 stage 劃分的原理,以及 stage 劃分後 shuffle 操作是如何在兩個 stage 的邊界處執行的。然後我們就知道如何快速定位出發生數據傾斜的 stage 對應代碼的哪一個部分了。比如我們在 Spark Web UI 或者本地 log 中發現,stage1 的某幾個 task 執行得特別慢,判定 stage1 出現了數據傾斜,那麼就可以回到代碼中定位出 stage1 主要包括了 reduceByKey 這個 shuffle 類算子,此時基本就可以確定是由 educeByKey 算子導致的數據傾斜問題。比如某個單詞出現了 100 萬次,其他單詞纔出現 10 次,那麼 stage1 的某個 task 就要處理 100 萬數據,整個 stage 的速度就會被這個 task 拖慢。

某個 task 莫名其妙內存溢出的情況

這種情況下去定位出問題的代碼就比較容易了。我們建議直接看 yarn-client 模式下本地 log 的異常棧,或者是通過 YARN 查看 yarn-cluster 模式下的 log 中的異常棧。一般來說,通過異常棧信息就可以定位到你的代碼中哪一行發生了內存溢出。然後在那行代碼附近找找,一般也會有 shuffle 類算子,此時很可能就是這個算子導致了數據傾斜。

但是大家要注意的是,不能單純靠偶然的內存溢出就判定發生了數據傾斜。因爲自己編寫的代碼的 bug,以及偶然出現的數據異常,也可能會導致內存溢出。因此還是要按照上面所講的方法,通過 Spark Web UI 查看報錯的那個 stage 的各個 task 的運行時間以及分配的數據量,才能確定是否是由於數據傾斜才導致了這次內存溢出。

查看導致數據傾斜的 key 的數據分佈情況

知道了數據傾斜發生在哪裏之後,通常需要分析一下那個執行了 shuffle 操作並且導致了數據傾斜的 RDD/Hive 表,查看一下其中 key 的分佈情況。這主要是爲之後選擇哪一種技術方案提供依據。針對不同的 key 分佈與不同的 shuffle 算子組合起來的各種情況,可能需要選擇不同的技術方案來解決。

此時根據你執行操作的情況不同,可以有很多種查看 key 分佈的方式:

  1. 如果是 Spark SQL 中的 group by、join 語句導致的數據傾斜,那麼就查詢一下 SQL 中使用的表的 key 分佈情況。

  2. 如果是對 Spark RDD 執行 shuffle 算子導致的數據傾斜,那麼可以在 Spark 作業中加入查看 key 分佈的代碼,比如 RDD.countByKey()。然後對統計出來的各個 key 出現的次數,collect/take 到客戶端打印一下,就可以看到 key 的分佈情況。

舉例來說,對於上面所說的單詞計數程序,如果確定了是 stage1 的 reduceByKey 算子導致了數據傾斜,那麼就應該看看進行 reduceByKey 操作的 RDD 中的 key 分佈情況,在這個例子中指的就是 pairs RDD。如下示例,我們可以先對 pairs 採樣 10% 的樣本數據,然後使用 countByKey 算子統計出每個 key 出現的次數,最後在客戶端遍歷和打印樣本數據中各個 key 的出現次數。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

數據傾斜的解決方案

解決方案一:使用 Hive ETL 預處理數據

方案適用場景:導致數據傾斜的是 Hive 表。如果該 Hive 表中的數據本身很不均勻(比如某個 key 對應了 100 萬數據,其他 key 纔對應了 10 條數據),而且業務場景需要頻繁使用 Spark 對 Hive 表執行某個分析操作,那麼比較適合使用這種技術方案。

方案實現思路:此時可以評估一下,是否可以通過 Hive 來進行數據預處理(即通過 Hive ETL 預先對數據按照 key 進行聚合,或者是預先和其他表進行 join),然後在 Spark 作業中針對的數據源就不是原來的 Hive 表了,而是預處理後的 Hive 表。此時由於數據已經預先進行過聚合或 join 操作了,那麼在 Spark 作業中也就不需要使用原先的 shuffle 類算子執行這類操作了。

方案實現原理:這種方案從根源上解決了數據傾斜,因爲徹底避免了在 Spark 中執行 shuffle 類算子,那麼肯定就不會有數據傾斜的問題了。但是這裏也要提醒一下大家,這種方式屬於治標不治本。因爲畢竟數據本身就存在分佈不均勻的問題,所以 Hive ETL 中進行 group by 或者 join 等 shuffle 操作時,還是會出現數據傾斜,導致 Hive ETL 的速度很慢。我們只是把數據傾斜的發生提前到了 Hive ETL 中,避免 Spark 程序發生數據傾斜而已。

方案優點:實現起來簡單便捷,效果還非常好,完全規避掉了數據傾斜,Spark 作業的性能會大幅度提升。

方案缺點:治標不治本,Hive ETL 中還是會發生數據傾斜。

方案實踐經驗:在一些 Java 系統與 Spark 結合使用的項目中,會出現 Java 代碼頻繁調用 Spark 作業的場景,而且對 Spark 作業的執行性能要求很高,就比較適合使用這種方案。將數據傾斜提前到上游的 Hive ETL,每天僅執行一次,只有那一次是比較慢的,而之後每次 Java 調用 Spark 作業時,執行速度都會很快,能夠提供更好的用戶體驗。

項目實踐經驗:在美團 · 點評的交互式用戶行爲分析系統中使用了這種方案,該系統主要是允許用戶通過 Java Web 系統提交數據分析統計任務,後端通過 Java 提交 Spark 作業進行數據分析統計。要求 Spark 作業速度必須要快,儘量在 10 分鐘以內,否則速度太慢,用戶體驗會很差。所以我們將有些 Spark 作業的 shuffle 操作提前到了 Hive ETL 中,從而讓 Spark 直接使用預處理的 Hive 中間表,儘可能地減少 Spark 的 shuffle 操作,大幅度提升了性能,將部分作業的性能提升了 6 倍以上。

解決方案二:過濾少數導致傾斜的 key

方案適用場景:如果發現導致傾斜的 key 就少數幾個,而且對計算本身的影響並不大的話,那麼很適合使用這種方案。比如 99% 的 key 就對應 10 條數據,但是隻有一個 key 對應了 100 萬數據,從而導致了數據傾斜。

方案實現思路:如果我們判斷那少數幾個數據量特別多的 key,對作業的執行和計算結果不是特別重要的話,那麼幹脆就直接過濾掉那少數幾個 key。比如,在 Spark SQL 中可以使用 where 子句過濾掉這些 key 或者在 Spark Core 中對 RDD 執行 filter 算子過濾掉這些 key。如果需要每次作業執行時,動態判定哪些 key 的數據量最多然後再進行過濾,那麼可以使用 sample 算子對 RDD 進行採樣,然後計算出每個 key 的數量,取數據量最多的 key 過濾掉即可。

方案實現原理:將導致數據傾斜的 key 給過濾掉之後,這些 key 就不會參與計算了,自然不可能產生數據傾斜。

方案優點:實現簡單,而且效果也很好,可以完全規避掉數據傾斜。

方案缺點:適用場景不多,大多數情況下,導致傾斜的 key 還是很多的,並不是只有少數幾個。

方案實踐經驗:在項目中我們也採用過這種方案解決數據傾斜。有一次發現某一天 Spark 作業在運行的時候突然 OOM 了,追查之後發現,是 Hive 表中的某一個 key 在那天數據異常,導致數據量暴增。因此就採取每次執行前先進行採樣,計算出樣本中數據量最大的幾個 key 之後,直接在程序中將那些 key 給過濾掉。

解決方案三:提高 shuffle 操作的並行度

方案適用場景:如果我們必須要對數據傾斜迎難而上,那麼建議優先使用這種方案,因爲這是處理數據傾斜最簡單的一種方案。

方案實現思路:在對 RDD 執行 shuffle 算子時,給 shuffle 算子傳入一個參數,比如 reduceByKey(1000),該參數就設置了這個 shuffle 算子執行時 shuffle read task 的數量。對於 Spark SQL 中的 shuffle 類語句,比如 group by、join 等,需要設置一個參數,即 spark.sql.shuffle.partitions,該參數代表了 shuffle read task 的並行度,該值默認是 200,對於很多場景來說都有點過小。

方案實現原理:增加 shuffle read task 的數量,可以讓原本分配給一個 task 的多個 key 分配給多個 task,從而讓每個 task 處理比原來更少的數據。舉例來說,如果原本有 5 個 key,每個 key 對應 10 條數據,這 5 個 key 都是分配給一個 task 的,那麼這個 task 就要處理 50 條數據。而增加了 shuffle read task 以後,每個 task 就分配到一個 key,即每個 task 就處理 10 條數據,那麼自然每個 task 的執行時間都會變短了。具體原理如下圖所示。

方案優點:實現起來比較簡單,可以有效緩解和減輕數據傾斜的影響。

方案缺點:只是緩解了數據傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。

方案實踐經驗:該方案通常無法徹底解決數據傾斜,因爲如果出現一些極端情況,比如某個 key 對應的數據量有 100 萬,那麼無論你的 task 數量增加到多少,這個對應着 100 萬數據的 key 肯定還是會分配到一個 task 中去處理,因此註定還是會發生數據傾斜的。所以這種方案只能說是在發現數據傾斜時嘗試使用的第一種手段,嘗試去用嘴簡單的方法緩解數據傾斜而已,或者是和其他方案結合起來使用。

解決方案四:兩階段聚合(局部聚合 + 全局聚合)

方案適用場景:對 RDD 執行 reduceByKey 等聚合類 shuffle 算子或者在 Spark SQL 中使用 group by 語句進行分組聚合時,比較適用這種方案。

方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個 key 都打上一個隨機數,比如 10 以內的隨機數,此時原先一樣的 key 就變成不一樣的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行 reduceByKey 等聚合操作,進行局部聚合,那麼局部聚合結果,就會變成了 (1_hello, 2) (2_hello, 2)。然後將各個 key 的前綴給去掉,就會變成 (hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如 (hello, 4)。

方案實現原理:將原本相同的 key 通過附加隨機前綴的方式,變成多個不同的 key,就可以讓原本被一個 task 處理的數據分散到多個 task 上去做局部聚合,進而解決單個 task 處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。具體原理見下圖。

方案優點:對於聚合類的 shuffle 操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將 Spark 作業的性能提升數倍以上。

方案缺點:僅僅適用於聚合類的 shuffle 操作,適用範圍相對較窄。如果是 join 類的 shuffle 操作,還得用其他的解決方案。

// 第一步,給RDD中的每個key都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });
  
// 第二步,對打上隨機前綴的key進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
  
// 第三步,去除RDD中每個key的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });
  
// 第四步,對去除了隨機前綴的RDD進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

解決方案五:將 reduce join 轉爲 map join

方案適用場景:在對 RDD 使用 join 類操作,或者是在 Spark SQL 中使用 join 語句時,而且 join 操作中的一個 RDD 或表的數據量比較小(比如幾百 M 或者一兩 G),比較適用此方案。

方案實現思路:不使用 join 算子進行連接操作,而使用 Broadcast 變量與 map 類算子實現 join 操作,進而完全規避掉 shuffle 類的操作,徹底避免數據傾斜的發生和出現。將較小 RDD 中的數據直接通過 collect 算子拉取到 Driver 端的內存中來,然後對其創建一個 Broadcast 變量;接着對另外一個 RDD 執行 map 類算子,在算子函數內,從 Broadcast 變量中獲取較小 RDD 的全量數據,與當前 RDD 的每一條數據按照連接 key 進行比對,如果連接 key 相同的話,那麼就將兩個 RDD 的數據用你需要的方式連接起來。

方案實現原理:普通的 join 是會走 shuffle 過程的,而一旦 shuffle,就相當於會將相同 key 的數據拉取到一個 shuffle read task 中再進行 join,此時就是 reduce join。但是如果一個 RDD 是比較小的,則可以採用廣播小 RDD 全量數據 + map 算子來實現與 join 同樣的效果,也就是 map join,此時就不會發生 shuffle 操作,也就不會發生數據傾斜。具體原理如下圖所示。

方案優點:對 join 操作導致的數據傾斜,效果非常好,因爲根本就不會發生 shuffle,也就根本不會發生數據傾斜。

方案缺點:適用場景較少,因爲這個方案只適用於一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內存資源,driver 和每個 Executor 內存中都會駐留一份小 RDD 的全量數據。如果我們廣播出去的 RDD 數據比較大,比如 10G 以上,那麼就可能發生內存溢出了。因此並不適合兩個都是大表的情況。

// 首先將數據量比較小的RDD的數據,collect到Driver中來。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然後使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每個Executor就只有一份RDD的數據。
// 可以儘可能節省內存空間,並且減少網絡傳輸性能開銷。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
  
// 對另外一個RDD執行map類操作,而不再是join類操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函數中,通過廣播變量,獲取到本地Executor中的rdd1數據。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以將rdd1的數據轉換爲一個Map,便於後面進行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 獲取當前RDD數據的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 從rdd1數據Map中,根據key獲取到可以join到的數據。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });
  
// 這裏得提示一下。
// 上面的做法,僅僅適用於rdd1中的key沒有重複,全部是唯一的場景。
// 如果rdd1中有多個相同的key,那麼就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數據進行join。
// rdd2中每條數據都可能會返回多條join後的數據。

解決方案六:採樣傾斜 key 並分拆 join 操作

方案適用場景:兩個 RDD/Hive 表進行 join 的時候,如果數據量都比較大,無法採用 “解決方案五”,那麼此時可以看一下兩個 RDD/Hive 表中的 key 分佈情況。如果出現數據傾斜,是因爲其中某一個 RDD/Hive 表中的少數幾個 key 的數據量過大,而另一個 RDD/Hive 表中的所有 key 都分佈比較均勻,那麼採用這個解決方案是比較合適的。

方案實現思路

方案實現原理:對於 join 導致的數據傾斜,如果只是某幾個 key 導致了傾斜,可以將少數幾個 key 分拆成獨立 RDD,並附加隨機前綴打散成 n 份去進行 join,此時這幾個 key 對應的數據就不會集中在少數幾個 task 上,而是分散到多個 task 進行 join 了。具體原理見下圖。

方案優點:對於 join 導致的數據傾斜,如果只是某幾個 key 導致了傾斜,採用該方式可以用最有效的方式打散 key 進行 join。而且只需要針對少數傾斜 key 對應的數據進行擴容 n 倍,不需要對全量數據進行擴容。避免了佔用過多內存。

方案缺點:如果導致傾斜的 key 特別多的話,比如成千上萬個 key 都導致數據傾斜,那麼這種方式也不適合。

// 首先將數據量比較小的RDD的數據,collect到Driver中來。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然後使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每個Executor就只有一份RDD的數據。
// 可以儘可能節省內存空間,並且減少網絡傳輸性能開銷。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
  
// 對另外一個RDD執行map類操作,而不再是join類操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // 在算子函數中,通過廣播變量,獲取到本地Executor中的rdd1數據。
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // 可以將rdd1的數據轉換爲一個Map,便於後面進行join操作。
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for(Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // 獲取當前RDD數據的key以及value。
                String key = tuple._1;
                String value = tuple._2;
                // 從rdd1數據Map中,根據key獲取到可以join到的數據。
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });
  
// 這裏得提示一下。
// 上面的做法,僅僅適用於rdd1中的key沒有重複,全部是唯一的場景。
// 如果rdd1中有多個相同的key,那麼就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數據進行join。
// rdd2中每條數據都可能會返回多條join後的數據。// 首先從包含了少數幾個導致數據傾斜key的rdd1中,採樣10%的樣本數據。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
  
// 對樣本數據RDD統計出每個key的出現次數,並按出現次數降序排序。
// 對降序排序後的數據,取出top 1或者top 100的數據,也就是key最多的前n個數據。
// 具體取出多少個數據量最多的key,由大家自己決定,我們這裏就取1個作爲示範。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
  
// 從rdd1中分拆出導致數據傾斜的key,形成獨立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 從rdd1中分拆出不導致數據傾斜的普通key,形成獨立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });
  
// rdd2,就是那個所有key的分佈相對較爲均勻的rdd。
// 這裏將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,並對rdd中的數據使用flatMap算子都擴容100倍。
// 對擴容的每條數據,都打上0~100的前綴。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
              
        });
 
// 將rdd1中分拆出來的導致傾斜的key的獨立rdd,每條數據都打上100以內的隨機前綴。
// 然後將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });
 
// 將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
 
// 將傾斜key join後的結果與普通key join後的結果,uinon起來。
// 就是最終的join結果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);// 首先從包含了少數幾個導致數據傾斜key的rdd1中,採樣10%的樣本數據。
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
  
// 對樣本數據RDD統計出每個key的出現次數,並按出現次數降序排序。
// 對降序排序後的數據,取出top 1或者top 100的數據,也就是key最多的前n個數據。
// 具體取出多少個數據量最多的key,由大家自己決定,我們這裏就取1個作爲示範。
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
  
// 從rdd1中分拆出導致數據傾斜的key,形成獨立的RDD。
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// 從rdd1中分拆出不導致數據傾斜的普通key,形成獨立的RDD。
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });
  
// rdd2,就是那個所有key的分佈相對較爲均勻的rdd。
// 這裏將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,並對rdd中的數據使用flatMap算子都擴容100倍。
// 對擴容的每條數據,都打上0~100的前綴。
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
              
        });
 
// 將rdd1中分拆出來的導致傾斜的key的獨立rdd,每條數據都打上100以內的隨機前綴。
// 然後將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedUserid2infoRDD)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });
 
// 將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
 
// 將傾斜key join後的結果與普通key join後的結果,uinon起來。
// 就是最終的join結果。
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

解決方案七:使用隨機前綴和擴容 RDD 進行 join

方案適用場景:如果在進行 join 操作時,RDD 中有大量的 key 導致數據傾斜,那麼進行分拆 key 也沒什麼意義,此時就只能使用最後一種方案來解決問題了。

方案實現思路

方案實現原理:將原先一樣的 key 通過附加隨機前綴變成不一樣的 key,然後就可以將這些處理後的 “不同 key” 分散到多個 task 中去處理,而不是讓一個 task 處理大量的相同 key。該方案與 “解決方案六” 的不同之處就在於,上一種方案是儘量只對少數傾斜 key 對應的數據進行特殊處理,由於處理過程需要擴容 RDD,因此上一種方案擴容 RDD 後對內存的佔用並不大;而這一種方案是針對有大量傾斜 key 的情況,沒法將部分 key 拆分出來進行單獨處理,因此只能對整個 RDD 進行數據擴容,對內存資源要求很高。

方案優點:對 join 類型的數據傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。

方案缺點:該方案更多的是緩解數據傾斜,而不是徹底避免數據傾斜。而且需要對整個 RDD 進行擴容,對內存資源要求很高。

方案實踐經驗:曾經開發一個數據需求的時候,發現一個 join 導致了數據傾斜。優化之前,作業的執行時間大約是 60 分鐘左右;使用該方案優化之後,執行時間縮短到 10 分鐘左右,性能提升了 6 倍。

// 首先將其中一個key分佈相對較爲均勻的RDD膨脹100倍。
JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(
        new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                    throws Exception {
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                }
                return list;
            }
        });
  
// 其次,將另一個有數據傾斜key的RDD,每條數據都打上100以內的隨機前綴。
JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        });
  
// 將兩個處理後的RDD進行join即可。
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

解決方案八:多種方案組合使用

在實踐中發現,很多情況下,如果只是處理較爲簡單的數據傾斜場景,那麼使用上述方案中的某一種基本就可以解決。但是如果要處理一個較爲複雜的數據傾斜場景,那麼可能需要將多種方案組合起來使用。比如說,我們針對出現了多個數據傾斜環節的 Spark 作業,可以先運用解決方案一和二,預處理一部分數據,並過濾一部分數據來緩解;其次可以對某些 shuffle 操作提升並行度,優化其性能;最後還可以針對不同的聚合或 join 操作,選擇一種方案來優化其性能。大家需要對這些方案的思路和原理都透徹理解之後,在實踐中根據各種不同的情況,靈活運用多種方案,來解決自己的數據傾斜問題。

調優概述

大多數 Spark 作業的性能主要就是消耗在了 shuffle 環節,因爲該環節包含了大量的磁盤 IO、序列化、網絡數據傳輸等操作。因此,如果要讓作業的性能更上一層樓,就有必要對 shuffle 過程進行調優。但是也必須提醒大家的是,影響一個 Spark 作業性能的因素,主要還是代碼開發、資源參數以及數據傾斜,shuffle 調優只能在整個 Spark 的性能調優中佔到一小部分而已。因此大家務必把握住調優的基本原則,千萬不要捨本逐末。下面我們就給大家詳細講解 shuffle 的原理,以及相關參數的說明,同時給出各個參數的調優建議。

ShuffleManager 發展概述

在 Spark 的源碼中,負責 shuffle 過程的執行、計算和處理的組件主要就是 ShuffleManager,也即 shuffle 管理器。而隨着 Spark 的版本的發展,ShuffleManager 也在不斷迭代,變得越來越先進。

在 Spark 1.2 以前,默認的 shuffle 計算引擎是 HashShuffleManager。該 ShuffleManager 而 HashShuffleManager 有着一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤 IO 操作影響了性能。

因此在 Spark 1.2 以後的版本中,默認的 ShuffleManager 改成了 SortShuffleManager。SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於,每個 Task 在進行 shuffle 操作時,雖然也會產生較多的臨時磁盤文件,但是最後會將所有的臨時文件合併(merge)成一個磁盤文件,因此每個 Task 就只有一個磁盤文件。在下一個 stage 的 shuffle read task 拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。

下面我們詳細分析一下 HashShuffleManager 和 SortShuffleManager 的原理。

HashShuffleManager 運行原理

未經優化的 HashShuffleManager

下圖說明了未經優化的 HashShuffleManager 的原理。這裏我們先明確一個假設前提:每個 Executor 只有 1 個 CPU core,也就是說,無論這個 Executor 上分配多少個 task 線程,同一時間都只能執行一個 task 線程。

我們先從 shuffle write 開始說起。shuffle write 階段,主要就是在一個 stage 結束計算之後,爲了下一個 stage 可以執行 shuffle 類的算子(比如 reduceByKey),而將每個 task 處理的數據按 key 進行 “分類”。所謂 “分類”,就是對相同的 key 執行 hash 算法,從而將相同 key 都寫入同一個磁盤文件中,而每一個磁盤文件都只屬於下游 stage 的一個 task。在將數據寫入磁盤之前,會先將數據寫入內存緩衝中,當內存緩衝填滿之後,纔會溢寫到磁盤文件中去。

那麼每個執行 shuffle write 的 task,要爲下一個 stage 創建多少個磁盤文件呢?很簡單,下一個 stage 的 task 有多少個,當前 stage 的每個 task 就要創建多少份磁盤文件。比如下一個 stage 總共有 100 個 task,那麼當前 stage 的每個 task 都要創建 100 份磁盤文件。如果當前 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 Task,那麼每個 Executor 上總共就要創建 500 個磁盤文件,所有 Executor 上會創建 5000 個磁盤文件。由此可見,未經優化的 shuffle write 操作所產生的磁盤文件的數量是極其驚人的。

接着我們來說說 shuffle read。shuffle read,通常就是一個 stage 剛開始時要做的事情。此時該 stage 的每一個 task 就需要將上一個 stage 的計算結果中的所有相同 key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行 key 的聚合或連接等操作。由於 shuffle write 的過程中,task 給下游 stage 的每個 task 都創建了一個磁盤文件,因此 shuffle read 的過程中,每個 task 只要從上游 stage 的所有 task 所在節點上,拉取屬於自己的那一個磁盤文件即可。

shuffle read 的拉取過程是一邊拉取一邊進行聚合的。每個 shuffle read task 都會有一個自己的 buffer 緩衝,每次都只能拉取與 buffer 緩衝相同大小的數據,然後通過內存中的一個 Map 進行聚合等操作。聚合完一批數據後,再拉取下一批數據,並放到 buffer 緩衝中進行聚合操作。以此類推,直到最後將所有數據到拉取完,並得到最終的結果。

優化後的 HashShuffleManager

下圖說明了優化後的 HashShuffleManager 的原理。這裏說的優化,是指我們可以設置一個參數,spark.shuffle.consolidateFiles。該參數默認值爲 false,將其設置爲 true 即可開啓優化機制。通常來說,如果我們使用 HashShuffleManager,那麼都建議開啓這個選項。

開啓 consolidate 機制之後,在 shuffle write 過程中,task 就不是爲下游 stage 的每個 task 創建一個磁盤文件了。此時會出現 shuffleFileGroup 的概念,每個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量與下游 stage 的 task 數量是相同的。一個 Executor 上有多少個 CPU core,就可以並行執行多少個 task。而第一批並行執行的每個 task 都會創建一個 shuffleFileGroup,並將數據寫入對應的磁盤文件內。

當 Executor 的 CPU core 執行完一批 task,接着執行下一批 task 時,下一批 task 就會複用之前已有的 shuffleFileGroup,包括其中的磁盤文件。也就是說,此時 task 會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate 機制允許不同的 task 複用同一批磁盤文件,這樣就可以有效將多個 task 的磁盤文件進行一定程度上的合併,從而大幅度減少磁盤文件的數量,進而提升 shuffle write 的性能。

假設第二個 stage 有 100 個 task,第一個 stage 有 50 個 task,總共還是有 10 個 Executor,每個 Executor 執行 5 個 task。那麼原本使用未經優化的 HashShuffleManager 時,每個 Executor 會產生 500 個磁盤文件,所有 Executor 會產生 5000 個磁盤文件的。但是此時經過優化之後,每個 Executor 創建的磁盤文件的數量的計算公式爲:CPU core 的數量 * 下一個 stage 的 task 數量。也就是說,每個 Executor 此時只會創建 100 個磁盤文件,所有 Executor 只會創建 1000 個磁盤文件。

SortShuffleManager 運行原理

SortShuffleManager 的運行機制主要分成兩種,一種是普通運行機制,另一種是 bypass 運行機制。當 shuffle read task 的數量小於等於 spark.shuffle.sort.bypassMergeThreshold 參數的值時(默認爲 200),就會啓用 bypass 機制。

普通運行機制

下圖說明了普通的 SortShuffleManager 的原理。在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的 shuffle 算子,可能選用不同的數據結構。如果是 reduceByKey 這種聚合類的 shuffle 算子,那麼會選用 Map 數據結構,一邊通過 Map 進行聚合,一邊寫入內存;如果是 join 這種普通的 shuffle 算子,那麼會選用 Array 數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構之後,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那麼就會嘗試將內存數據結構中的數據溢寫到磁盤,然後清空內存數據結構。

在溢寫到磁盤文件之前,會先根據 key 對內存數據結構中已有的數據進行排序。排序過後,會分批將數據寫入磁盤文件。默認的 batch 數量是 10000 條,也就是說,排序好的數據,會以每批 1 萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過 Java 的 BufferedOutputStream 實現的。BufferedOutputStream 是 Java 的緩衝輸出流,首先會將數據緩衝在內存中,當內存緩衝滿溢之後再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數,提升性能。

一個 task 將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最後會將之前所有的臨時磁盤文件都進行合併,這就是 merge 過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然後依次寫入最終的磁盤文件之中。此外,由於一個 task 就只對應一個磁盤文件,也就意味着該 task 爲下游 stage 的 task 準備的數據都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個 task 的數據在文件中的 start offset 與 end offset。

SortShuffleManager 由於有一個磁盤文件 merge 的過程,因此大大減少了文件數量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,而第二個 stage 有 100 個 task。由於每個 task 最終只有一個磁盤文件,因此此時每個 Executor 上只有 5 個磁盤文件,所有 Executor 只有 50 個磁盤文件。

bypass 運行機制

下圖說明了 bypass SortShuffleManager 的原理。bypass 運行機制的觸發條件如下:

此時 task 會爲每個下游 task 都創建一個臨時磁盤文件,並將數據按 key 進行 hash 然後根據 key 的 hash 值,將 key 寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩衝,緩衝寫滿之後再溢寫到磁盤文件的。最後,同樣會將所有臨時磁盤文件都合併成一個磁盤文件,並創建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的,因爲都要創建數量驚人的磁盤文件,只是在最後會做一個磁盤文件的合併而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的 HashShuffleManager 來說,shuffle read 的性能會更好。

而該機制與普通 SortShuffleManager 運行機制的不同在於:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啓用該機制的最大好處在於,shuffle write 過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

shuffle 相關參數調優

以下是 Shffule 過程中的一些主要參數,這裏詳細講解了各個參數的功能、默認值以及基於實踐經驗給出的調優建議。

spark.shuffle.file.buffer

默認值:32k

參數說明:該參數用於設置 shuffle write task 的 BufferedOutputStream 的 buffer 緩衝大小。將數據寫到磁盤文件之前,會先寫入 buffer 緩衝中,待緩衝寫滿之後,纔會溢寫到磁盤。

調優建議:如果作業可用的內存資源較爲充足的話,可以適當增加這個參數的大小(比如 64k),從而減少 shuffle write 過程中溢寫磁盤文件的次數,也就可以減少磁盤 IO 次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提升。

spark.reducer.maxSizeInFlight

默認值:48m

參數說明:該參數用於設置 shuffle read task 的 buffer 緩衝大小,而這個 buffer 緩衝決定了每次能夠拉取多少數據。

調優建議:如果作業可用的內存資源較爲充足的話,可以適當增加這個參數的大小(比如 96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有 1%~5% 的提升。

spark.shuffle.io.maxRetries

默認值:3

參數說明:shuffle read task 從 shuffle write task 所在節點拉取屬於自己的數據時,如果因爲網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。

調優建議:對於那些包含了特別耗時的 shuffle 操作的作業,建議增加重試最大次數(比如 60 次),以避免由於 JVM 的 full gc 或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~ 上百億)的 shuffle 過程,調節該參數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

默認值:5s

參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是 5s。

調優建議:建議加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性。

spark.shuffle.memoryFraction

默認值:0.2

參數說明:該參數代表了 Executor 內存中,分配給 shuffle read task 進行聚合操作的內存比例,默認是 20%。

調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給 shuffle read 的聚合操作更多內存,以避免由於內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升 10% 左右。

spark.shuffle.manager

默認值:sort

參數說明:該參數用於設置 ShuffleManager 的類型。Spark 1.5 以後,有三個可選項:hash、sort 和 tungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默認選項,但是 Spark 1.2 以及之後的版本默認都是 SortShuffleManager 了。tungsten-sort 與 sort 類似,但是使用了 tungsten 計劃中的堆外內存管理機制,內存使用效率更高。

調優建議:由於 SortShuffleManager 默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的 SortShuffleManager 就可以;而如果你的業務邏輯不需要對數據進行排序,那麼建議參考後面的幾個參數調優,通過 bypass 機制或優化的 HashShuffleManager 來避免排序操作,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort 要慎用,因爲之前發現了一些相應的 bug。

spark.shuffle.sort.bypassMergeThreshold

默認值:200

參數說明:當 ShuffleManager 爲 SortShuffleManager 時,如果 shuffle read task 的數量小於這個閾值(默認是 200),則 shuffle write 過程中不會進行排序操作,而是直接按照未經優化的 HashShuffleManager 的方式去寫數據,但是最後會將每個 task 產生的所有臨時磁盤文件都合併成一個文件,並會創建單獨的索引文件。

調優建議:當你使用 SortShuffleManager 時,如果的確不需要排序操作,那麼建議將這個參數調大一些,大於 shuffle read task 的數量。那麼此時就會自動啓用 bypass 機制,map-side 就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此 shuffle write 性能有待提高。

spark.shuffle.consolidateFiles

默認值:false

參數說明:如果使用 HashShuffleManager,該參數有效。如果設置爲 true,那麼就會開啓 consolidate 機制,會大幅度合併 shuffle write 的輸出文件,對於 shuffle read task 數量特別多的情況下,這種方法可以極大地減少磁盤 IO 開銷,提升性能。

調優建議:如果的確不需要 SortShuffleManager 的排序機制,那麼除了使用 bypass 機制,還可以嘗試將 spark.shffle.manager 參數手動指定爲 hash,使用 HashShuffleManager,同時開啓 consolidate 機制。在實踐中嘗試過,發現其性能比開啓了 bypass 機制的 SortShuffleManager 要高出 10%~30%。

本文分別講解了開發過程中的優化原則、運行前的資源參數設置調優、運行中的數據傾斜的解決方案、爲了精益求精的 shuffle 調優。希望大家能夠在閱讀本文之後,記住這些性能調優的原則以及方案,在 Spark 作業開發、測試以及運行的過程中多嘗試,只有這樣,我們才能開發出更優的 Spark 作業,不斷提升其性能。


作者:李雪蕤

來源:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

版權申明:內容來源網絡,版權歸原創者所有。除非無法確認,我們都會標明作者及出處,如有侵權煩請告知,我們會立即刪除並表示歉意。謝謝!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/uy8DQG57QevGHAM8EQcKKw