萬字長文 - 圖解,帶你輕鬆學習 Spark

大家好,我是大 D。

今天給大家分享一篇 Spark 核心知識點的梳理,對知識點的講解秉承着能用圖解的就不照本宣科地陳述,力求精簡、通俗易懂。希望能爲新手的入門學習掃清障礙,從基礎概念入手、再到原理深入,由淺入深地輕鬆掌握 Spark。

1、初識 Spark

Spark 不僅能夠在內存中進行高效運算,還是一個大一統的軟件棧,可以適用於各種各樣原本需要多種不同的分佈式平臺的場景。

1)背景

Spark 作爲一個用來快速實現大規模數據計算的通用分佈式大數據計算引擎,是大數據開發工程師必備的一項技術棧。Spark 相對 Hadoop 具有較大優勢,但 Spark 並不能完全替代 Hadoop。

實際上,Spark 已經很好地融入了 Hadoop 家族,作爲其中一員,主要用於替代 Hadoop 中的 MapReduce 計算模型。

2)Spark 的優勢

Spark 擁有 Hadoop MapReduce 所具備的優點,但不同的是,Hadoop 每次經過 job 執行的中間結果都會存儲在 HDFS 上,而 Spark 執行 job 的中間過程數據可以直接保存在內存中,無需讀寫到 HDFS 磁盤上。因爲內存的讀寫速度與磁盤的讀寫速度不在一個數量級上,所以 Spark 利用內存中的數據可以更快地完成數據的計算處理。

此外,由於 Spark 在內部使用了彈性分佈式數據集 (Resilient Distributed Dataset,RDD),經過了數據模型的優化,即便在磁盤上進行分佈式計算,其計算性能也是高於 Hadoop MapReduce 的。

3)Spark 的特點

2、Spark 的模塊組成

Spark 是包含多個緊密集成的組件,這些組件結合密切並且可以相互調用,這樣我們可以像在平常軟件項目中使用程序庫一樣,組合使用這些的組件。

1)Spark 的模塊組成

2)Spark Core

Spark Core 實現了 Spark 基本的核心功能,如下:

3)Spark SQL

Spark SQL 是 Spark 用來操作結構化數據的程序包,支持使用 SQL 或者 Hive SQL 或者與傳統的 RDD 編程的數據操作結合的方式來查詢數據,使得分佈式數據的處理變得更加簡單。

4)Spark Streaming

Spark Streaming 提供了對實時數據進行流式計算的 API,支持 Kafka、Flume、TCP 等多種流式數據源。此外,還提供了基於時間窗口的批量流操作,用於對一定時間週期內的流數據執行批量處理。

5)MLlib

Spark MLlib 作爲一個提供常見機器學習(ML)功能的程序庫,包括分類、迴歸、聚類等多種機器學習算法的實現,其簡單易用的 API 接口降低了機器學習的門檻。

6)GraphX

GraphX 用於分佈式圖計算,比如可以用來操作社交網絡的朋友關係圖,能夠通過其提供的 API 快速解決圖計算中的常見問題。

7)SparkR

SparkR 是一個 R 語言包,提供了輕量級的基於 R 語言使用 Spark 的方式,使得基於 R 語言能夠更方便地處理大規模的數據集。

3、Spark 的運行原理

1)Spark 的運行模式

就底層而言,Spark 設計爲可以高效地在一個到數千個計算節點之間伸縮計算。爲了實現這樣的要求,同時獲得最大靈活性,Spark 支持在各種集羣管理器上運行。

Spark 的運行模式主要有:

2)Spark 的集羣架構

Spark 的集羣架構主要由 Cluster Manager(資源管理器)、Worker (工作節點)、Executor(執行器)、Driver(驅動器)、Application(應用程序) 5 部分組成,如下圖:

3)Worker 的工作職責

  1. 通過註冊機制向 Cluster Manager 彙報自身的 CPU 和內存等資源使用信息;

  2. 在 Master 的指示下,創建並啓動 Executor(真正的計算單元);

  3. 將資源和任務進一步分配給 Executor 並運行;

  4. 同步資源信息和 Executor 狀態信息給 Cluster Manager。

