上萬字詳解 Spark Core

先來一個問題,也是面試中常問的:

Spark 爲什麼會流行?

原因 1:優秀的數據模型和豐富計算抽象

Spark 產生之前,已經有 MapReduce 這類非常成熟的計算系統存在了,並提供了高層次的 API(map/reduce),把計算運行在集羣中並提供容錯能力,從而實現分佈式計算。

雖然 MapReduce 提供了對數據訪問和計算的抽象,但是對於數據的複用就是簡單的將中間數據寫到一個穩定的文件系統中 (例如 HDFS),所以會產生數據的複製備份,磁盤的 I/O 以及數據的序列化,所以在遇到需要在多個計算之間複用中間結果的操作時效率就會非常的低。而這類操作是非常常見的,例如迭代式計算,交互式數據挖掘,圖計算等。

認識到這個問題後,學術界的 AMPLab 提出了一個新的模型,叫做 RDD。RDD 是一個可以容錯且並行的數據結構 (其實可以理解成分佈式的集合,操作起來和操作本地集合一樣簡單),它可以讓用戶顯式的將中間結果數據集保存在內存中,並且通過控制數據集的分區來達到數據存放處理最優化。同時 RDD 也提供了豐富的 API (map、reduce、filter、foreach、redeceByKey...) 來操作數據集。後來 RDD 被 AMPLab 在一個叫做 Spark 的框架中提供並開源。

簡而言之,Spark 借鑑了 MapReduce 思想發展而來,保留了其分佈式並行計算的優點並改進了其明顯的缺陷。讓中間數據存儲在內存中提高了運行速度、並提供豐富的操作數據的 API 提高了開發速度。

原因 2:完善的生態圈 - fullstack

目前,Spark 已經發展成爲一個包含多個子項目的集合,其中包含 SparkSQL、Spark Streaming、GraphX、MLlib 等子項目。

Spark Core:實現了 Spark 的基本功能,包含 RDD、任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。

Spark SQL:Spark 用來操作結構化數據的程序包。通過 Spark SQL,我們可以使用 SQL 操作數據。

Spark Streaming:Spark 提供的對實時數據進行流式計算的組件。提供了用來操作數據流的 API。

Spark MLlib:提供常見的機器學習 (ML) 功能的程序庫。包括分類、迴歸、聚類、協同過濾等,還提供了模型評估、數據導入等額外的支持功能。

GraphX(圖計算):Spark 中用於圖計算的 API,性能良好,擁有豐富的功能和運算符,能在海量數據上自如地運行復雜的圖算法。

集羣管理器:Spark 設計爲可以高效地在一個計算節點到數千個計算節點之間伸縮計算。

StructuredStreaming:處理結構化流, 統一了離線和實時的 API。

Spark VS Hadoop

Vq3AjR

❣️注意:
儘管 Spark 相對於 Hadoop 而言具有較大優勢,但 Spark 並不能完全替代 Hadoop,Spark 主要用於替代 Hadoop 中的 MapReduce 計算模型。存儲依然可以使用 HDFS,但是中間結果可以存放在內存中;調度可以使用 Spark 內置的,也可以使用更成熟的調度系統 YARN 等。
實際上,Spark 已經很好地融入了 Hadoop 生態圈,併成爲其中的重要一員,它可以藉助於 YARN 實現資源調度管理,藉助於 HDFS 實現分佈式存儲。
此外,Hadoop 可以使用廉價的、異構的機器來做分佈式存儲與計算,但是,Spark 對硬件的要求稍高一些,對內存與 CPU 有一定的要求。

Spark Core

一、RDD 詳解

1. 爲什麼要有 RDD?

在許多迭代式算法 (比如機器學習、圖算法等) 和交互式數據挖掘中,不同計算階段之間會重用中間結果,即一個階段的輸出結果會作爲下一個階段的輸入。但是,之前的 MapReduce 框架採用非循環式的數據流模型,把中間結果寫入到 HDFS 中,帶來了大量的數據複製、磁盤 IO 和序列化開銷。且這些框架只能支持一些特定的計算模式(map/reduce),並沒有提供一種通用的數據抽象。

AMP 實驗室發表的一篇關於 RDD 的論文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》就是爲了解決這些問題的。

RDD 提供了一個抽象的數據模型,讓我們不必擔心底層數據的分佈式特性,只需將具體的應用邏輯表達爲一系列轉換操作 (函數),不同 RDD 之間的轉換操作之間還可以形成依賴關係,進而實現管道化,從而避免了中間結果的存儲,大大降低了數據複製、磁盤 IO 和序列化開銷,並且還提供了更多的 API(map/reduec/filter/groupBy...)。

2. RDD 是什麼?

RDD(Resilient Distributed Dataset) 叫做彈性分佈式數據集,是 Spark 中最基本的數據抽象,代表一個不可變、可分區、裏面的元素可並行計算的集合。單詞拆解:

3. RDD 主要屬性

進入 RDD 的源碼中看下:

RDD 源碼

在源碼中可以看到有對 RDD 介紹的註釋,我們來翻譯下:

  1. A list of partitions :一組分片 (Partition)/ 一個分區(Partition) 列表,即數據集的基本組成單位。對於 RDD 來說,每個分片都會被一個計算任務處理,分片數決定並行度。用戶可以在創建 RDD 時指定 RDD 的分片個數,如果沒有指定,那麼就會採用默認值。

  2. A function for computing each split :一個函數會被作用在每一個分區。Spark 中 RDD 的計算是以分片爲單位的,compute 函數會被作用到每個分區上。

  3. A list of dependencies on other RDDs :一個 RDD 會依賴於其他多個 RDD。RDD 的每次轉換都會生成一個新的 RDD,所以 RDD 之間就會形成類似於流水線一樣的前後依賴關係。在部分分區數據丟失時,Spark 可以通過這個依賴關係重新計算丟失的分區數據,而不是對 RDD 的所有分區進行重新計算。(Spark 的容錯機制)

  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):可選項,對於 KV 類型的 RDD 會有一個 Partitioner,即 RDD 的分區函數,默認爲 HashPartitioner。

  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):可選項, 一個列表,存儲存取每個 Partition 的優先位置 (preferred location)。對於一個 HDFS 文件來說,這個列表保存的就是每個 Partition 所在的塊的位置。按照 "移動數據不如移動計算" 的理念,Spark 在進行任務調度的時候,會盡可能選擇那些存有數據的 worker 節點來進行任務計算。

總結

RDD 是一個數據集的表示,不僅表示了數據集,還表示了這個數據集從哪來,如何計算,主要屬性包括:

  1. 分區列表

  2. 計算函數

  3. 依賴關係

  4. 分區函數 (默認是 hash)

  5. 最佳位置

分區列表、分區函數、最佳位置,這三個屬性其實說的就是數據集在哪,在哪計算更合適,如何分區;
計算函數、依賴關係,這兩個屬性其實說的是數據集怎麼來的。

二、RDD-API

1. RDD 的創建方式

  1. 由外部存儲系統的數據集創建,包括本地的文件系統,還有所有 Hadoop 支持的數據集,比如 HDFS、Cassandra、HBase 等:
    val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

  2. 通過已有的 RDD 經過算子轉換生成新的 RDD:
    val rdd2=rdd1.flatMap(_.split(" "))

  3. 由一個已經存在的 Scala 集合創建:
    val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD 方法底層調用了 parallelize 方法:

RDD 源碼

2. RDD 的算子分類

RDD 的算子分爲兩類:

  1. Transformation 轉換操作: 返回一個新的 RDD

  2. Action 動作操作: 返回值不是 RDD(無返回值或返回其他的)

❣️注意:
1、RDD 不實際存儲真正要計算的數據,而是記錄了數據的位置在哪裏,數據的轉換關係 (調用了什麼方法,傳入什麼函數)。
2、RDD 中的所有轉換都是惰性求值 / 延遲執行的,也就是說並不會直接計算。只有當發生一個要求返回結果給 Driver 的 Action 動作時,這些轉換纔會真正運行。
3、之所以使用惰性求值 / 延遲執行,是因爲這樣可以在 Action 時對 RDD 操作形成 DAG 有向無環圖進行 Stage 的劃分和並行優化,這種設計讓 Spark 更加有效率地運行。

3. Transformation 轉換算子

E5GZwb 19n2O0

4. Action 動作算子

H9TryO

統計操作:

PxuJ9H

三、RDD 的持久化 / 緩存

在實際開發中某些 RDD 的計算或轉換可能會比較耗費時間,如果這些 RDD 後續還會頻繁的被使用到,那麼可以將這些 RDD 進行持久化 / 緩存,這樣下次再使用到的時候就不用再重新計算了,提高了程序運行的效率。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發action,會去讀取HDFS的文件,rdd2會真正執行持久化
rdd2.sortBy(_._2,false).collect//觸發action,會去讀緩存中的數據,執行速度會比之前快,因爲rdd2已經持久化到內存中了

持久化 / 緩存 API 詳解

RDD 通過 persist 或 cache 方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存而是****觸發後面的 action ,該 RDD 將會被緩存在計算節點的內存中,並供後面重用。
通過查看 RDD 的源碼發現 cache 最終也是調用了 persist 無參方法 (默認存儲只存在內存中):

RDD 源碼

默認的存儲級別都是僅在內存存儲一份,Spark 的存儲級別還有好多種,存儲級別在 object StorageLevel 中定義的。

HasaJN

