萬字長文 - 圖解,帶你輕鬆學習 Spark
大家好,我是大 D。
今天給大家分享一篇 Spark 核心知識點的梳理,對知識點的講解秉承着能用圖解的就不照本宣科地陳述,力求精簡、通俗易懂。希望能爲新手的入門學習掃清障礙,從基礎概念入手、再到原理深入,由淺入深地輕鬆掌握 Spark。
1、初識 Spark
Spark 不僅能夠在內存中進行高效運算,還是一個大一統的軟件棧,可以適用於各種各樣原本需要多種不同的分佈式平臺的場景。
1)背景
Spark 作爲一個用來快速實現大規模數據計算的通用分佈式大數據計算引擎,是大數據開發工程師必備的一項技術棧。Spark 相對 Hadoop 具有較大優勢,但 Spark 並不能完全替代 Hadoop。
實際上,Spark 已經很好地融入了 Hadoop 家族,作爲其中一員,主要用於替代 Hadoop 中的 MapReduce 計算模型。
2)Spark 的優勢
Spark 擁有 Hadoop MapReduce 所具備的優點,但不同的是,Hadoop 每次經過 job 執行的中間結果都會存儲在 HDFS 上,而 Spark 執行 job 的中間過程數據可以直接保存在內存中,無需讀寫到 HDFS 磁盤上。因爲內存的讀寫速度與磁盤的讀寫速度不在一個數量級上,所以 Spark 利用內存中的數據可以更快地完成數據的計算處理。
此外,由於 Spark 在內部使用了彈性分佈式數據集 (Resilient Distributed Dataset,RDD),經過了數據模型的優化,即便在磁盤上進行分佈式計算,其計算性能也是高於 Hadoop MapReduce 的。
3)Spark 的特點
-
計算速度快
Spark 將處理的每個任務都構造出一個有向無環圖 (Directed Acyclic Graph,DAG) 來執行,實現原理是基於 RDD 在內存中對數據進行迭代計算的,因此計算速度很快。官方數據表明,如果計算數據是從磁盤中讀取,Spark 計算速度是 Hadoop 的 10 倍以上;如果計算數據是從內存中讀取,Spark 計算速度則是 Hadoop 的 100 倍以上。
-
易於使用
Spark 提供了 80 多個高級運算操作,支持豐富的算子。開發人員只需調用 Spark 封裝好的 API 來實現即可,無需關注 Spark 的底層架構。
-
通用大數據框架
大數據處理的傳統方案需要維護多個平臺,比如,離線任務是放在 Hadoop MapRedue 上運行,實時流計算任務是放在 Storm 上運行。而 Spark 則提供了一站式整體解決方案,可以將即時查詢、離線計算、實時流計算等多種開發庫無縫組合使用。
-
支持多種資源管理器
Spark 支持多種運行模式,比如 Local、Standalone、YARN、Mesos、AWS 等部署模式。用戶可以根據現有的大數據平臺靈活地選擇運行模式。
-
Spark 生態圈豐富
Spark 不僅支持多種資源管理器調度 job,也支持 HDFS、HBase 等多種持久化層讀取數據,來完成基於不同組件實現的應用程序計算。目前,Spark 生態圈已經從大數據計算和數據挖掘擴展到機器學習、NLP、語音識別等領域。
2、Spark 的模塊組成
Spark 是包含多個緊密集成的組件,這些組件結合密切並且可以相互調用,這樣我們可以像在平常軟件項目中使用程序庫一樣,組合使用這些的組件。
1)Spark 的模塊組成
-
Spark 基於 Spark Core 建立了 Spark SQL、Spark Streaming、MLlib、GraphX、SparkR 等核心組件;
-
基於這些不同組件又可以實現不同的計算任務;
-
這些計算任務的運行模式有:本地模式、獨立模式、YARN、Mesos 等;
-
Spark 任務的計算可以從 HDFS、HBase、Cassandra 等多種數據源中存取數據。
2)Spark Core
Spark Core 實現了 Spark 基本的核心功能,如下:
-
基礎設施
SparkConf :用於定義 Spark 應用程序的配置信息;
SparkContext :爲 Spark 應用程序的入口,隱藏了底層邏輯,開發人員只需使用其提供的 API 就可以完成應用程序的提交與執行;
SparkRPC :Spark 組件之間的網絡通信依賴於基於 Netty 實現的 Spark RPC 框架;
SparkEnv :爲 Spark 的執行環境,其內部封裝了很多 Spark 運行所需要的基礎環境組件;
ListenerBus :爲事件總線,主要用於 SparkContext 內部各組件之間的事件交互;
MetricsSystem :爲度量系統,用於整個 Spark 集羣中各個組件狀態的監控; -
存儲系統
用於管理 Spark 運行過程中依賴的數據的存儲方式和存儲位置,Spark 的存儲系統首先考慮在各個節點的內存中存儲數據,當內存不足時會將數據存儲到磁盤上,並且內存存儲空間和執行存儲空間之間的邊界也可以靈活控制。 -
調度系統
DAGScheduler :負責創建 job、將 DAG 中的 RDD 劃分到不同 Stage 中、爲 Stage 創建對應的 Task、批量提交 Task 等;
TaskScheduler :負責按照 FIFO 和 FAIR 等調度算法對 Task 進行批量調度; -
計算引擎
主要由內存管理器、任務管理器、Task、Shuffle 管理器等組成。
3)Spark SQL
Spark SQL 是 Spark 用來操作結構化數據的程序包,支持使用 SQL 或者 Hive SQL 或者與傳統的 RDD 編程的數據操作結合的方式來查詢數據,使得分佈式數據的處理變得更加簡單。
4)Spark Streaming
Spark Streaming 提供了對實時數據進行流式計算的 API,支持 Kafka、Flume、TCP 等多種流式數據源。此外,還提供了基於時間窗口的批量流操作,用於對一定時間週期內的流數據執行批量處理。
5)MLlib
Spark MLlib 作爲一個提供常見機器學習(ML)功能的程序庫,包括分類、迴歸、聚類等多種機器學習算法的實現,其簡單易用的 API 接口降低了機器學習的門檻。
6)GraphX
GraphX 用於分佈式圖計算,比如可以用來操作社交網絡的朋友關係圖,能夠通過其提供的 API 快速解決圖計算中的常見問題。
7)SparkR
SparkR 是一個 R 語言包,提供了輕量級的基於 R 語言使用 Spark 的方式,使得基於 R 語言能夠更方便地處理大規模的數據集。
3、Spark 的運行原理
1)Spark 的運行模式
就底層而言,Spark 設計爲可以高效地在一個到數千個計算節點之間伸縮計算。爲了實現這樣的要求,同時獲得最大靈活性,Spark 支持在各種集羣管理器上運行。
Spark 的運行模式主要有:
-
Local 模式 :學習測試使用,分爲 Local 單線程和 Local-Cluster 多線程兩種方式;
-
Standalone 模式 :學習測試使用,在 Spark 自己的資源調度管理框架上運行;
-
ON YARN :生產環境使用,在 YARN 資源管理器框架上運行,由 YARN 負責資源管理,Spark 負責任務調度和計算;
-
ON Mesos :生產環境使用,在 Mesos 資源管理器框架上運行,由 Mesos 負責資源管理,Spark 負責任務調度和計算;
-
On Cloud :運行在 AWS、阿里雲、華爲雲等環境。
2)Spark 的集羣架構
Spark 的集羣架構主要由 Cluster Manager(資源管理器)、Worker (工作節點)、Executor(執行器)、Driver(驅動器)、Application(應用程序) 5 部分組成,如下圖:
-
Cluster Manager :Spark 集羣管理器,主要用於整個集羣資源的管理和分配,有多種部署和運行模式;
-
Worker :Spark 的工作節點,用於執行提交的任務;
-
Executor :真正執行計算任務的一個進程,負責 Task 的運行並且將運行的結果數據保存到內存或磁盤上;
-
Driver :Application 的驅動程序,可以理解爲驅動程序運行中的 main() 函數,Driver 在運行過程中會創建 Spark Context;
-
Application :基於 Spark API 編寫的應用程序,包括實現 Driver 功能的代碼和在集羣中多個節點上運行的 Executor 代碼。
3)Worker 的工作職責
-
通過註冊機制向 Cluster Manager 彙報自身的 CPU 和內存等資源使用信息;
-
在 Master 的指示下,創建並啓動 Executor(真正的計算單元);
-
將資源和任務進一步分配給 Executor 並運行;
-
同步資源信息和 Executor 狀態信息給 Cluster Manager。
4)Driver 的工作職責
Application 通過 Driver 與 Cluster Manager 和 Executor 進行通信。
-
運行 Application 的 main() 函數;
-
創建 SparkContext;
-
劃分 RDD 並生成 DAG;
-
構建 Job 並將每個 Job 都拆分爲多個 Stage,每個 Stage 由多個 Task 構成,也被稱爲 Task Set;
-
與 Spark 中的其他組件進行資源協調;
-
生成併發送 Task 到 Executor。
4、RDD 概念及核心結構
本節將介紹 Spark 中一個抽象的概念——RDD,要學習 Spark 就必須對 RDD 有一個清晰的認知,RDD 是 Spark 中最基本的數據抽象,代表一個不可變、可分區、元素可並行計算的集合。
1)RDD 的概念
RRD 全稱叫做彈性分佈式數據集(Resilient Distributed Dataset),從它的名字中可以拆解出三個概念。
-
Resilient :彈性的,包括存儲和計算兩個方面。RDD 中的數據可以保存在內存中,也可以保存在磁盤中。RDD 具有自動容錯的特點,可以根據血緣重建丟失或者計算失敗的數據;
-
Distributed :RDD 的元素是分佈式存儲的,並且用於分佈式計算;
-
Dataset :本質上還是一個存放元素的數據集。
2)RDD 的特點
RDD 具有自動容錯、位置感知性調度以及可伸縮等特點,並且允許用戶在執行多個查詢時,顯式地將數據集緩存在內存中,後續查詢能夠重用該數據集,這極大地提升了查詢效率。
下面是源碼中對 RDD 類介紹的註釋:
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
-
一組分區(Partition)的列表,其中分區也就是 RDD 的基本組成單位;
-
一個函數會被作用到每個分區上,RDD 的計算是以分區爲單位的;
-
一個 RDD 會依賴其他多個 RDD,它們之間具有依賴關係;
-
可選,對於 K-V 型的 RDD 會有一個分區函數,控制 key 分到哪個 reduce;
-
可選,一個存儲每個分區優先位置的列表。
從源碼對 RDD 的定義中,可以看出 RDD 不僅能表示存有多個元素的數據集,而且還能通過依賴關係推算出這個數據集是從哪來的,在哪裏計算更合適。
3)RDD 的核心結構
在學習 RDD 轉換操作算子之前,根據 RDD 的特點對 RDD 中的核心結構進行一下梳理,這樣對 Spark 的執行原理會有一個更深的理解。
-
分區(Partition):RDD 內部的數據集在邏輯上和物理上都被劃分爲了多個分區(Partition),每一個分區中的數據都可以在單獨的任務中被執行,這樣分區數量就決定了計算的並行度。如果在計算中沒有指定 RDD 中的分區數,那麼 Spark 默認的分區數就是爲 Applicaton 運行分配到的 CPU 核數。
-
分區函數(Partitioner):分區函數不但決定了 RDD 本身的分區數量,也決定了其父 RDD (即上一個衍生它的 RDD)Shuffle 輸出時的分區數量。Spark 實現了基於 HashPartitioner 的和基於 RangePartitoner 的兩種分區函數。需要特別說明的是,只有對 K-V 類型的 RDD 纔會有分區函數。
-
依賴關係:RDD 表示只讀的分區的數據集,如果對 RDD 中的數據進行改動,就只能通過轉化操作,由一個或多個 RDD 計算得到一個新的 RDD,並且這些 RDD 之間存在着前後依賴關係,前面的稱爲父 RDD,後面的稱爲子 RDD。
RDD 之間的依賴可分爲寬依賴和窄依賴。
當計算過程中出現異常情況導致部分分區數據丟失時,Spark 可以通過這種依賴關係從父 RDD 中重新計算丟失的分區數據,而不需要對 RDD 中的所有分區全部重新計算。 -
Stage:當 Spark 執行作業時,會根據 RDD 之間的依賴關係,按照寬窄依賴生成一個最優的執行計劃。如果 RDD 之間爲窄依賴,則會被劃到一個 Stage 中;如果 RDD 之間爲寬依賴,則會被劃分到不同的 Stage 中,這樣做的原因就是每個 Stage 內的 RDD 都儘可能在各個節點上並行地被執行,以提高運行效率。
-
優先列表(PreferredLocation):用於存儲每個分區優先位置的列表,對於每個 HDFS 文件來說,就是保存下每個分區所在 block 的位置。按照 “移動數據不如移動計算” 的理念,Spark 在執行任務調度時會優先選擇有存儲數據的 Worker 節點進行任務運算。
-
CheckPoint:是 Spark 提供的一種基於快照的緩存機制,如果在任務運算中,多次使用同一個 RDD,可以將這個 RDD 進行緩存處理。這樣,該 RDD 只有在第一次計算時會根據依賴關係得到分區數據,在後續使用到該 RDD 時,直接從緩存處取而不是重新進行計算。
如下圖,對 RDD-1 做快照緩存處理,那麼當 RDD-n 在用到 RDD-1 數據時,無需重新計算 RDD-1,而是直接從緩存處取數重算。
此外,Spark 還提供了另一種緩存機制 Cache,其中的數據是由 Executor 管理的,當 Executor 消失時,Cache 緩存的數據也將會消失。而 CheckPoint 是將數據保存到磁盤或者 HDFS 中的,當任務運行錯誤時,Job 會從 CheckPoint 緩存位置取數繼續計算。
5、Spark RDD 的寬窄依賴關係
1)RDD 的依賴關係
在 Spark 中,RDD 分區的數據不支持修改,是隻讀的。如果想更新 RDD 分區中的數據,那麼只能對原有 RDD 進行轉化操作,也就是在原來 RDD 基礎上創建一個新的 RDD。
那麼,在整個任務的運算過程中,RDD 的每次轉換都會生成一個新的 RDD,因此 RDD 們之間會產生前後依賴的關係。
說白了,相當於對原始 RDD 分區數據的整個運算進行了拆解,當運算中出現異常情況導致分區數據丟失時,Spark 可以還通過依賴關係從上一個 RDD 中重新計算丟失的數據,而不是對最開始的 RDD 分區數據重新進行計算。
在 RDD 的依賴關係中,我們將上一個 RDD 稱爲父 RDD,下一個 RDD 稱爲子 RDD。
2)如何區分寬窄依賴
RDD 們之間的依賴關係,也分爲寬依賴和窄依賴。
-
寬依賴 :父 RDD 中每個分區的數據都可以被子 RDD 的多個分區使用(涉及到了 shuffle);
-
窄依賴 :父 RDD 中每個分區的數據最多隻能被子 RDD 的一個分區使用。
說白了,就是看兩個 RDD 的分區之間,是不是一對一的關係,若是則爲窄依賴,反之則爲寬依賴。
有個形象的比喻,如果父 RDD 中的一個分區有多個孩子(被多個分區依賴),也就是超生了,就爲寬依賴;反之,如果只有一個孩子(只被一個分區依賴),那麼就爲窄依賴。
常見的寬窄依賴算子:
-
寬依賴的算子 :join(非 hash-partitioned)、groupByKey、partitionBy;
-
窄依賴的算子 :map、filter、union、join(hash-partitioned)、mapPartitions;
3)爲何設計要寬窄依賴
從上面的分析,不難看出,在窄依賴中子 RDD 的每個分區數據的生成操作都是可以並行執行的,而在寬依賴中需要所有父 RDD 的 Shuffle 結果完成後再被執行。
在 Spark 執行作業時,會按照 Stage 劃分不同的 RDD,生成一個完整的最優執行計劃,使每個 Stage 內的 RDD 都儘可能在各個節點上並行地被執行。
如下圖,Stage3 包含 Stage1 和 Stage2,其中, Stage1 爲窄依賴,Stage2 爲寬依賴。
因此,劃分寬窄依賴也是 Spark 優化執行計劃的一個重要步驟,寬依賴是劃分執行計劃中 Stage 的依據,對於寬依賴必須要等到上一個 Stage 計算完成之後才能計算下一個階段。
6、Spark RDD 的轉換操作與行動操作
1)RDD 的創建
Spark 提供了兩種創建 RDD 的方式:對一個集合進行並行化操作和利用外部數據集生成 RDD 。
對一個集合進行並行化操作
Spark 創建 RDD 最簡單的方式就是把已經存在的集合傳給 parallelize() 方法,不過,這種方式在開發中並不常用,畢竟需要將整個的數據集先放到一個節點內存中。
利用 parallelize() 方法將已經存在的一個集合轉換爲 RDD,集合中的數據也會被複制到 RDD 中並參與並行計算。
val lines = sc.parallelize(Arrays.asList(1,2,3,4,5),n)
其中,n 爲並行集合的分區數量,Spark 將爲集羣的每個分區都運行一個任務。該參數設置太小不能很好地利用 CPU,設置太大又會導致任務阻塞等待,一般 Spark 會嘗試根據集羣的 CPU 核數自動設置分區數量。
利用外部數據集生成 RDD
在開發中,Spark 創建 RDD 最常用的一個方式就是從 Hadoop 或者其他外部存儲系統創建 RDD,包括本地文件系統、HDFS、Cassandra、HBase、S3 等。
通過 SparkContext 的 textFile() 方法來讀取文本文件創建 RDD 的代碼,如下:
val lines = sc.textFile("../temp.txt")
其中,textFile() 方法的 url 參數可以是本地文件或路徑、HDFS 路徑等,Spark 會讀取該路徑下所有的文件,並將其作爲數據源加載到內存生成對應的 RDD。
當然, RDD 也可以在現有 RDD 的基礎上經過算子轉換生成新的 RDD,這是接下來要講的 RDD 算子轉換的內容,Spark RDD 支持兩種類型的操作:轉換操作 (Transformation) 和行動操作 (Action)。
2)RDD 的轉換操作
轉換操作是指從現有 RDD 的基礎上創建新的 RDD,是返回一個新 RDD 的操作。轉換操作是惰性求值的,也就是不會立即觸發執行實際的轉換,而是先記錄 RDD 之間的轉換關係,只有當觸發行動操作時纔會真正地執行轉換操作,並返回計算結果。
舉個栗子,有一個日誌文件 log.txt,需要從裏面若干條信息中,篩選出其中錯誤的報警信息,我們可以用轉化操作 filter() 即可完成,代碼如下:
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
其中,textFile() 方法定義了名爲 inputRDD 的 RDD,但是此時 log.txt 文件並沒有加載到內存中,僅僅是指向文件的位置。然後通過 filter() 方法進行篩選定義了名爲 errorsRDD 的轉換 RDD,同樣也屬於惰性計算,並沒有立即執行。
3)RDD 的行動操作
瞭解瞭如何通過轉換操作從已有的 RDD 中創建一個新的 RDD,但有時我們希望可以對數據集進行實際的計算。行動操作就是接下來要講的第二種 RDD 操作,它會強制執行那些求值必須用到的 RDD 的轉換操作,並將最終的計算結果返回給 Driver 程序,或者寫入到外部存儲系統中。
繼續剛纔的栗子,我們需要將上一步統計出來的報警信息的數量打印出來,我們可以藉助 count() 方法進行計數。
val countRDD = errorsRDD.count()
其中,count() 可以觸發實際的計算,強制執行前面步驟中的轉換操作。實際上,Spark RDD 會將 RDD 計算分解到不同的 Stage 並在不同的節點上進行運算,每個節點都會運行 count() 結果,所有運算完成之後會聚合一個結果返回給 Driver 程序。
如果分不清楚給定的一個 RDD 操作方法是屬於轉換操作還是行動操作,去看下它的返回類型,轉換操作返回的是 RDD 類型,而行動操作則返回的是其他的數據類型。
4)惰性求值
前面,我們多次提到轉換操作都是惰性求值,這也就意味着調用的轉換操作(textFile、filter 等)時,並不會立即去執行,而是 Spark 會記錄下所要求執行的操作的相關信息。
因此,我們對 RDD 的理解應該更深一步,RDD 不僅可以看作是一個存放分佈式數據的數據集,也可以當作是通過轉換操作構建出來的、記錄如何計算數據的指令列表。
惰性操作避免了所有操作都進行一遍 RDD 運算,它可以將很多操作合併在一起,來減少計算數據的步驟,提高 Spark 的運算效率。
7、Spark RDD 中常用的操作算子
1)向 Spark 傳遞函數
Spark API 依賴 Driver 程序中的傳遞函數完成在集羣上執行 RDD 轉換並完成數據計算。在 Java API 中,函數所在的類需要實現 org.apache.spark.api.java.function 包中的接口。
Spark 提供了 lambda 表達式和 自定義 Function 類兩種創建函數的方式。前者語法簡潔、方便使用;後者可以在一些複雜應用場景中自定義需要的 Function 功能。
舉個栗子,求 RDD 中數據的平方,並只保留不爲 0 的數據。
可以用 lambda 表達式簡明地定義 Function 實現,代碼如下:
val input = sc.parallelize(List(-2,-1,0,1,2))
val rdd1 = input.map(x => x * x)
val rdd2 = rdd1.filter(x => x != 0 )
首先用 map() 對 RDD 中所有的數據進行求平方,然後用到 filter() 來篩選不爲 0 的數據。
其中,map() 和 filter() 就是我們最常用的轉換算子,map() 接收了 一個 lambda 表達式定義的函數,並把這個函數運用到 input 中的每一個元素,最後把函數計算後的返回結果作爲結果 rdd1 中對應元素的值。
同樣,filter() 也接收了一個 lambda 表達式定義的函數,並將 rdd1 中滿足該函數的數據放入到了新的 rdd2 中返回。
Spark 提供了很豐富的處理 RDD 數據的操作算子,比如使用 distinct() 還可以繼續對 rdd2 進行去重處理。
如果需要對 RDD 中每一個元素處理後生成多個輸出,也有相應的算子,比如 flatMap()。它和 map() 類似,也是將輸入函數應用到 RDD 中的每個元素,不過返回的不是一個元素了,而是一個返回值序列的迭代器。
最終得到的輸出是一個包含各個迭代器可訪問的所有元素的 RDD,flatMap() 最經典的一個用法就是把輸入的一行字符串切分爲一個個的單詞。
舉個栗子,將行數據切分成單詞,對比下 map() 與 flat() 的不同。
val lines = sc.parallelize(List("hello spark","hi,flink"))
val rdd1 = lines.map(line => line.split(","))
val rdd2 = lines.flatMap(line => line.split(","))
可以看到,把 lines 中的每一個 line,使用所提供的函數執行一遍,map() 輸出的 rdd1 中仍然只有兩個元素;而 flatMap() 輸出的 rdd2 則是將原 RDD 中的數據 “拍扁” 了,這樣就得到了一個由各列表中元素組成的 RDD,而不是一個由列表組成的 RDD。
2) RDD 的轉換算子
Spark 中的轉換算子主要用於 RDD 之間的轉化和數據處理,常見的轉換算子具體如下:(發送推文預覽時發現表格內容展示不全,就乾脆截圖了
3)RDD 的行動算子
Spark 中行動算子主要用於對分佈式環境中 RDD 的轉化操作結果進行統一地執行處理,比如結果收集、數據保存等,常用的行動算子具體如下:
8、Spark 的共享變量之累加器和廣播變量
本節將介紹下 Spark 編程中兩種類型的共享變量:累加器和廣播變量。
簡單說,累加器是用來對信息進行聚合的,而廣播變量則是用來高效分發較大對象的。
1) 閉包的概念
在講共享變量之前,我們先了解下啥是閉包,代碼如下。
var n = 1
val func = (i:Int) => i + n
函數 func 中有兩個變量 n 和 i ,其中 i 爲該函數的形式參數,也就是入參,在 func 函數被調用時, i 會被賦予一個新的值,我們稱之爲綁定變量 (bound variable)。而 n 則是定義在了函數 func 外面的,該函數並沒有賦予其任何值,我們稱之爲自由變量 (free variable)。
像 func 函數這樣,返回結果依賴於聲明在函數外部的一個或多個變量,將這些自由變量捕獲並構成的封閉函數,我們稱之爲 “閉包”。
先看一個累加求和的栗子,如果在集羣模式下運行以下代碼,會發現結果並非我們所期待的累計求和。
var sum = 0
val arr = Array(1,2,3,4,5)
sc.parallelize(arr).foreach(x => sum + x)
println(sum)
sum 的結果爲 0,導致這個結果的原因就是存在閉包。
在集羣中 Spark 會將對 RDD 的操作處理分解爲 Tasks ,每個 Task 由 Executor 執行。而在執行之前,Spark 會計算 task 的閉包 (也就是 foreach() )。閉包會被序列化併發送給每個 Executor,但是發送給 Executor 的是副本,所以在 Driver 上輸出的依然是 sum 本身。
如果想對 sum 變量進行更新,則就要用到接下來我們要講的累加器。
2)累加器的原理
累加器是對信息進行聚合的,通常在向 Spark 傳遞函數時,比如使用 map() 或者 filter() 傳條件時,可以使用 Driver 中定義的變量,但是集羣中運行的每個任務都會得到這些變量的一份新的副本,然而,正如前面所述,更新這些副本的值,並不會影響到 Driver 中對應的變量。
累加器則突破了這個限制,可以將工作節點中的值聚合到 Driver 中。它的一個典型用途就是對作業執行過程中的特定事件進行計數。
舉個栗子,給了一個日誌記錄,需要統計這個文件中有多少空行。
val sc = new SparkContext(...)
val logs = sc.textFile(...)
val blanklines = sc.accumulator(0)
val callSigns = logs.flatMap(line => {
if(line == ""){
blanklines += 1
}
line.split("")
})
callSigns.count()
println("日誌中的空行數爲:" + blanklines.value)
總結下累加器的使用,首先 Driver 調用了 SparkContext.accumulator(initialValue) 方法,創建一個名爲 blanklines 且初始值爲 0 的累加器。然後在遇到空行時,Spark 閉包裏的執行器代碼就會對其 +1 。執行完成之後,Driver 可以調用累加器的 value 屬性來訪問累加器的值。
需要說明的是,只有在行動算子 count() 運行之後,纔可以 println 出正確的值,因爲我們之前講過 flatMap() 是惰性計算的,只有遇到行動操作之後纔會出發強制執行運算進行求值。
另外,工作節點上的任務是不可以訪問累加器的值,在這些任務看來,累加器是一個只寫的變量。
對於累加器的使用,不僅可以進行數據的 sum 加法,也可以跟蹤數據的最大值 max、最小值 min 等。
3)廣播變量的原理
前面說了,Spark 會自動把閉包中所有引用到的自由變量發送到工作節點上,那麼每個 Task 的閉包都會持有自由變量的副本。如果自由變量的內容很大且 Task 很多的情況下,爲每個 Task 分發這樣的自由變量的代價將會巨大,必然會對網絡 IO 造成壓力。
廣播變量則突破了這個限制,不是把變量副本發給所有的 Task ,而是將其分發給所有的工作節點一次,這樣節點上的 Task 可以共享一個變量副本。
Spark 使用的是一種高效的類似 BitTorrent 的通信機制,可以降低通信成本。廣播的數據只會被髮動各個節點一次,除了 Driver 可以修改,其他節點都是隻讀,並且廣播數據是以序列化形式緩存在系統中的,當 Task 需要數據時對其反序列化操作即可。
在使用中,Spark 可以通過調用 SparkContext.broadcast(v) 創建廣播變量,並通過調用 value 來訪問其值,舉例代碼如下:
val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/rUy83-XVpmT2uW1iE0-VCA