4)Driver 的工作職責

Application 通過 Driver 與 Cluster Manager 和 Executor 進行通信。

  1. 運行 Application 的 main() 函數;

  2. 創建 SparkContext;

  3. 劃分 RDD 並生成 DAG;

  4. 構建 Job 並將每個 Job 都拆分爲多個 Stage,每個 Stage 由多個 Task 構成,也被稱爲 Task Set;

  5. 與 Spark 中的其他組件進行資源協調;

  6. 生成併發送 Task 到 Executor。

4、RDD 概念及核心結構

本節將介紹 Spark 中一個抽象的概念——RDD,要學習 Spark 就必須對 RDD 有一個清晰的認知,RDD 是 Spark 中最基本的數據抽象,代表一個不可變、可分區、元素可並行計算的集合。

1)RDD 的概念

RRD 全稱叫做彈性分佈式數據集(Resilient Distributed Dataset),從它的名字中可以拆解出三個概念。

2)RDD 的特點

RDD 具有自動容錯、位置感知性調度以及可伸縮等特點,並且允許用戶在執行多個查詢時,顯式地將數據集緩存在內存中,後續查詢能夠重用該數據集,這極大地提升了查詢效率。

下面是源碼中對 RDD 類介紹的註釋:

Internally, each RDD is characterized by five main properties:

 - A list of partitions
 - A function for computing each split
 - A list of dependencies on other RDDs
 - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

從源碼對 RDD 的定義中,可以看出 RDD 不僅能表示存有多個元素的數據集,而且還能通過依賴關係推算出這個數據集是從哪來的,在哪裏計算更合適。

3)RDD 的核心結構

在學習 RDD 轉換操作算子之前,根據 RDD 的特點對 RDD 中的核心結構進行一下梳理,這樣對 Spark 的執行原理會有一個更深的理解。

5、Spark RDD 的寬窄依賴關係

1)RDD 的依賴關係

在 Spark 中,RDD 分區的數據不支持修改,是隻讀的。如果想更新 RDD 分區中的數據,那麼只能對原有 RDD 進行轉化操作,也就是在原來 RDD 基礎上創建一個新的 RDD。

那麼,在整個任務的運算過程中,RDD 的每次轉換都會生成一個新的 RDD,因此 RDD 們之間會產生前後依賴的關係。

說白了,相當於對原始 RDD 分區數據的整個運算進行了拆解,當運算中出現異常情況導致分區數據丟失時,Spark 可以還通過依賴關係從上一個 RDD 中重新計算丟失的數據,而不是對最開始的 RDD 分區數據重新進行計算。

在 RDD 的依賴關係中,我們將上一個 RDD 稱爲父 RDD,下一個 RDD 稱爲子 RDD。

2)如何區分寬窄依賴

RDD 們之間的依賴關係,也分爲寬依賴和窄依賴。

說白了,就是看兩個 RDD 的分區之間,是不是一對一的關係,若是則爲窄依賴,反之則爲寬依賴。

有個形象的比喻,如果父 RDD 中的一個分區有多個孩子(被多個分區依賴),也就是超生了,就爲寬依賴;反之,如果只有一個孩子(只被一個分區依賴),那麼就爲窄依賴。

常見的寬窄依賴算子:

3)爲何設計要寬窄依賴

從上面的分析,不難看出,在窄依賴中子 RDD 的每個分區數據的生成操作都是可以並行執行的,而在寬依賴中需要所有父 RDD 的 Shuffle 結果完成後再被執行。

在 Spark 執行作業時,會按照 Stage 劃分不同的 RDD,生成一個完整的最優執行計劃,使每個 Stage 內的 RDD 都儘可能在各個節點上並行地被執行。

如下圖,Stage3 包含 Stage1 和 Stage2,其中, Stage1 爲窄依賴,Stage2 爲寬依賴。

