Spark 技術原理
Spark 概述
Spark 簡介:
Spark 最初由美國加州伯克利大學(UCBerkeley)的 AMP 實驗室於 2009 年開發,是基於內存計算的大數據並行計算框架,可用於構建大型的、低延遲的數據分析應用程序。
2013 年 Spark 加入 Apache 孵化器項目後發展迅猛,如今已成爲 Apache 軟件基金會最重要的三大分佈式計算系統開源項目之一(Hadoop、Spark、Storm)。
Spark 在 2014 年打破了 Hadoop 保持的基準排序紀錄
-
Spark/206 個節點 / 23 分鐘 / 100TB 數據
-
Hadoop/2000 個節點 / 72 分鐘 / 100TB 數據
-
Spark 用十分之一的計算資源,獲得了比 Hadoop 快 3 倍的速度。
Spark 具有如下幾個主要特點:
-
運行速度快:使用 DAG 執行引擎以支持循環數據流與內存計算。
-
容易使用:支持使用 Scala、Java、Python 和 R 語言進行編程,可以通過 Spark Shell 進行交互式編程。
-
通用性:Spark 提供了完整而強大的技術棧,包括 SQL 查詢、流式計算、機器學習和圖算法組件。
-
運行模式多樣:可運行於獨立的集羣模式中,可運行於 Hadoop 中,也可運行於 Amazon EC2 等雲環境中,並且可以訪問 HDFS、Cassandra、HBase、Hive 等多種數據源。
特點
圖:Spark 的特點
Spark 如今已吸引了國內外各大公司的注意,如騰訊、淘寶、百度、亞馬遜等公司均不同程度地使用了 Spark 來構建大數據分析應用,並應用到實際的生產環境中。
Spark 與 hadoop
圖:谷歌應用趨勢: Spark 與 Hadoop 對比
Spark 是什麼?
-
Spark 是一個基於內存的分佈式批處理引擎。
-
由 AMP LAB 貢獻到 Apache 社區的開源項目,是 AMP 大數據棧的基礎組件。
-
Spark 是一站式解決方案,集批處理、實時流計算、交互式查詢、圖計算與機器學習與一體。
Spark 做什麼?
-
數據處理(Data Processing):可以用來快速處理數據,兼具容錯性和可擴展性。
-
迭代計算(Iterrative Computation):支持迭代計算,有效應對多步數據處理邏輯。
-
數據挖掘(Data Mining):在還海量數據的基礎上進程複雜的挖掘分析,可支持各種數據挖掘和機器學習算法。
Spark 適用場景:
-
數據處理, ETL(抽取、轉換、加載)
-
機器學習。如:可用於自動判斷淘寶買家的評論是好評還是差評。
-
交互式分析:可用於查詢 Hive 數據倉庫。
-
特別使用與迭代計算,數據重複利用場景。
-
流計算:流處理可用於頁面點擊瀏覽分析,推薦系統,輿情分析等實時業務。
需要反覆操作的次數越多,所需讀取的數據量越大,收益越大。
Scala 簡介:
Scala 是一門現代的多範式編程語言,運行於 Java 平臺(JVM,Java 虛擬機),併兼容現有的 Java 程序。
Scala 的特性:
-
Scala 具備強大的併發性,支持函數式編程,可以更好地支持分佈式系統。
-
Scala 語法簡潔,能提供優雅的 API
Scala 兼容 Java,運行速度快,且能融合到 Hadoop 生態圈中。Scala 是 Spark 的主要編程語言,但 Spark 還支持 Java、Python、R 作爲編程語言 Scala 的優勢是提供了 REPL(Read-Eval-Print Loop,交互式解釋器),提高程序開發效率。
Spark 與 Hadoop 的對比:
對比 Hadoop:
-
性能上提升高於 100 倍。
-
Spark 的中間數據存放在內存中,對於迭代運算的效率更高,進行批處理時更高效。
-
更低的延時。
-
Spark 提供更多的數據操作類型,編程模型比 Hadoop 更靈活,開發效率更高。
-
更高的容錯能力(血統機制)。
Hadoop 存在如下一些缺點:
-
表達能力有限
-
磁盤 IO 開銷大
-
延遲高
-
任務之間的銜接涉及 IO 開銷
-
在前一個任務執行完成之前,其他任務就無法開始,難以勝任複雜、多階段的計算任務
Spark 在借鑑 Hadoop MapReduce 優點的同時,很好地解決了 MapReduce 所面臨的問題。
相比於 Hadoop MapReduce,Spark 主要具有如下優點:
-
Spark 的計算模式也屬於 MapReduce,但不侷限於 Map 和 Reduce 操作,還提供了多種數據集操作類型,編程模型比 Hadoop MapReduce 更靈活。
-
Spark 提供了內存計算,可將中間結果放到內存中,對於迭代運算效率更高 Spark 基於 DAG 的任務調度執行機制,要優於 Hadoop MapReduce 的迭代執行機制。
Sprak
圖:Hadoop 與 Spark 的執行流程對比
使用 Hadoop 進行迭代計算非常耗資源。Spark 將數據載入內存後,之後的迭代計算都可以直接使用內存中的中間結果作運算,避免了從磁盤中頻繁讀取數據。
時間
圖:Hadoop 與 Spark 執行邏輯迴歸的時間對比
Spark 生態系統
##Spark 生態系統的原因:
在實際應用中,大數據處理主要包括以下三個類型:
-
複雜的批量數據處理:通常時間跨度在數十分鐘到數小時之間。
-
基於歷史數據的交互式查詢:通常時間跨度在數十秒到數分鐘之間。
-
基於實時數據流的數據處理:通常時間跨度在數百毫秒到數秒之間。
當同時存在以上三種場景時,就需要同時部署三種不同的軟件
- 比如: MapReduce / Impala / Storm
這樣做難免會帶來一些問題:
-
不同場景之間輸入輸出數據無法做到無縫共享,通常需要進行數據格式的轉換。
-
不同的軟件需要不同的開發和維護團隊,帶來了較高的使用成本。
-
比較難以對同一個集羣中的各個系統進行統一的資源協調和分配。
Spark 設計的理念:
Spark 的設計遵循 “一個軟件棧滿足不同應用場景” 的理念,逐漸形成了一套完整的生態系統。
既能夠提供內存計算框架,也可以支持 SQL 即席查詢、實時流式計算、機器學習和圖計算等。
Spark 可以部署在資源管理器 YARN 之上,提供一站式的大數據解決方案。
因此,Spark 所提供的生態系統足以應對上述三種場景,即同時支持批處理、交互式查詢和流數據處理。
Spark 生態系統:
生態系統
圖:BDAS 架構
Spark 生態系統已經成爲伯克利數據分析軟件棧 BDAS(Berkeley Data Analytics Stack)的重要組成部分。
Spark 的生態系統主要包含了 Spark Core、Spark SQL、Spark Streaming、MLLib 和 GraphX 等組件。
Spark 生態系統組件的應用場景:
Spark 組件
Spark 運行架構
Spark 架構
圖:Spark 體系架構
基本概念:
-
RDD:是 Resillient Distributed Dataset(彈性分佈式數據集)的簡稱,是分佈式內存的一個抽象概念,提供了一種高度受限的共享內存模型。
-
DAG:是 Directed Acyclic Graph(有向無環圖)的簡稱,反映 RDD 之間的依賴關係。
-
Executor:是運行在工作節點(WorkerNode)的一個進程,負責運行 Task。
-
Application:用戶編寫的 Spark 應用程序。
-
Task:運行在 Executor 上的工作單元。
-
Job:一個 Job 包含多個 RDD 及作用於相應 RDD 上的各種操作。
-
Stage:是 Job 的基本調度單位,一個 Job 會分爲多組 Task,每組 Task 被稱爲 Stage,或者也被稱爲 TaskSet,代表了一組關聯的、相互之間沒有 Shuffle 依賴關係的任務組成的任務集。
Spark 架構設計:
架構設計
Spark 運行架構包括集羣資源管理器(Cluster Manager)、運行作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行進程(Executor)。
資源管理器可以自帶或 Mesos 或 YARN。(在華爲 FusionInsight 中只能用 YARN。)
與 Hadoop MapReduce 計算框架相比,Spark 所採用的 Executor 有兩個優點:
-
一是利用多線程來執行具體的任務,減少任務的啓動開銷。
-
二是 Executor 中有一個 BlockManager 存儲模塊,會將內存和磁盤共同作爲存儲設備,有效減少 IO 開銷。
關係
圖:Spark 中各種概念之間的相互聯繫
一個 Application 由一個 Driver 和若干個 Job 構成,一個 Job 由多個 Stage 構成,一個 Stage 由多個沒有 Shuffle 關係的 Task 組成.
當執行一個 Application 時,Driver 會向集羣管理器申請資源,啓動 Executor,並向 Executor 發送應用程序代碼和文件,然後在 Executor 上執行 Task,運行結束後,執行結果會返回給 Driver,或者寫到 HDFS 或者其他數據庫中。
Spark 運行基本流程:
Spark 運行流程圖
圖:Spark 運行基本流程圖
Spark 運行基本流程如下:
-
首先爲應用構建起基本的運行環境,即由 Driver 創建一個 SparkContext,進行資源的申請、任務的分配和監控。
-
資源管理器爲 Executor 分配資源,並啓動 Executor 進程。
-
SparkContext 根據 RDD 的依賴關係構建 DAG 圖,DAG 圖提交給 DAGScheduler 解析成 Stage,然後把一個個 TaskSet 提交給底層調度器 TaskScheduler 處理;Executor 向 SparkContext 申請 Task,Task Scheduler 將 Task 發放給 Executor 運行,並提供應用程序代碼。
-
Task 在 Executor 上運行,把執行結果反饋給 TaskScheduler,然後反饋給 DAGScheduler,運行完畢後寫入數據並釋放所有資源。
總體而言,Spark 運行架構具有以下特點:
-
每個 Application 都有自己專屬的 Executor 進程,並且該進程在 Application 運行期間一直駐留。Executor 進程以多線程的方式運行 Task。
-
Spark 運行過程與資源管理器無關,只要能夠獲取 Executor 進程並保持通信即可。
-
Task 採用了數據本地性和推測執行等優化機制。
Spark on Yarn 的運行流程:
Spark on client
圖:Spark on Yarn-client 的運行流程
spark on clter
圖:Spark on Yarn-cluster 的運行流程
Yarn-client 和 Yarn-cluster 的區別:
-
Yarn-client 和 Yarn-cluster 主要區別是 Application Master 進程的區別。
-
Yarn-client 適合測試,Yarn-cluster 適合生成。
-
Yarn-client 任務提交節點宕機,整個任務失敗。Yarn-cluster 不會。
RDD 運行原理
設計背景:
許多迭代式算法(比如機器學習、圖算法等)和交互式數據挖掘工具,共同之處是,不同計算階段之間會重用中間結果。
目前的 MapReduce 框架都是把中間結果寫入到 HDFS 中,帶來了大量的數據複製、磁盤 IO 和序列化開銷。
RDD 就是爲了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分佈式特性,只需將具體的應用邏輯表達爲一系列轉換處理,不同 RDD 之間的轉換操作形成依賴關係,可以實現管道化,避免中間數據存儲。
RDD 概念:
一個 RDD 就是一個分佈式對象集合,本質上是一個只讀的分區記錄集合,每個 RDD 可分成多個分區,每個分區就是一個數據集片段,並且一個 RDD 的不同分區可以被保存到集羣中不同的節點上,從而可以在集羣中的不同節點上進行並行計算。
RDD 提供了一種高度受限的共享內存模型,即 RDD 是隻讀的記錄分區的集合,不能直接修改,只能基於穩定的物理存儲中的數據集創建 RDD,或者通過在其他 RDD 上執行確定的轉換操作(如 map、join 和 group by)而創建得到新的 RDD。
RDD 提供了一組豐富的操作以支持常見的數據運算,分爲 “動作”(Action)和 “轉換”(Transformation)兩種類型。
RDD 提供的轉換接口都非常簡單,都是類似 map、filter、groupBy、join 等粗粒度的數據轉換操作,而不是針對某個數據項的細粒度修改(不適合網頁爬蟲)。
表面上 RDD 的功能很受限、不夠強大,實際上 RDD 已經被實踐證明可以高效地表達許多框架的編程模型(比如 MapReduce、SQL、Pregel)。
Spark 用 Scala 語言實現了 RDD 的 API,程序員可以通過調用 API 實現對 RDD 的各種操作。
RDD 典型的執行過程如下:
-
RDD 讀入外部數據源進行創建。
-
RDD 經過系列的轉換(Transformation)操作,每一次都會產生不同的 RDD,供給下一個轉換操作使用。
-
最後一個 RDD 經過 “動作” 操作進行轉換,並輸出到外部數據源。
這一系列處理稱爲一個 Lineage(血緣關係),即 DAG 拓撲排序的結果。
優點:惰性調用、管道化、避免同步等待、不需要保存中間結果、每次操作變得簡單。
過程示例
圖:RDD 執行過程的一個示例
RDD 特性:
Spark 採用 RDD 以後能夠實現高效計算的原因主要在於:
(1)高效的容錯性
-
現有容錯機制:數據複製或者記錄日誌。
-
RDD:血緣關係、重新計算丟失分區、無需回滾系統、重算過程在不同節點之間並行、只記錄粗粒度的操作。
(2)中間結果持久化到內存,數據在內存中的多個 RDD 操作之間進行傳遞,避免了不必要的讀寫磁盤開銷 。
(3)存放的數據可以是 Java 對象,避免了不必要的對象序列化和反序列化。
RDD 之間的依賴關係:
依賴關係
圖:窄依賴於寬依賴的區別
窄依賴表現爲一個父 RDD 的分區對應於一個子 RDD 的分區或多個父 RDD 的分區對應於一個子 RDD 的分區.
寬依賴則表現爲存在一個父 RDD 的一個分區對應一個子 RDD 的多個分區。
Stage 的劃分:
Spark 通過分析各個 RDD 的依賴關係生成了 DAG,再通過分析各個 RDD 中的分區之間的依賴關係來決定如何劃分 Stage,具體劃分方法是:
-
在 DAG 中進行反向解析,遇到寬依賴就斷開
-
遇到窄依賴就把當前的 RDD 加入到 Stage 中
-
將窄依賴儘量劃分在同一個 Stage 中,可以實現流水線計算
Stage 劃分
圖:根據 RDD 分區的依賴關係劃分 Stage
如上圖,被分成三個 Stage,在 Stage2 中,從 map 到 union 都是窄依賴,這兩步操作可以形成一個流水線操作。
流水線操作實例:
分區 7 通過 map 操作生成的分區 9,可以不用等待分區 8 到分區 10 這個 map 操作的計算結束,而是繼續進行 union 操作,得到分區 13,這樣流水線執行大大提高了計算的效率。
Stage 的類型包括兩種:ShuffleMapStage 和 ResultStage,具體如下:(1)ShuffleMapStage:不是最終的 Stage,在它之後還有其他 Stage,所以,它的輸出一定需要經過 Shuffle 過程,並作爲後續 Stage 的輸入;這種 Stage 是以 Shuffle 爲輸出邊界,其輸入邊界可以是從外部獲取數據,也可以是另一個 ShuffleMapStage 的輸出,其輸出可以是另一個 Stage 的開始;在一個 Job 裏可能有該類型的 Stage,也可能沒有該類型 Stage;
(2)ResultStage:最終的 Stage,沒有輸出,而是直接產生結果或存儲。這種 Stage 是直接輸出結果,其輸入邊界可以是從外部獲取數據,也可以是另一個 ShuffleMapStage 的輸出。在一個 Job 裏必定有該類型 Stage。因此,一個 Job 含有一個或多個 Stage,其中至少含有一個 ResultStage。
RDD 運行的過程:
RDD 運行過程
圖;RDD 在 Spark 中的運行過程
通過上述對 RDD 概念、依賴關係和 Stage 劃分的介紹,結合之前介紹的 Spark 運行基本流程,再總結一下 RDD 在 Spark 架構中的運行過程:(1)創建 RDD 對象;(2)SparkContext 負責計算 RDD 之間的依賴關係,構建 DAG;(3)DAGScheduler 負責把 DAG 圖分解成多個 Stage,每個 Stage 中包含了多個 Task,每個 Task 會被 TaskScheduler 分發給各個 WorkerNode 上的 Executor 去執行。
RDD 的算子:
Transformation:
-
Transformation 是通過轉換從一個或多個 RDD 生成新的 RDD,該操作時 Lazy 的,當調用 action 算子,才發起 job。
-
典型算子:map、flatMap、filter、reduceByKey。
Action:
-
當代碼調用該類型算子時,立即啓動 job。
-
典型算子:take、count、savaAsTextFile 等。
Spark SQL
從 Shark 說起:
Shark 即 Hive on Spark,爲了實現與 Hive 兼容,Shark 在 HiveQL 方面重用了 Hive 中 HiveQL 的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認爲僅將物理執行計劃從 MapReduce 作業替換成了 Spark 作業,通過 Hive 的 HiveQL 解析,把 HiveQL 翻譯成 Spark 上的 RDD 操作。
Shark 的設計導致了兩個問題:
-
一是執行計劃優化完全依賴於 Hive,不方便添加新的優化策略;
-
二是因爲 Spark 是線程級並行,而 MapReduce 是進程級並行,因此,Spark 在兼容 Hive 的實現上存在線程安全問題,導致 Shark 不得不使用另外一套獨立維護的打了補丁的 Hive 源碼分支。
流程
圖:Hive 中 SQL 查詢的 MapReduce 作業轉換過程
Spark SQL 設計:
Spark SQL 是 Spark 中用於結構化數據處理的模塊。在 Spark 應用中,可以無縫的使用 SQL 語句亦或是 DataFrame APi 對結構化數據進程查詢。
圖
Spar Sql
圖:Spark SQL 架構
Spark SQL 在 Hive 兼容層面僅依賴 HiveQL 解析、Hive 元數據,也就是說,從 HQL 被解析成抽象語法樹(AST)起,就全部由 Spark SQL 接管了。Spark SQL 執行計劃生成和優化都由 Catalyst(函數式關係查詢優化框架)負責。
支持
圖:Spark SQL 支持的數據格式和編程語言
Spark SQL 增加了 SchemaRDD(即帶有 Schema 信息的 RDD),使用戶可以在 Spark SQL 中執行 SQL 語句,數據既可以來自 RDD,也可以是 Hive、HDFS、Cassandra 等外部數據源,還可以是 JSON 格式的數據。
Spark SQL 目前支持 Scala、Java、Python 三種語言,支持 SQL-92 規範。
Dataset 簡介:
DataSet 是一個由特定域的對象組成的強類型集合,可通過功能或關係操作並行轉換其中的對象。
DataSet 以 Catalyst 邏輯執行計劃表示,並且數據以編碼的二進制形式存儲,不需要反序列化就可以執行 sort、filter、shuffle 等操作。
DataSet 是 “懶惰的”,只在執行 cation 操作時觸發計算。當執行 action 操作時,Spark 用查詢優化程序來優化邏輯計劃,並生成一個高效的並行分佈式的物理計劃。
DataFrame 介紹:
DateFrame:指定列名稱的 DataSet。DataFream 是 Dataset[Row] 的特例。
RDD、DataFrame 與 Dataset:
RDD:
-
優點:類型安全,面向對象。
-
缺點:序列化和反序列化的性能開銷大;GC 的性能開銷,頻繁的創建和銷燬對象,勢必會增加 GC。
DataFrame:
-
優點:自帶 scheme 信息,降低序列化反序列化開銷。
-
缺點:不是面向對象的。編譯器不安全。
Dataset 的特點:
-
快:大多數場景下,性能優於 RDD;Encoders(編譯器)優於 Kryo 或者 Java 序列號;避免了不必要的格式轉換。
-
類型安全:類似於 RDD,函數儘可能編譯時安全。
-
和 DataFrame,RDD 互相轉換。
Dataset 具有 RDD 和 DataFrame 的有點,又避免它們的缺點。
Spark SQL vs Hive:
區別:
-
Spark SQL 的執行引擎爲 Spark core,HIve 默認執行引擎爲 MapReduce。
-
Spark SQL 的執行速度是 Hive 的 10-100 倍。
-
Spark SQL 不支持 buckets,Hive 支持。
聯繫:
-
Spark SQL 依賴於 Hive 的元數據。
-
Spark SQL 兼容絕大部分 Hive 的語法和函數。
-
Spark SQL 可以使用 HIve 的自定義函數。
Spark Structured Streaming
Structured Streaming 概述:
Structured Streaming 是構建在 Spark SQL 引擎上的流式數據處理引擎。可以像使用靜態 RDD 數據那樣編寫流式計算過程。當流數據連續不斷的產生時,SPark SQL 將會增量的、持續不斷的處理這些數據,並將結果更新到結果集中。如下圖:
流數據
圖:Structured Streaming
Structured Streaming 計算模型:
模型
圖:計算模型圖
模型示例
圖:計算模型示例
每一次計算後將結果更新到數據集中。
Spark Streaming
概述:
Spark Streaming 是 Spark 核心 API 的一個擴展,一個實時計算框架。具有可擴展性、高吞吐量、可容錯性等特定。
Spark Streaming
圖:Spark Streaming 過程示意圖
Spark Streaming 微批處理:
Spark Streaming 計算基於 DStream,將流式計算分解成一系列短小的批處理作業。Spark 引擎將數據生成最終結果數據。
批處理
圖:批處理示意圖
使用 DStream 從 Kafka 和 HDFS 等源獲取連續的數據流,DStreams 由一系列連續的 RDD 組成,每個 RDD 包含確定時間間隔的數據,任何對 DStreams 的操作都轉換成對 RDD 的操作。
Spark Streaming 容錯機制:
Spark Streaming 本質仍是基於 RDD 計算,當 RDD 的某些 partiton 丟失,可以通過 RDD 的血統機制重新恢復丟失的 RDD。
Spark 的部署和應用方式
Spark 三種部署方式:
Spark 支持三種不同類型的部署方式,包括:
-
Standalone(類似於 MapReduce1.0,slot 爲資源分配單位)
-
Spark on Mesos(和 Spark 有血緣關係,更好支持 Mesos)
-
Spark on YARN
從 Hadoop+Storm 架構轉向 Spark 架構:
案例
圖:採用 Hadoop+Storm 部署方式的一個案例
-
用 Spark 架構具有如下優點:實現一鍵式安裝和配置、線程級別的任務監控和告警.
-
降低硬件集羣、軟件維護、任務監控和應用開發的難度。
-
便於做成統一的硬件、計算平臺資源池.
需要說明的是,Spark Streaming 無法實現毫秒級的流計算,因此,對於需要毫秒級實時響應的企業應用而言,仍然需要採用流計算框架(如 Storm).
saprk
圖:用 Spark 架構滿足批處理和流處理需求
Hadoop 和 Spark 的統一部署:
由於 Hadoop 生態系統中的一些組件所實現的功能,目前還是無法由 Spark 取代的,比如,Storm。
現有的 Hadoop 組件開發的應用,完全轉移到 Spark 上需要一定的成本,不同的計算框架統一運行在 YARN 中,可以帶來如下好處:
-
計算資源按需伸縮
-
不用負載應用混搭,集羣利用率高
-
共享底層存儲,避免數據跨集羣遷移
部署
圖:Hadoop 和 Spark 的統一部署
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/YcL7Yzpd-sJscUXufpXK-g