Spark 性能優化指南——基礎篇

在大數據計算領域,Spark 已經成爲了越來越流行、越來越受歡迎的計算平臺之一。Spark 的功能涵蓋了大數據領域的離線批處理、SQL 類處理、流式 / 實時計算、機器學習、圖計算等各種不同類型的計算操作,應用範圍與前景非常廣泛。在美團 • 大衆點評,已經有很多同學在各種項目中嘗試使用 Spark。大多數同學(包括筆者在內),最初開始嘗試使用 Spark 的原因很簡單,主要就是爲了讓大數據計算作業的執行速度更快、性能更高。

然而,通過 Spark 開發出高性能的大數據計算作業,並不是那麼簡單的。如果沒有對 Spark 作業進行合理的調優,Spark 作業的執行速度可能會很慢,這樣就完全體現不出 Spark 作爲一種快速大數據計算引擎的優勢來。因此,想要用好 Spark,就必須對其進行合理的性能優化。

Spark 的性能調優實際上是由很多部分組成的,不是調節幾個參數就可以立竿見影提升作業性能的。我們需要根據不同的業務場景以及數據情況,對 Spark 作業進行綜合性的分析,然後進行多個方面的調節和優化,才能獲得最佳性能。

筆者根據之前的 Spark 作業開發經驗以及實踐積累,總結出了一套 Spark 作業的性能優化方案。整套方案主要分爲開發調優、資源調優、數據傾斜調優、shuffle 調優幾個部分。開發調優和資源調優是所有 Spark 作業都需要注意和遵循的一些基本原則,是高性能 Spark 作業的基礎;數據傾斜調優,主要講解了一套完整的用來解決 Spark 作業數據傾斜的解決方案;shuffle 調優,面向的是對 Spark 的原理有較深層次掌握和研究的同學,主要講解了如何對 Spark 作業的 shuffle 運行過程以及細節進行調優。

本文作爲 Spark 性能優化指南的基礎篇,主要講解開發調優以及資源調優。

調優概述

Spark 性能優化的第一步,就是要在開發 Spark 作業的過程中注意和應用一些性能優化的基本原則。開發調優,就是要讓大家瞭解以下一些 Spark 基本開發原則,包括:RDD lineage 設計、算子的合理使用、特殊操作的優化等。在開發過程中,時時刻刻都應該注意以上原則,並將這些原則根據具體的業務以及實際的應用場景,靈活地運用到自己的 Spark 作業中。

原則一:避免創建重複的 RDD

通常來說,我們在開發一個 Spark 作業時,首先是基於某個數據源(比如 Hive 表或 HDFS 文件)創建一個初始的 RDD;接着對這個 RDD 執行某個算子操作,然後得到下一個 RDD;以此類推,循環往復,直到計算出最終我們需要的結果。在這個過程中,多個 RDD 會通過不同的算子操作(比如 map、reduce 等)串起來,這個 “RDD 串”,就是 RDD lineage,也就是 “RDD 的血緣關係鏈”。

我們在開發過程中要注意:對於同一份數據,只應該創建一個 RDD,不能創建多個 RDD 來代表同一份數據。

一些 Spark 初學者在剛開始開發 Spark 作業時,或者是有經驗的工程師在開發 RDD lineage 極其冗長的 Spark 作業時,可能會忘了自己之前對於某一份數據已經創建過一個 RDD 了,從而導致對於同一份數據,創建了多個 RDD。這就意味着,我們的 Spark 作業會進行多次重複計算來創建多個代表相同數據的 RDD,進而增加了作業的性能開銷。

一個簡單的例子

// 需要對名爲“hello.txt”的HDFS文件進行一次map操作,再進行一次reduce操作。也就是說,需要對一份數據執行兩次算子操作。

// 錯誤的做法:對於同一份數據執行多次算子操作時,創建多個RDD。
// 這裏執行了兩次textFile方法,針對同一個HDFS文件,創建了兩個RDD出來,然後分別對每個RDD都執行了一個算子操作。
// 這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內容,並創建兩個單獨的RDD;第二次加載HDFS文件以及創建RDD的性能開銷,很明顯是白白浪費掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)