因此,劃分寬窄依賴也是 Spark 優化執行計劃的一個重要步驟,寬依賴是劃分執行計劃中 Stage 的依據,對於寬依賴必須要等到上一個 Stage 計算完成之後才能計算下一個階段。

6、Spark RDD 的轉換操作與行動操作

1)RDD 的創建

Spark 提供了兩種創建 RDD 的方式:對一個集合進行並行化操作和利用外部數據集生成 RDD 。

對一個集合進行並行化操作

Spark 創建 RDD 最簡單的方式就是把已經存在的集合傳給 parallelize() 方法,不過,這種方式在開發中並不常用,畢竟需要將整個的數據集先放到一個節點內存中。

利用 parallelize() 方法將已經存在的一個集合轉換爲 RDD,集合中的數據也會被複制到 RDD 中並參與並行計算。

val lines = sc.parallelize(Arrays.asList(1,2,3,4,5),n)

其中,n 爲並行集合的分區數量,Spark 將爲集羣的每個分區都運行一個任務。該參數設置太小不能很好地利用 CPU,設置太大又會導致任務阻塞等待,一般 Spark 會嘗試根據集羣的 CPU 核數自動設置分區數量。

利用外部數據集生成 RDD

在開發中,Spark 創建 RDD 最常用的一個方式就是從 Hadoop 或者其他外部存儲系統創建 RDD,包括本地文件系統、HDFS、Cassandra、HBase、S3 等。

通過 SparkContext 的 textFile() 方法來讀取文本文件創建 RDD 的代碼,如下:

val lines = sc.textFile("../temp.txt")

其中,textFile() 方法的 url 參數可以是本地文件或路徑、HDFS 路徑等,Spark 會讀取該路徑下所有的文件,並將其作爲數據源加載到內存生成對應的 RDD。

當然, RDD 也可以在現有 RDD 的基礎上經過算子轉換生成新的 RDD,這是接下來要講的 RDD 算子轉換的內容,Spark RDD 支持兩種類型的操作:轉換操作 (Transformation) 和行動操作 (Action)。

2)RDD 的轉換操作

轉換操作是指從現有 RDD 的基礎上創建新的 RDD,是返回一個新 RDD 的操作。轉換操作是惰性求值的,也就是不會立即觸發執行實際的轉換,而是先記錄 RDD 之間的轉換關係,只有當觸發行動操作時纔會真正地執行轉換操作,並返回計算結果。

舉個栗子,有一個日誌文件 log.txt,需要從裏面若干條信息中,篩選出其中錯誤的報警信息,我們可以用轉化操作 filter() 即可完成,代碼如下:

val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

其中,textFile() 方法定義了名爲 inputRDD 的 RDD,但是此時 log.txt 文件並沒有加載到內存中,僅僅是指向文件的位置。然後通過 filter() 方法進行篩選定義了名爲 errorsRDD 的轉換 RDD,同樣也屬於惰性計算,並沒有立即執行。

3)RDD 的行動操作

瞭解瞭如何通過轉換操作從已有的 RDD 中創建一個新的 RDD,但有時我們希望可以對數據集進行實際的計算。行動操作就是接下來要講的第二種 RDD 操作,它會強制執行那些求值必須用到的 RDD 的轉換操作,並將最終的計算結果返回給 Driver 程序,或者寫入到外部存儲系統中。

繼續剛纔的栗子,我們需要將上一步統計出來的報警信息的數量打印出來,我們可以藉助 count() 方法進行計數。

val countRDD = errorsRDD.count()

其中,count() 可以觸發實際的計算,強制執行前面步驟中的轉換操作。實際上,Spark RDD 會將 RDD 計算分解到不同的 Stage 並在不同的節點上進行運算,每個節點都會運行 count() 結果,所有運算完成之後會聚合一個結果返回給 Driver 程序。

如果分不清楚給定的一個 RDD 操作方法是屬於轉換操作還是行動操作,去看下它的返回類型,轉換操作返回的是 RDD 類型,而行動操作則返回的是其他的數據類型。

4)惰性求值

