Spark 技術原理

Spark 概述

Spark 簡介:

Spark 最初由美國加州伯克利大學(UCBerkeley)的 AMP 實驗室於 2009 年開發,是基於內存計算的大數據並行計算框架,可用於構建大型的、低延遲的數據分析應用程序。

2013 年 Spark 加入 Apache 孵化器項目後發展迅猛,如今已成爲 Apache 軟件基金會最重要的三大分佈式計算系統開源項目之一(Hadoop、Spark、Storm)。

Spark 在 2014 年打破了 Hadoop 保持的基準排序紀錄

Spark 具有如下幾個主要特點:

特點

圖:Spark 的特點

Spark 如今已吸引了國內外各大公司的注意,如騰訊、淘寶、百度、亞馬遜等公司均不同程度地使用了 Spark 來構建大數據分析應用,並應用到實際的生產環境中。

Spark 與 hadoop

圖:谷歌應用趨勢: Spark 與 Hadoop 對比

Spark 是什麼?

Spark 做什麼?

Spark 適用場景:

需要反覆操作的次數越多,所需讀取的數據量越大,收益越大。

Scala 簡介:

Scala 是一門現代的多範式編程語言,運行於 Java 平臺(JVM,Java 虛擬機),併兼容現有的 Java 程序。

Scala 的特性:

Scala 兼容 Java,運行速度快,且能融合到 Hadoop 生態圈中。Scala 是 Spark 的主要編程語言,但 Spark 還支持 Java、Python、R 作爲編程語言 Scala 的優勢是提供了 REPL(Read-Eval-Print Loop,交互式解釋器),提高程序開發效率。

Spark 與 Hadoop 的對比:

對比 Hadoop:

Hadoop 存在如下一些缺點:

Spark 在借鑑 Hadoop MapReduce 優點的同時,很好地解決了 MapReduce 所面臨的問題。

相比於 Hadoop MapReduce,Spark 主要具有如下優點:

Sprak

圖:Hadoop 與 Spark 的執行流程對比

使用 Hadoop 進行迭代計算非常耗資源。Spark 將數據載入內存後,之後的迭代計算都可以直接使用內存中的中間結果作運算,避免了從磁盤中頻繁讀取數據。

時間

圖:Hadoop 與 Spark 執行邏輯迴歸的時間對比

Spark 生態系統

##Spark 生態系統的原因:

在實際應用中,大數據處理主要包括以下三個類型:

當同時存在以上三種場景時,就需要同時部署三種不同的軟件

這樣做難免會帶來一些問題:

Spark 設計的理念:

Spark 的設計遵循 “一個軟件棧滿足不同應用場景” 的理念,逐漸形成了一套完整的生態系統。

既能夠提供內存計算框架,也可以支持 SQL 即席查詢、實時流式計算、機器學習和圖計算等。

Spark 可以部署在資源管理器 YARN 之上,提供一站式的大數據解決方案。

因此,Spark 所提供的生態系統足以應對上述三種場景,即同時支持批處理、交互式查詢和流數據處理。

Spark 生態系統:

生態系統

圖:BDAS 架構

Spark 生態系統已經成爲伯克利數據分析軟件棧 BDAS(Berkeley Data Analytics Stack)的重要組成部分。

Spark 的生態系統主要包含了 Spark Core、Spark SQL、Spark Streaming、MLLib 和 GraphX 等組件。

Spark 生態系統組件的應用場景:

Spark 組件

Spark 運行架構

Spark 架構

圖:Spark 體系架構

基本概念:

Spark 架構設計:

架構設計

Spark 運行架構包括集羣資源管理器(Cluster Manager)、運行作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行進程(Executor)。