// 正確的用法:對於一份數據執行多次算子操作時,只使用一個RDD。
// 這種寫法很明顯比上一種寫法要好多了,因爲我們對於同一份數據只創建了一個RDD,然後對這一個RDD執行了多次算子操作。
// 但是要注意到這裏爲止優化還沒有結束,由於rdd1被執行了兩次算子操作,第二次執行reduce操作的時候,還會再次從源頭處重新計算一次rdd1的數據,因此還是會有重複計算的性能開銷。
// 要徹底解決這個問題,必須結合“原則三:對多次使用的RDD進行持久化”,才能保證一個RDD被多次使用時只被計算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)

原則二:儘可能複用同一個 RDD

除了要避免在開發過程中對一份完全相同的數據創建多個 RDD 之外,在對不同的數據執行算子操作時還要儘可能地複用一個 RDD。比如說,有一個 RDD 的數據格式是 key-value 類型的,另一個是單 value 類型的,這兩個 RDD 的 value 數據是完全一樣的。那麼此時我們可以只使用 key-value 類型的那個 RDD,因爲其中已經包含了另一個的數據。對於類似這種多個 RDD 的數據有重疊或者包含的情況,我們應該儘量複用一個 RDD,這樣可以儘可能地減少 RDD 的數量,從而儘可能減少算子執行的次數。

一個簡單的例子

// 錯誤的做法。

// 有一個<Long, String>格式的RDD,即rdd1。
// 接着由於業務需要,對rdd1執行了一個map操作,創建了一個rdd2,而rdd2中的數據僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)

// 分別對rdd1和rdd2執行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)

// 正確的做法。

// 上面這個case中,其實rdd1和rdd2的區別無非就是數據格式不同而已,rdd2的數據完全就是rdd1的子集而已,卻創建了兩個rdd,並對兩個rdd都執行了一次算子操作。
// 此時會因爲對rdd1執行map算子來創建rdd2,而多執行一次算子操作,進而增加性能開銷。

// 其實在這種情況下完全可以複用同一個RDD。
// 我們可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在進行第二個map操作時,只使用每個數據的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)

// 第二種方式相較於第一種方式而言,很明顯減少了一次rdd2的計算開銷。
// 但是到這裏爲止,優化還沒有結束,對rdd1我們還是執行了兩次算子操作,rdd1實際上還是會被計算兩次。
// 因此還需要配合“原則三:對多次使用的RDD進行持久化”進行使用,才能保證一個RDD被多次使用時只被計算一次。

原則三:對多次使用的 RDD 進行持久化

當你在 Spark 代碼中多次對一個 RDD 做了算子操作後,恭喜,你已經實現 Spark 作業第一步的優化了,也就是儘可能複用 RDD。此時就該在這個基礎之上,進行第二步優化了,也就是要保證對一個 RDD 執行多次算子操作時,這個 RDD 本身僅僅被計算一次。

Spark 中對於一個 RDD 執行多次算子的默認原理是這樣的:每次你對一個 RDD 執行一個算子操作時,都會重新從源頭處計算一遍,計算出那個 RDD 來,然後再對這個 RDD 執行你的算子操作。這種方式的性能是很差的。

因此對於這種情況,我們的建議是:對多次使用的 RDD 進行持久化。此時 Spark 就會根據你的持久化策略,將 RDD 中的數據保存到內存或者磁盤中。以後每次對這個 RDD 進行算子操作時,都會直接從內存或磁盤中提取持久化的 RDD 數據,然後執行算子,而不會從源頭處重新計算一遍這個 RDD,再執行算子操作。

對多次使用的 RDD 進行持久化的代碼示例

// 如果要對一個RDD進行持久化,只要對這個RDD調用cache()和persist()即可。

// 正確的做法。
// cache()方法表示:使用非序列化的方式將RDD中的數據全部嘗試持久化到內存中。
// 此時再對rdd1執行兩次算子操作時,只有在第一次執行map算子時,纔會將這個rdd1從源頭處計算一次。
// 第二次執行reduce算子時,就會直接從內存中提取數據進行計算,不會重複計算一個rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)