前面,我們多次提到轉換操作都是惰性求值,這也就意味着調用的轉換操作(textFile、filter 等)時,並不會立即去執行,而是 Spark 會記錄下所要求執行的操作的相關信息。

因此,我們對 RDD 的理解應該更深一步,RDD 不僅可以看作是一個存放分佈式數據的數據集,也可以當作是通過轉換操作構建出來的、記錄如何計算數據的指令列表。

惰性操作避免了所有操作都進行一遍 RDD 運算,它可以將很多操作合併在一起,來減少計算數據的步驟,提高 Spark 的運算效率。

7、Spark RDD 中常用的操作算子

1)向 Spark 傳遞函數

Spark API 依賴 Driver 程序中的傳遞函數完成在集羣上執行 RDD 轉換並完成數據計算。在 Java API 中,函數所在的類需要實現 org.apache.spark.api.java.function 包中的接口。

Spark 提供了 lambda 表達式和 自定義 Function 類兩種創建函數的方式。前者語法簡潔、方便使用;後者可以在一些複雜應用場景中自定義需要的 Function 功能。

舉個栗子,求 RDD 中數據的平方,並只保留不爲 0 的數據。
可以用 lambda 表達式簡明地定義 Function 實現,代碼如下:

val input = sc.parallelize(List(-2,-1,0,1,2))
val rdd1 = input.map(x => x * x)
val rdd2 = rdd1.filter(x => x != 0 )

首先用 map() 對 RDD 中所有的數據進行求平方,然後用到 filter() 來篩選不爲 0 的數據。

其中,map() 和 filter() 就是我們最常用的轉換算子,map() 接收了 一個 lambda 表達式定義的函數,並把這個函數運用到 input 中的每一個元素,最後把函數計算後的返回結果作爲結果 rdd1 中對應元素的值。

同樣,filter() 也接收了一個 lambda 表達式定義的函數,並將 rdd1 中滿足該函數的數據放入到了新的 rdd2 中返回。

Spark 提供了很豐富的處理 RDD 數據的操作算子,比如使用 distinct() 還可以繼續對 rdd2 進行去重處理。

如果需要對 RDD 中每一個元素處理後生成多個輸出,也有相應的算子,比如 flatMap()。它和 map() 類似,也是將輸入函數應用到 RDD 中的每個元素,不過返回的不是一個元素了,而是一個返回值序列的迭代器。

最終得到的輸出是一個包含各個迭代器可訪問的所有元素的 RDD,flatMap() 最經典的一個用法就是把輸入的一行字符串切分爲一個個的單詞。

舉個栗子,將行數據切分成單詞,對比下 map() 與 flat() 的不同。

val lines = sc.parallelize(List("hello spark","hi,flink"))
val rdd1 = lines.map(line => line.split(","))
val rdd2 = lines.flatMap(line => line.split(","))

可以看到,把 lines 中的每一個 line,使用所提供的函數執行一遍,map() 輸出的 rdd1 中仍然只有兩個元素;而 flatMap() 輸出的 rdd2 則是將原 RDD 中的數據 “拍扁” 了,這樣就得到了一個由各列表中元素組成的 RDD,而不是一個由列表組成的 RDD。

2) RDD 的轉換算子

Spark 中的轉換算子主要用於 RDD 之間的轉化和數據處理,常見的轉換算子具體如下:(發送推文預覽時發現表格內容展示不全,就乾脆截圖了

3)RDD 的行動算子

Spark 中行動算子主要用於對分佈式環境中 RDD 的轉化操作結果進行統一地執行處理,比如結果收集、數據保存等,常用的行動算子具體如下:

8、Spark 的共享變量之累加器和廣播變量

本節將介紹下 Spark 編程中兩種類型的共享變量:累加器和廣播變量。
簡單說,累加器是用來對信息進行聚合的,而廣播變量則是用來高效分發較大對象的。

1) 閉包的概念

在講共享變量之前,我們先了解下啥是閉包,代碼如下。

var n = 1
val func = (i:Int) => i + n