資源管理器可以自帶或 Mesos 或 YARN。(在華爲 FusionInsight 中只能用 YARN。

與 Hadoop MapReduce 計算框架相比,Spark 所採用的 Executor 有兩個優點:

關係

圖:Spark 中各種概念之間的相互聯繫

一個 Application 由一個 Driver 和若干個 Job 構成,一個 Job 由多個 Stage 構成,一個 Stage 由多個沒有 Shuffle 關係的 Task 組成.

當執行一個 Application 時,Driver 會向集羣管理器申請資源,啓動 Executor,並向 Executor 發送應用程序代碼和文件,然後在 Executor 上執行 Task,運行結束後,執行結果會返回給 Driver,或者寫到 HDFS 或者其他數據庫中。

Spark 運行基本流程:

Spark 運行流程圖

圖:Spark 運行基本流程圖

Spark 運行基本流程如下:

  1. 首先爲應用構建起基本的運行環境,即由 Driver 創建一個 SparkContext,進行資源的申請、任務的分配和監控。

  2. 資源管理器爲 Executor 分配資源,並啓動 Executor 進程。

  3. SparkContext 根據 RDD 的依賴關係構建 DAG 圖,DAG 圖提交給 DAGScheduler 解析成 Stage,然後把一個個 TaskSet 提交給底層調度器 TaskScheduler 處理;Executor 向 SparkContext 申請 Task,Task Scheduler 將 Task 發放給 Executor 運行,並提供應用程序代碼。

  4. Task 在 Executor 上運行,把執行結果反饋給 TaskScheduler,然後反饋給 DAGScheduler,運行完畢後寫入數據並釋放所有資源。

總體而言,Spark 運行架構具有以下特點:

  1. 每個 Application 都有自己專屬的 Executor 進程,並且該進程在 Application 運行期間一直駐留。Executor 進程以多線程的方式運行 Task。

  2. Spark 運行過程與資源管理器無關,只要能夠獲取 Executor 進程並保持通信即可。

  3. Task 採用了數據本地性和推測執行等優化機制。

Spark on Yarn 的運行流程:

Spark on client

圖:Spark on Yarn-client 的運行流程

spark on clter

圖:Spark on Yarn-cluster 的運行流程

Yarn-client 和 Yarn-cluster 的區別:

RDD 運行原理

設計背景:

許多迭代式算法(比如機器學習、圖算法等)和交互式數據挖掘工具,共同之處是,不同計算階段之間會重用中間結果。

目前的 MapReduce 框架都是把中間結果寫入到 HDFS 中,帶來了大量的數據複製、磁盤 IO 和序列化開銷。

RDD 就是爲了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分佈式特性,只需將具體的應用邏輯表達爲一系列轉換處理,不同 RDD 之間的轉換操作形成依賴關係,可以實現管道化,避免中間數據存儲。

RDD 概念:

一個 RDD 就是一個分佈式對象集合,本質上是一個只讀的分區記錄集合,每個 RDD 可分成多個分區,每個分區就是一個數據集片段,並且一個 RDD 的不同分區可以被保存到集羣中不同的節點上,從而可以在集羣中的不同節點上進行並行計算。

RDD 提供了一種高度受限的共享內存模型,即 RDD 是隻讀的記錄分區的集合,不能直接修改,只能基於穩定的物理存儲中的數據集創建 RDD,或者通過在其他 RDD 上執行確定的轉換操作(如 map、join 和 group by)而創建得到新的 RDD。

RDD 提供了一組豐富的操作以支持常見的數據運算,分爲 “動作”(Action)和 “轉換”(Transformation)兩種類型。

RDD 提供的轉換接口都非常簡單,都是類似 map、filter、groupBy、join 等粗粒度的數據轉換操作,而不是針對某個數據項的細粒度修改(不適合網頁爬蟲)。

表面上 RDD 的功能很受限、不夠強大,實際上 RDD 已經被實踐證明可以高效地表達許多框架的編程模型(比如 MapReduce、SQL、Pregel)。

Spark 用 Scala 語言實現了 RDD 的 API,程序員可以通過調用 API 實現對 RDD 的各種操作。

RDD 典型的執行過程如下:

  1. RDD 讀入外部數據源進行創建。

  2. RDD 經過系列的轉換(Transformation)操作,每一次都會產生不同的 RDD,供給下一個轉換操作使用。

  3. 最後一個 RDD 經過 “動作” 操作進行轉換,並輸出到外部數據源。

這一系列處理稱爲一個 Lineage(血緣關係),即 DAG 拓撲排序的結果。

優點:惰性調用、管道化、避免同步等待、不需要保存中間結果、每次操作變得簡單。

過程示例

圖:RDD 執行過程的一個示例

RDD 特性:

Spark 採用 RDD 以後能夠實現高效計算的原因主要在於:

(1)高效的容錯性

(2)中間結果持久化到內存,數據在內存中的多個 RDD 操作之間進行傳遞,避免了不必要的讀寫磁盤開銷 。

(3)存放的數據可以是 Java 對象,避免了不必要的對象序列化和反序列化。

RDD 之間的依賴關係:

依賴關係

圖:窄依賴於寬依賴的區別

窄依賴表現爲一個父 RDD 的分區對應於一個子 RDD 的分區或多個父 RDD 的分區對應於一個子 RDD 的分區.

寬依賴則表現爲存在一個父 RDD 的一個分區對應一個子 RDD 的多個分區。

Stage 的劃分:

Spark 通過分析各個 RDD 的依賴關係生成了 DAG,再通過分析各個 RDD 中的分區之間的依賴關係來決定如何劃分 Stage,具體劃分方法是:

  1. 在 DAG 中進行反向解析,遇到寬依賴就斷開

  2. 遇到窄依賴就把當前的 RDD 加入到 Stage 中

  3. 將窄依賴儘量劃分在同一個 Stage 中,可以實現流水線計算

Stage 劃分

圖:根據 RDD 分區的依賴關係劃分 Stage

如上圖,被分成三個 Stage,在 Stage2 中,從 map 到 union 都是窄依賴,這兩步操作可以形成一個流水線操作。

流水線操作實例:

分區 7 通過 map 操作生成的分區 9,可以不用等待分區 8 到分區 10 這個 map 操作的計算結束,而是繼續進行 union 操作,得到分區 13,這樣流水線執行大大提高了計算的效率。

Stage 的類型包括兩種:ShuffleMapStage 和 ResultStage,具體如下:(1)ShuffleMapStage:不是最終的 Stage,在它之後還有其他 Stage,所以,它的輸出一定需要經過 Shuffle 過程,並作爲後續 Stage 的輸入;這種 Stage 是以 Shuffle 爲輸出邊界,其輸入邊界可以是從外部獲取數據,也可以是另一個 ShuffleMapStage 的輸出,其輸出可以是另一個 Stage 的開始;在一個 Job 裏可能有該類型的 Stage,也可能沒有該類型 Stage;

(2)ResultStage:最終的 Stage,沒有輸出,而是直接產生結果或存儲。這種 Stage 是直接輸出結果,其輸入邊界可以是從外部獲取數據,也可以是另一個 ShuffleMapStage 的輸出。在一個 Job 裏必定有該類型 Stage。因此,一個 Job 含有一個或多個 Stage,其中至少含有一個 ResultStage。

RDD 運行的過程:

RDD 運行過程

圖;RDD 在 Spark 中的運行過程

通過上述對 RDD 概念、依賴關係和 Stage 劃分的介紹,結合之前介紹的 Spark 運行基本流程,再總結一下 RDD 在 Spark 架構中的運行過程:(1)創建 RDD 對象;(2)SparkContext 負責計算 RDD 之間的依賴關係,構建 DAG;(3)DAGScheduler 負責把 DAG 圖分解成多個 Stage,每個 Stage 中包含了多個 Task,每個 Task 會被 TaskScheduler 分發給各個 WorkerNode 上的 Executor 去執行。

RDD 的算子:

Transformation:

Action:

Spark SQL

從 Shark 說起:

Shark 即 Hive on Spark,爲了實現與 Hive 兼容,Shark 在 HiveQL 方面重用了 Hive 中 HiveQL 的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認爲僅將物理執行計劃從 MapReduce 作業替換成了 Spark 作業,通過 Hive 的 HiveQL 解析,把 HiveQL 翻譯成 Spark 上的 RDD 操作。

Shark 的設計導致了兩個問題:

  1. 一是執行計劃優化完全依賴於 Hive,不方便添加新的優化策略;

  2. 二是因爲 Spark 是線程級並行,而 MapReduce 是進程級並行,因此,Spark 在兼容 Hive 的實現上存在線程安全問題,導致 Shark 不得不使用另外一套獨立維護的打了補丁的 Hive 源碼分支。

流程

圖:Hive 中 SQL 查詢的 MapReduce 作業轉換過程

Spark SQL 設計:

Spark SQL 是 Spark 中用於結構化數據處理的模塊。在 Spark 應用中,可以無縫的使用 SQL 語句亦或是 DataFrame APi 對結構化數據進程查詢。

Spar Sql

圖:Spark SQL 架構

Spark SQL 在 Hive 兼容層面僅依賴 HiveQL 解析、Hive 元數據,也就是說,從 HQL 被解析成抽象語法樹(AST)起,就全部由 Spark SQL 接管了。Spark SQL 執行計劃生成和優化都由 Catalyst(函數式關係查詢優化框架)負責。

支持

圖:Spark SQL 支持的數據格式和編程語言

Spark SQL 增加了 SchemaRDD(即帶有 Schema 信息的 RDD),使用戶可以在 Spark SQL 中執行 SQL 語句,數據既可以來自 RDD,也可以是 Hive、HDFS、Cassandra 等外部數據源,還可以是 JSON 格式的數據。

Spark SQL 目前支持 Scala、Java、Python 三種語言,支持 SQL-92 規範。

Dataset 簡介:

DataSet 是一個由特定域的對象組成的強類型集合,可通過功能或關係操作並行轉換其中的對象。

DataSet 以 Catalyst 邏輯執行計劃表示,並且數據以編碼的二進制形式存儲,不需要反序列化就可以執行 sort、filter、shuffle 等操作。

DataSet 是 “懶惰的”,只在執行 cation 操作時觸發計算。當執行 action 操作時,Spark 用查詢優化程序來優化邏輯計劃,並生成一個高效的並行分佈式的物理計劃。

DataFrame 介紹:

DateFrame:指定列名稱的 DataSet。DataFream 是 Dataset[Row] 的特例。

RDD、DataFrame 與 Dataset:

RDD:

DataFrame:

Dataset 的特點:

Dataset 具有 RDD 和 DataFrame 的有點,又避免它們的缺點。

Spark SQL vs Hive:

區別:

聯繫:

Spark Structured Streaming

Structured Streaming 概述:

Structured Streaming 是構建在 Spark SQL 引擎上的流式數據處理引擎。可以像使用靜態 RDD 數據那樣編寫流式計算過程。當流數據連續不斷的產生時,SPark SQL 將會增量的、持續不斷的處理這些數據,並將結果更新到結果集中。如下圖:

流數據

圖:Structured Streaming

Structured Streaming 計算模型:

模型

圖:計算模型圖

模型示例

圖:計算模型示例

每一次計算後將結果更新到數據集中。

Spark Streaming

概述:

Spark Streaming 是 Spark 核心 API 的一個擴展,一個實時計算框架。具有可擴展性、高吞吐量、可容錯性等特定。

Spark Streaming

圖:Spark Streaming 過程示意圖

Spark Streaming 微批處理:

Spark Streaming 計算基於 DStream,將流式計算分解成一系列短小的批處理作業。Spark 引擎將數據生成最終結果數據。

批處理

圖:批處理示意圖

使用 DStream 從 Kafka 和 HDFS 等源獲取連續的數據流,DStreams 由一系列連續的 RDD 組成,每個 RDD 包含確定時間間隔的數據,任何對 DStreams 的操作都轉換成對 RDD 的操作。

Spark Streaming 容錯機制:

Spark Streaming 本質仍是基於 RDD 計算,當 RDD 的某些 partiton 丟失,可以通過 RDD 的血統機制重新恢復丟失的 RDD。

Spark 的部署和應用方式

Spark 三種部署方式:

Spark 支持三種不同類型的部署方式,包括:

從 Hadoop+Storm 架構轉向 Spark 架構:

案例

圖:採用 Hadoop+Storm 部署方式的一個案例

需要說明的是,Spark Streaming 無法實現毫秒級的流計算,因此,對於需要毫秒級實時響應的企業應用而言,仍然需要採用流計算框架(如 Storm).

saprk

圖:用 Spark 架構滿足批處理和流處理需求

Hadoop 和 Spark 的統一部署:

由於 Hadoop 生態系統中的一些組件所實現的功能,目前還是無法由 Spark 取代的,比如,Storm。

現有的 Hadoop 組件開發的應用,完全轉移到 Spark 上需要一定的成本,不同的計算框架統一運行在 YARN 中,可以帶來如下好處:

部署

圖:Hadoop 和 Spark 的統一部署

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/YcL7Yzpd-sJscUXufpXK-g