// persist()方法表示:手動選擇持久化級別,並使用指定的方式進行持久化。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,內存充足時優先持久化到內存中,內存不充足時持久化到磁盤文件中。
// 而且其中的_SER後綴表示,使用序列化的方式來保存RDD數據,此時RDD中的每個partition都會序列化成一個大的字節數組,然後再持久化到內存或磁盤中。
// 序列化的方式可以減少持久化的數據對內存/磁盤的佔用量,進而避免內存被持久化數據佔用過多,從而發生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

對於 persist() 方法而言,我們可以根據不同的業務場景選擇不同的持久化級別。

Spark 的持久化級別

如何選擇一種最合適的持久化策略

默認情況下,性能最高的當然是 MEMORY_ONLY,但前提是你的內存必須足夠足夠大,可以綽綽有餘地存放下整個 RDD 的所有數據。因爲不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個 RDD 的後續算子操作,都是基於純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要複製一份數據副本,並遠程傳送到其他節點上。但是這裏必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果 RDD 中數據比較多時(比如幾十億),直接用這種持久化級別,會導致 JVM 的 OOM 內存溢出異常。

如果使用 MEMORY_ONLY 級別時發生了內存溢出,那麼建議嘗試使用 MEMORY_ONLY_SER 級別。該級別會將 RDD 數據序列化後再保存在內存中,此時每個 partition 僅僅是一個字節數組而已,大大減少了對象數量,並降低了內存佔用。這種級別比 MEMORY_ONLY 多出來的性能開銷,主要就是序列化與反序列化的開銷。但是後續算子可以基於純內存進行操作,因此性能總體還是比較高的。此外,可能發生的問題同上,如果 RDD 中的數據量過多的話,還是可能會導致 OOM 內存溢出的異常。

如果純內存的級別都無法使用,那麼建議使用 MEMORY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。因爲既然到了這一步,就說明 RDD 的數據量很大,內存無法完全放下。序列化後的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先儘量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。

通常不建議使用 DISK_ONLY 和後綴爲_2 的級別:因爲完全基於磁盤文件進行數據的讀寫,會導致性能急劇降低,有時還不如重新計算一次所有 RDD。後綴爲_2 的級別,必須將所有數據都複製一份副本,併發送到其他節點上,數據複製以及網絡傳輸會導致較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。

原則四:儘量避免使用 shuffle 類算子

如果有可能的話,要儘量避免使用 shuffle 類算子。因爲 Spark 作業運行過程中,最消耗性能的地方就是 shuffle 過程。shuffle 過程,簡單來說,就是將分佈在集羣中多個節點上的同一個 key,拉取到同一個節點上,進行聚合或 join 等操作。比如 reduceByKey、join 等算子,都會觸發 shuffle 操作。

shuffle 過程中,各個節點上的相同 key 都會先寫入本地磁盤文件中,然後其他節點需要通過網絡傳輸拉取各個節點上的磁盤文件中的相同 key。而且相同 key 都拉取到同一個節點進行聚合操作時,還有可能會因爲一個節點上處理的 key 過多,導致內存不夠存放,進而溢寫到磁盤文件中。因此在 shuffle 過程中,可能會發生大量的磁盤文件讀寫的 IO 操作,以及數據的網絡傳輸操作。磁盤 IO 和網絡數據傳輸也是 shuffle 性能較差的主要原因。

因此在我們的開發過程中,能避免則儘可能避免使用 reduceByKey、join、distinct、repartition 等會進行 shuffle 的算子,儘量使用 map 類的非 shuffle 算子。這樣的話,沒有 shuffle 操作或者僅有較少 shuffle 操作的 Spark 作業,可以大大減少性能開銷。

Broadcast 與 map 進行 join 代碼示例