總結:

  1. RDD 持久化 / 緩存的目的是爲了提高後續操作的速度

  2. 緩存的級別有很多,默認只存在內存中, 開發中使用 memory_and_disk

  3. 只有執行 action 操作的時候纔會真正將 RDD 數據進行持久化 / 緩存

  4. 實際開發中如果某一個 RDD 後續會被頻繁的使用,可以將該 RDD 進行持久化 / 緩存

四、RDD 容錯機制 Checkpoint

持久化 / 緩存可以把數據放在內存中,雖然是快速的,但是也是最不可靠的;也可以把數據放在磁盤上,也不是完全可靠的!例如磁盤會損壞等。

Checkpoint 的產生就是爲了更加可靠的數據持久化,在 Checkpoint 的時候一般把數據放在在 HDFS 上,這就天然的藉助了 HDFS 天生的高容錯、高可靠來實現數據最大程度上的安全,實現了 RDD 的容錯和高可用。

用法:

SparkContext.setCheckpointDir("目錄") //HDFS的目錄

RDD.checkpoint
  1. 位置:Persist 和 Cache 只能保存在本地的磁盤和內存中 (或者堆外內存 -- 實驗中)Checkpoint 可以保存數據到 HDFS 這類可靠的存儲上。

  2. 生命週期:Cache 和 Persist 的 RDD 會在程序結束後會被清除或者手動調用 unpersist 方法 Checkpoint 的 RDD 在程序結束後依然存在,不會被刪除。

五、RDD 依賴關係

1. 寬窄依賴

寬窄依賴

窄依賴: 父 RDD 的一個分區只會被子 RDD 的一個分區依賴;
寬依賴: 父 RDD 的一個分區會被子 RDD 的多個分區依賴 (涉及到 shuffle)。

2. 爲什麼要設計寬窄依賴

  1. 對於窄依賴:

窄依賴的多個分區可以並行計算;
窄依賴的一個分區的數據如果丟失只需要重新計算對應的分區的數據就可以了。

  1. 對於寬依賴:

劃分 Stage(階段) 的依據: 對於寬依賴, 必須等到上一階段計算完成才能計算下一階段。

六、DAG 的生成和劃分 Stage

1. DAG 介紹

DAG(Directed Acyclic Graph 有向無環圖) 指的是數據轉換執行的過程,有方向,無閉環 (其實就是 RDD 執行的流程);
原始的 RDD 通過一系列的轉換操作就形成了 DAG 有向無環圖,任務執行時,可以按照 DAG 的描述,執行真正的計算 (數據被操作的一個過程)。

開始: 通過 SparkContext 創建的 RDD;
結束: 觸發 Action,一旦觸發 Action 就形成了一個完整的 DAG。

2.DAG 劃分 Stage

DAG 劃分 Stage

一個 Spark 程序可以有多個 DAG(有幾個 Action,就有幾個 DAG,上圖最後只有一個 Action(圖中未表現), 那麼就是一個 DAG)

一個 DAG 可以有多個 Stage(根據寬依賴 / shuffle 進行劃分)。

同一個 Stage 可以有多個 Task 並行執行 (task 數 = 分區數,如上圖,Stage1 中有三個分區 P1、P2、P3,對應的也有三個 Task)。

可以看到這個 DAG 中只 reduceByKey 操作是一個寬依賴,Spark 內核會以此爲邊界將其前後劃分成不同的 Stage。

同時我們可以注意到,在圖中 Stage1 中,從 textFile 到 flatMap 到 map 都是窄依賴,這幾步操作可以形成一個流水線操作,通過 flatMap 操作生成的 partition 可以不用等待整個 RDD 計算結束,而是繼續進行 map 操作,這樣大大提高了計算的效率

一個複雜的業務邏輯如果有 shuffle,那麼就意味着前面階段產生結果後,才能執行下一個階段,即下一個階段的計算要依賴上一個階段的數據。那麼我們按照 shuffle 進行劃分 (也就是按照寬依賴就行劃分),就可以將一個 DAG 劃分成多個 Stage / 階段,在同一個 Stage 中,會有多個算子操作,可以形成一個 pipeline 流水線,流水線內的多個平行的分區可以並行執行。

對於窄依賴,partition 的轉換處理在 stage 中完成計算,不劃分 (將窄依賴儘量放在在同一個 stage 中,可以實現流水線計算)。

對於寬依賴,由於有 shuffle 的存在,只能在父 RDD 處理完成後,才能開始接下來的計算,也就是說需要要劃分 stage。

總結:

Spark 會根據 shuffle / 寬依賴使用回溯算法來對 DAG 進行 Stage 劃分,從後往前,遇到寬依賴就斷開,遇到窄依賴就把當前的 RDD 加入到當前的 stage / 階段中

具體的劃分算法請參見 AMP 實驗室發表的論文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/bre4VNQvhuGT4Hp1Mym9rQ