函數 func 中有兩個變量 n 和 i ,其中 i 爲該函數的形式參數,也就是入參,在 func 函數被調用時, i 會被賦予一個新的值,我們稱之爲綁定變量 (bound variable)。而 n 則是定義在了函數 func 外面的,該函數並沒有賦予其任何值,我們稱之爲自由變量 (free variable)。

像 func 函數這樣,返回結果依賴於聲明在函數外部的一個或多個變量,將這些自由變量捕獲並構成的封閉函數,我們稱之爲 “閉包”。

先看一個累加求和的栗子,如果在集羣模式下運行以下代碼,會發現結果並非我們所期待的累計求和。

var sum = 0
val arr = Array(1,2,3,4,5)
sc.parallelize(arr).foreach(x => sum + x)
println(sum)

sum 的結果爲 0,導致這個結果的原因就是存在閉包。

在集羣中 Spark 會將對 RDD 的操作處理分解爲 Tasks ,每個 Task 由 Executor 執行。而在執行之前,Spark 會計算 task 的閉包 (也就是 foreach() )。閉包會被序列化併發送給每個 Executor,但是發送給 Executor 的是副本,所以在 Driver 上輸出的依然是 sum 本身。

如果想對 sum 變量進行更新,則就要用到接下來我們要講的累加器。

2)累加器的原理

累加器是對信息進行聚合的,通常在向 Spark 傳遞函數時,比如使用 map() 或者 filter() 傳條件時,可以使用 Driver 中定義的變量,但是集羣中運行的每個任務都會得到這些變量的一份新的副本,然而,正如前面所述,更新這些副本的值,並不會影響到 Driver 中對應的變量。

累加器則突破了這個限制,可以將工作節點中的值聚合到 Driver 中。它的一個典型用途就是對作業執行過程中的特定事件進行計數。

舉個栗子,給了一個日誌記錄,需要統計這個文件中有多少空行。

val sc = new SparkContext(...)
val logs = sc.textFile(...)
val blanklines = sc.accumulator(0)
val callSigns = logs.flatMap(line => {
	if(line == ""){
		blanklines += 1
	}
	line.split("")
})
callSigns.count()
println("日誌中的空行數爲:" + blanklines.value)

總結下累加器的使用,首先 Driver 調用了 SparkContext.accumulator(initialValue) 方法,創建一個名爲 blanklines 且初始值爲 0 的累加器。然後在遇到空行時,Spark 閉包裏的執行器代碼就會對其 +1 。執行完成之後,Driver 可以調用累加器的 value 屬性來訪問累加器的值。

需要說明的是,只有在行動算子 count() 運行之後,纔可以 println 出正確的值,因爲我們之前講過 flatMap() 是惰性計算的,只有遇到行動操作之後纔會出發強制執行運算進行求值。

另外,工作節點上的任務是不可以訪問累加器的值,在這些任務看來,累加器是一個只寫的變量。

對於累加器的使用,不僅可以進行數據的 sum 加法,也可以跟蹤數據的最大值 max、最小值 min 等。

3)廣播變量的原理

前面說了,Spark 會自動把閉包中所有引用到的自由變量發送到工作節點上,那麼每個 Task 的閉包都會持有自由變量的副本。如果自由變量的內容很大且 Task 很多的情況下,爲每個 Task 分發這樣的自由變量的代價將會巨大,必然會對網絡 IO 造成壓力。

廣播變量則突破了這個限制,不是把變量副本發給所有的 Task ,而是將其分發給所有的工作節點一次,這樣節點上的 Task 可以共享一個變量副本。

Spark 使用的是一種高效的類似 BitTorrent 的通信機制,可以降低通信成本。廣播的數據只會被髮動各個節點一次,除了 Driver 可以修改,其他節點都是隻讀,並且廣播數據是以序列化形式緩存在系統中的,當 Task 需要數據時對其反序列化操作即可。

在使用中,Spark 可以通過調用 SparkContext.broadcast(v) 創建廣播變量,並通過調用 value 來訪問其值,舉例代碼如下:

val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/rUy83-XVpmT2uW1iE0-VCA