// 傳統的join操作會導致shuffle操作。
// 因爲兩個RDD中,相同的key都需要通過網絡拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將一個數據量較小的RDD作爲廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數據。
// 然後進行遍歷,如果發現rdd2中某條數據的key與rdd1的當前數據的key是相同的,那麼就判定可以進行join。
// 此時就可以根據自己需要的方式,將rdd1當前數據與rdd2中可以連接的數據,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建議僅僅在rdd2的數據量比較少(比如幾百M,或者一兩G)的情況下使用。
// 因爲每個Executor的內存中,都會駐留一份rdd2的全量數據。

原則五:使用 map-side 預聚合的 shuffle 操作

如果因爲業務需要,一定要使用 shuffle 操作,無法用 map 類的算子來替代,那麼儘量使用可以 map-side 預聚合的算子。

所謂的 map-side 預聚合,說的是在每個節點本地對相同的 key 進行一次聚合操作,類似於 MapReduce 中的本地 combiner。map-side 預聚合之後,每個節點本地就只會有一條相同的 key,因爲多條相同的 key 都被聚合起來了。其他節點在拉取所有節點上的相同 key 時,就會大大減少需要拉取的數據數量,從而也就減少了磁盤 IO 以及網絡傳輸開銷。通常來說,在可能的情況下,建議使用 reduceByKey 或者 aggregateByKey 算子來替代掉 groupByKey 算子。因爲 reduceByKey 和 aggregateByKey 算子都會使用用戶自定義的函數對每個節點本地的相同 key 進行預聚合。而 groupByKey 算子是不會進行預聚合的,全量的數據會在集羣的各個節點之間分發和傳輸,性能相對來說比較差。

比如如下兩幅圖,就是典型的例子,分別基於 reduceByKey 和 groupByKey 進行單詞計數。其中第一張圖是 groupByKey 的原理圖,可以看到,沒有進行任何本地聚合時,所有數據都會在集羣節點之間傳輸;第二張圖是 reduceByKey 的原理圖,可以看到,每個節點本地的相同 key 數據,都進行了預聚合,然後才傳輸到其他節點上進行全局聚合。

原則六:使用高性能的算子

除了 shuffle 相關的算子有優化原則之外,其他的算子也都有着相應的優化原則。

使用 reduceByKey/aggregateByKey 替代 groupByKey

詳情見 “原則五:使用 map-side 預聚合的 shuffle 操作”。

使用 mapPartitions 替代普通 map

mapPartitions 類的算子,一次函數調用會處理一個 partition 所有的數據,而不是一次函數調用處理一條,性能相對來說會高一些。但是有的時候,使用 mapPartitions 會出現 OOM(內存溢出)的問題。因爲單次函數調用就要處理掉一個 partition 所有的數據,如果內存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現 OOM 異常。所以使用這類操作時要慎重!

使用 foreachPartitions 替代 foreach

原理類似於 “使用 mapPartitions 替代 map”,也是一次函數調用處理一個 partition 的所有數據,而不是一次函數調用處理一條數據。在實踐中發現,foreachPartitions 類的算子,對性能的提升還是很有幫助的。比如在 foreach 函數中,將 RDD 中所有數據寫 MySQL,那麼如果是普通的 foreach 算子,就會一條數據一條數據地寫,每次函數調用可能就會創建一個數據庫連接,此時就勢必會頻繁地創建和銷燬數據庫連接,性能是非常低下;但是如果用 foreachPartitions 算子一次性處理一個 partition 的數據,那麼對於每個 partition,只要創建一個數據庫連接即可,然後執行批量插入操作,此時性能是比較高的。實踐中發現,對於 1 萬條左右的數據量寫 MySQL,性能可以提升 30% 以上。

使用 filter 之後進行 coalesce 操作

通常對一個 RDD 執行 filter 算子過濾掉 RDD 中較多數據後(比如 30% 以上的數據),建議使用 coalesce 算子,手動減少 RDD 的 partition 數量,將 RDD 中的數據壓縮到更少的 partition 中去。因爲 filter 之後,RDD 的每個 partition 中都會有很多數據被過濾掉,此時如果照常進行後續的計算,其實每個 task 處理的 partition 中的數據量並不是很多,有一點資源浪費,而且此時處理的 task 越多,可能速度反而越慢。因此用 coalesce 減少 partition 數量,將 RDD 中的數據壓縮到更少的 partition 之後,只要使用更少的 task 即可處理完所有的 partition。在某些場景下,對於性能的提升會有一定的幫助。

使用 repartitionAndSortWithinPartitions 替代 repartition 與 sort 類操作

repartitionAndSortWithinPartitions 是 Spark 官網推薦的一個算子,官方建議,如果需要在 repartition 重分區之後,還要進行排序,建議直接使用 repartitionAndSortWithinPartitions 算子。因爲該算子可以一邊進行重分區的 shuffle 操作,一邊進行排序。shuffle 與 sort 兩個操作同時進行,比先 shuffle 再 sort 來說,性能可能是要高的。

原則七:廣播大變量

有時在開發過程中,會遇到需要在算子函數中使用外部變量的場景(尤其是大變量,比如 100M 以上的大集合),那麼此時就應該使用 Spark 的廣播(Broadcast)功能來提升性能。

在算子函數中使用到外部變量時,默認情況下,Spark 會將該變量複製多個副本,通過網絡傳輸到 task 中,此時每個 task 都有一個變量副本。如果變量本身比較大的話(比如 100M,甚至 1G),那麼大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的 Executor 中佔用過多內存導致的頻繁 GC,都會極大地影響性能。

因此對於上述情況,如果使用的外部變量比較大,建議使用 Spark 的廣播功能,對該變量進行廣播。廣播後的變量,會保證每個 Executor 的內存中,只駐留一份變量副本,而 Executor 中的 task 執行時共享該 Executor 中的那份變量副本。這樣的話,可以大大減少變量副本的數量,從而減少網絡傳輸的性能開銷,並減少對 Executor 內存的佔用開銷,降低 GC 的頻率。

廣播大變量的代碼示例

// 以下代碼在算子函數中,使用了外部的變量。
// 此時沒有做任何特殊操作,每個task都會有一份list1的副本。
val list1 = ...
rdd1.map(list1...)

// 以下代碼將list1封裝成了Broadcast類型的廣播變量。
// 在算子函數中,使用廣播變量時,首先會判斷當前task所在Executor內存中,是否有變量副本。
// 如果有則直接使用;如果沒有則從Driver或者其他Executor節點上遠程拉取一份放到本地Executor內存中。
// 每個Executor內存中,就只會駐留一份廣播變量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原則八:使用 Kryo 優化序列化性能

在 Spark 中,主要有三個地方涉及到了序列化:

對於這三種出現序列化的地方,我們都可以通過使用 Kryo 序列化類庫,來優化序列化和反序列化的性能。Spark 默認使用的是 Java 的序列化機制,也就是 ObjectOutputStream/ObjectInputStream API 來進行序列化和反序列化。但是 Spark 同時支持使用 Kryo 序列化庫,Kryo 序列化類庫的性能比 Java 序列化類庫的性能要高很多。官方介紹,Kryo 序列化機制比 Java 序列化機制,性能高 10 倍左右。Spark 之所以默認沒有使用 Kryo 作爲序列化類庫,是因爲 Kryo 要求最好要註冊所有需要進行序列化的自定義類型,因此對於開發者來說,這種方式比較麻煩。

以下是使用 Kryo 的代碼示例,我們只要設置序列化類,再註冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作爲 RDD 泛型類型的自定義類型等):

// 創建SparkConf對象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設置序列化器爲KryoSerializer。
conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")
// 註冊要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原則九:優化數據結構

Java 中,有三種類型比較耗費內存:

因此 Spark 官方建議,在 Spark 編碼實現中,特別是對於算子函數中的代碼,儘量不要使用上述三種數據結構,儘量使用字符串替代對象,使用原始類型(比如 Int、Long)替代字符串,使用數組替代集合類型,這樣儘可能地減少內存佔用,從而降低 GC 頻率,提升性能。

但是在筆者的編碼實踐中發現,要做到該原則其實並不容易。因爲我們同時要考慮到代碼的可維護性,如果一個代碼中,完全沒有任何對象抽象,全部是字符串拼接的方式,那麼對於後續的代碼維護和修改,無疑是一場巨大的災難。同理,如果所有操作都基於數組實現,而不使用 HashMap、LinkedList 等集合類型,那麼對於我們的編碼難度以及代碼可維護性,也是一個極大的挑戰。因此筆者建議,在可能以及合適的情況下,使用佔用內存較少的數據結構,但是前提是要保證代碼的可維護性。

調優概述

在開發完 Spark 作業之後,就該爲作業配置合適的資源了。Spark 的資源參數,基本都可以在 spark-submit 命令中作爲參數設置。很多 Spark 初學者,通常不知道該設置哪些必要的參數,以及如何設置這些參數,最後就只能胡亂設置,甚至壓根兒不設置。資源參數設置的不合理,可能會導致沒有充分利用集羣資源,作業運行會極其緩慢;或者設置的資源過大,隊列沒有足夠的資源來提供,進而導致各種異常。總之,無論是哪種情況,都會導致 Spark 作業的運行效率低下,甚至根本無法運行。因此我們必須對 Spark 作業的資源使用原理有一個清晰的認識,並知道在 Spark 作業運行過程中,有哪些資源參數是可以設置的,以及如何設置合適的參數值。

Spark 作業基本運行原理

詳細原理見上圖。我們使用 spark-submit 提交一個 Spark 作業之後,這個作業就會啓動一個對應的 Driver 進程。根據你使用的部署模式(deploy-mode)不同,Driver 進程可能在本地啓動,也可能在集羣中某個工作節點上啓動。Driver 進程本身會根據我們設置的參數,佔有一定數量的內存和 CPU core。而 Driver 進程要做的第一件事情,就是向集羣管理器(可以是 Spark Standalone 集羣,也可以是其他的資源管理集羣,美團 • 大衆點評使用的是 YARN 作爲資源管理集羣)申請運行 Spark 作業需要使用的資源,這裏的資源指的就是 Executor 進程。YARN 集羣管理器會根據我們爲 Spark 作業設置的資源參數,在各個工作節點上,啓動一定數量的 Executor 進程,每個 Executor 進程都佔有一定數量的內存和 CPU core。

在申請到了作業執行所需的資源之後,Driver 進程就會開始調度和執行我們編寫的作業代碼了。Driver 進程會將我們編寫的 Spark 作業代碼分拆爲多個 stage,每個 stage 執行一部分代碼片段,併爲每個 stage 創建一批 task,然後將這些 task 分配到各個 Executor 進程中執行。task 是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個 task 處理的數據不同而已。一個 stage 的所有 task 都執行完畢之後,會在各個節點本地的磁盤文件中寫入計算中間結果,然後 Driver 就會調度運行下一個 stage。下一個 stage 的 task 的輸入數據就是上一個 stage 輸出的中間結果。如此循環往復,直到將我們自己編寫的代碼邏輯全部執行完,並且計算完所有的數據,得到我們想要的結果爲止。

Spark 是根據 shuffle 類算子來進行 stage 的劃分。如果我們的代碼中執行了某個 shuffle 類算子(比如 reduceByKey、join 等),那麼就會在該算子處,劃分出一個 stage 界限來。可以大致理解爲,shuffle 算子執行之前的代碼會被劃分爲一個 stage,shuffle 算子執行以及之後的代碼會被劃分爲下一個 stage。因此一個 stage 剛開始執行的時候,它的每個 task 可能都會從上一個 stage 的 task 所在的節點,去通過網絡傳輸拉取需要自己處理的所有 key,然後對拉取到的所有相同的 key 使用我們自己編寫的算子函數執行聚合操作(比如 reduceByKey() 算子接收的函數)。這個過程就是 shuffle。

當我們在代碼中執行了 cache/persist 等持久化操作時,根據我們選擇的持久化級別的不同,每個 task 計算出來的數據也會保存到 Executor 進程的內存或者所在節點的磁盤文件中。

因此 Executor 的內存主要分爲三塊:第一塊是讓 task 執行我們自己編寫的代碼時使用,默認是佔 Executor 總內存的 20%;第二塊是讓 task 通過 shuffle 過程拉取了上一個 stage 的 task 的輸出後,進行聚合等操作時使用,默認也是佔 Executor 總內存的 20%;第三塊是讓 RDD 持久化時使用,默認佔 Executor 總內存的 60%。

task 的執行速度是跟每個 Executor 進程的 CPU core 數量有直接關係的。一個 CPU core 同一時間只能執行一個線程。而每個 Executor 進程上分配到的多個 task,都是以每個 task 一條線程的方式,多線程併發運行的。如果 CPU core 數量比較充足,而且分配到的 task 數量比較合理,那麼通常來說,可以比較快速和高效地執行完這些 task 線程。

以上就是 Spark 作業的基本運行原理的說明,大家可以結合上圖來理解。理解作業基本原理,是我們進行資源參數調優的基本前提。

資源參數調優

瞭解完了 Spark 作業運行的基本原理之後,對資源相關的參數就容易理解了。所謂的 Spark 資源參數調優,其實主要就是對 Spark 運行過程中各個使用資源的地方,通過調節各種參數,來優化資源使用的效率,從而提升 Spark 作業的執行性能。以下參數就是 Spark 中主要的資源參數,每個參數都對應着作業運行原理中的某個部分,我們同時也給出了一個調優的參考值。

num-executors

參數說明:該參數用於設置 Spark 作業總共要用多少個 Executor 進程來執行。Driver 在向 YARN 集羣管理器申請資源時,YARN 集羣管理器會盡可能按照你的設置來在集羣的各個工作節點上,啓動相應數量的 Executor 進程。這個參數非常之重要,如果不設置的話,默認只會給你啓動少量的 Executor 進程,此時你的 Spark 作業的運行速度是非常慢的。

參數調優建議:每個 Spark 作業的運行一般設置 50~100 個左右的 Executor 進程比較合適,設置太少或太多的 Executor 進程都不好。設置的太少,無法充分利用集羣資源;設置的太多的話,大部分隊列可能無法給予充分的資源。

executor-memory

參數說明:該參數用於設置每個 Executor 進程的內存。Executor 內存的大小,很多時候直接決定了 Spark 作業的性能,而且跟常見的 JVM OOM 異常,也有直接的關聯。

參數調優建議:每個 Executor 進程的內存設置 4G~8G 較爲合適。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors 乘以 executor-memory,是不能超過隊列的最大內存量的。此外,如果你是跟團隊裏其他人共享這個資源隊列,那麼申請的內存量最好不要超過資源隊列最大總內存的 1/3~1/2,避免你自己的 Spark 作業佔用了隊列所有的資源,導致別的同學的作業無法運行。

executor-cores

參數說明:該參數用於設置每個 Executor 進程的 CPU core 數量。這個參數決定了每個 Executor 進程並行執行 task 線程的能力。因爲每個 CPU core 同一時間只能執行一個 task 線程,因此每個 Executor 進程的 CPU core 數量越多,越能夠快速地執行完分配給自己的所有 task 線程。

參數調優建議:Executor 的 CPU core 數量設置爲 2~4 個較爲合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大 CPU core 限制是多少,再依據設置的 Executor 數量,來決定每個 Executor 進程可以分配到幾個 CPU core。同樣建議,如果是跟他人共享這個隊列,那麼 num-executors * executor-cores 不要超過隊列總 CPU core 的 1/3~1/2 左右比較合適,也是避免影響其他同學的作業運行。

driver-memory

參數說明:該參數用於設置 Driver 進程的內存。

參數調優建議:Driver 的內存通常來說不設置,或者設置 1G 左右應該就夠了。唯一需要注意的一點是,如果需要使用 collect 算子將 RDD 的數據全部拉取到 Driver 上進行處理,那麼必須確保 Driver 的內存足夠大,否則會出現 OOM 內存溢出的問題。

spark.default.parallelism

參數說明:該參數用於設置每個 stage 的默認 task 數量。這個參數極爲重要,如果不設置可能會直接影響你的 Spark 作業性能。

參數調優建議:Spark 作業的默認 task 數量爲 500~1000 個較爲合適。很多同學常犯的一個錯誤就是不去設置這個參數,那麼此時就會導致 Spark 自己根據底層 HDFS 的 block 數量來設置 task 的數量,默認是一個 HDFS block 對應一個 task。通常來說,Spark 默認設置的數量是偏少的(比如就幾十個 task),如果 task 數量偏少的話,就會導致你前面設置好的 Executor 的參數都前功盡棄。試想一下,無論你的 Executor 進程有多少個,內存和 CPU 有多大,但是 task 只有 1 個或者 10 個,那麼 90% 的 Executor 進程可能根本就沒有 task 執行,也就是白白浪費了資源!因此 Spark 官網建議的設置原則是,設置該參數爲 num-executors * executor-cores 的 2~3 倍較爲合適,比如 Executor 的總 CPU core 數量爲 300 個,那麼設置 1000 個 task 是可以的,此時可以充分地利用 Spark 集羣的資源。

spark.storage.memoryFraction

參數說明:該參數用於設置 RDD 持久化數據在 Executor 內存中能佔的比例,默認是 0.6。也就是說,默認 Executor 60% 的內存,可以用來保存持久化的 RDD 數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。

參數調優建議:如果 Spark 作業中,有較多的 RDD 持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果 Spark 作業中的 shuffle 類操作比較多,而持久化操作比較少,那麼這個參數的值適當降低一些比較合適。此外,如果發現作業由於頻繁的 gc 導致運行緩慢(通過 spark web ui 可以觀察到作業的 gc 耗時),意味着 task 執行用戶代碼的內存不夠用,那麼同樣建議調低這個參數的值。

spark.shuffle.memoryFraction

參數說明:該參數用於設置 shuffle 過程中一個 task 拉取到上個 stage 的 task 的輸出後,進行聚合操作時能夠使用的 Executor 內存的比例,默認是 0.2。也就是說,Executor 默認只有 20% 的內存用來進行該操作。shuffle 操作在進行聚合時,如果發現使用的內存超出了這個 20% 的限制,那麼多餘的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。

參數調優建議:如果 Spark 作業中的 RDD 持久化操作較少,shuffle 操作較多時,建議降低持久化操作的內存佔比,提高 shuffle 操作的內存佔比比例,避免 shuffle 過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由於頻繁的 gc 導致運行緩慢,意味着 task 執行用戶代碼的內存不夠用,那麼同樣建議調低這個參數的值。資源參數的調優,沒有一個固定的值,需要同學們根據自己的實際情況(包括 Spark 作業中的 shuffle 操作數量、RDD 持久化操作數量以及 spark web ui 中顯示的作業 gc 情況),同時參考本篇文章中給出的原理以及調優建議,合理地設置上述參數。

資源參數參考示例

以下是一份 spark-submit 命令的示例,大家可以參考一下,並根據自己的實際情況進行調節:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

根據實踐經驗來看,大部分 Spark 作業經過本次基礎篇所講解的開發調優與資源調優之後,一般都能以較高的性能運行了,足以滿足我們的需求。但是在不同的生產環境和項目背景下,可能會遇到其他更加棘手的問題(比如各種數據傾斜),也可能會遇到更高的性能要求。爲了應對這些挑戰,需要使用更高級的技巧來處理這類問題。

如果您喜歡本文,歡迎點擊右上角,把文章分享到朋友圈~~
如果有想了解和學習的知識點或技術點,也可以留言給若飛安排分享

作者:李雪蕤

來源:https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

版權申明:內容來源網絡,版權歸原創者所有。除非無法確認,我們都會標明作者及出處,如有侵權煩請告知,我們會立即刪除並表示歉意。謝謝!

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/cocAoN7NrwQRKJnEaiRw0g