Spark SQL 重點知識總結

一、Spark SQL 的概念理解

Spark SQL 是 spark 套件中一個模板,它將數據的計算任務通過 SQL 的形式轉換成了 RDD 的計算,類似於 Hive 通過 SQL 的形式將數據的計算任務轉換成了 MapReduce。

Spark SQL 的特點:

  1. 和 Spark Core 的無縫集成,可以在寫整個 RDD 應用的時候,配置 Spark SQL 來完成邏輯實現。

  2. 統一的數據訪問方式,Spark SQL 提供標準化的 SQL 查詢。

  3. Hive 的繼承,Spark SQL 通過內嵌的 hive 或者連接外部已經部署好的 hive 案例,實現了對 hive 語法的繼承和操作。

  4. 標準化的連接方式,Spark SQL 可以通過啓動 thrift Server 來支持 JDBC、ODBC 的訪問,將自己作爲一個 BI Server 使用

Spark SQL 數據抽象:

  1. RDD(Spark1.0)->DataFrame(Spark1.3)->DataSet(Spark1.6)

  2. Spark SQL 提供了 DataFrame 和 DataSet 的數據抽象

  3. DataFrame 就是 RDD+Schema,可以認爲是一張二維表格,劣勢在於編譯器不進行表格中的字段的類型檢查,在運行期進行檢查

  4. DataSet 是 Spark 最新的數據抽象,Spark 的發展會逐步將 DataSet 作爲主要的數據抽象,弱化 RDD 和 DataFrame.DataSet 包含了 DataFrame 所有的優化機制。除此之外提供了以樣例類爲 Schema 模型的強類型

  5. DataFrame=DataSet[Row]

  6. DataFrame 和 DataSet 都有可控的內存管理機制,所有數據都保存在非堆上,都使用了 catalyst 進行 SQL 的優化。

Spark SQL 客戶端查詢:

  1. 可以通過 Spark-shell 來操作 Spark SQL,spark 作爲 SparkSession 的變量名,sc 作爲 SparkContext 的變量名

  2. 可以通過 Spark 提供的方法讀取 json 文件,將 json 文件轉換成 DataFrame

  3. 可以通過 DataFrame 提供的 API 來操作 DataFrame 裏面的數據。

  4. 可以通過將 DataFrame 註冊成爲一個臨時表的方式,來通過 Spark.sql 方法運行標準的 SQL 語句來查詢。

二、Spark SQL 查詢方式

DataFrame 查詢方式

  1. DataFrame 支持兩種查詢方式:一種是 DSL 風格,另外一種是 SQL 風格

(1)、DSL 風格:

需要引入 import spark.implicit. _ 這個隱式轉換,可以將 DataFrame 隱式轉換成 RDD

(2)、SQL 風格:

a、需要將 DataFrame 註冊成一張表格,如果通過 CreateTempView 這種方式來創建,那麼該表格 Session 有效,如果通過 CreateGlobalTempView 來創建,那麼該表格跨 Session 有效,但是 SQL 語句訪問該表格的時候需要加上前綴 global_temp

b、需要通過 sparkSession.sql 方法來運行你的 SQL 語句

DataSet 查詢方式

  1. 定義一個 DataSet,先定義一個 Case 類

三、DataFrame、Dataset 和 RDD 互操作

RDD->DataFrame

  1. 普通方式:例如 rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF("name","age")

  2. 通過反射來設置 schema,例如:

#註冊成一張臨時表
peopleDF.createOrReplaceTempView("persons")
val teen=spark.sql("select name,age from persons where age between 13 and 29")
teen.show

這時 teen 是一張表,每一行是一個 row 對象,如果需要訪問 Row 對象中的每一個元素,可以通過下標 row(0);你也可以通過列名 row.getAs[String]("name")

也可以使用 getAs 方法:

3、通過編程的方式來設置 schema,適用於編譯器不能確定列的情況

val peopleRDD=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt")
val schemaString="name age"
val filed=schemaString.split(" ").map(filename=> org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))
val schema=org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(",")).map(para=>org.apache.spark.sql.Row(para(0).trim,para(1).trim))
val peopleDF=spark.createDataFrame(res6,schema)
peopleDF.show

DataFrame->RDD

dataFrame.rdd

RDD->DataSet

rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS

DataSet->DataSet

dataSet.rdd

DataFrame -> DataSet

dataFrame.to[Person]

DataSet -> DataFrame

dataSet.toDF

四、用戶自定義函數

用戶自定義 UDF 函數

通過 spark.udf 功能用戶可以自定義函數

自定義 udf 函數:

  1. 通過 spark.udf.register(name,func) 來註冊一個 UDF 函數,name 是 UDF 調用時的標識符,fun 是一個函數,用於處理字段。

  2. 需要將一個 DF 或者 DS 註冊爲一個臨時表

  3. 通過 spark.sql 去運行一個 SQL 語句,在 SQL 語句中可以通過 name(列名) 方式來應用 UDF 函數

用戶自定義聚合函數

1. 弱類型用戶自定義聚合函數

  1. 新建一個 Class 繼承 UserDefinedAggregateFunction ,然後複寫方法:
//聚合函數需要輸入參數的數據類型
override def inputSchema: StructType = ???
//可以理解爲保存聚合函數業務邏輯數據的一個數據結構
override def bufferSchema: StructType = ???
// 返回值的數據類型
override def dataType: DataType = ???
// 對於相同的輸入一直有相同的輸出
override def deterministic: Boolean = true
//用於初始化你的數據結構
override def initialize(buffer: MutableAggregationBuffer)Unit = ???
//用於同分區內Row對聚合函數的更新操作
override def update(buffer: MutableAggregationBuffer, input: Row)Unit = ???
//用於不同分區對聚合結果的聚合。
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row)Unit = ???
//計算最終結果
override def evaluate(buffer: Row)Any = ???
  1. 你需要通過 spark.udf.resigter 去註冊你的 UDAF 函數。

  2. 需要通過 spark.sql 去運行你的 SQL 語句,可以通過 select UDAF(列名) 來應用你的用戶自定義聚合函數。

2、強類型用戶自定義聚合函數

  1. 新建一個 class,繼承 Aggregator[Employee, Average, Double],其中 Employee 是在應用聚合函數的時候傳入的對象,Average 是聚合函數在運行的時候內部需要的數據結構,Double 是聚合函數最終需要輸出的類型。這些可以根據自己的業務需求去調整。複寫相對應的方法:
//用於定義一個聚合函數內部需要的數據結構
override def zero: Average = ???
//針對每個分區內部每一個輸入來更新你的數據結構
override def reduce(b: Average, a: Employee)Average = ???
//用於對於不同分區的結構進行聚合
override def merge(b1: Average, b2: Average)Average = ???
//計算輸出
override def finish(reduction: Average)Double = ???
//用於數據結構他的轉換
override def bufferEncoder: Encoder[Average] = ???
//用於最終結果的轉換
override def outputEncoder: Encoder[Double] = ???
  1. 新建一個 UDAF 實例,通過 DF 或者 DS 的 DSL 風格語法去應用。

五、Spark SQL 和 Hive 的繼承

1、內置 Hive

  1. Spark 內置有 Hive,Spark2.1.1 內置的 Hive 是 1.2.1。

  2. 需要將 core-site.xml 和 hdfs-site.xml 拷貝到 spark 的 conf 目錄下。如果 Spark 路徑下發現 metastore_db,需要刪除【僅第一次啓動的時候】。

  3. 在你第一次啓動創建 metastore 的時候,你需要指定 spark.sql.warehouse.dir 這個參數, 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse

  4. 注意,如果你在 load 數據的時候,需要將數據放到 HDFS 上。

2、外部 Hive(這裏主要使用這個方法)

  1. 需要將 hive-site.xml 拷貝到 spark 的 conf 目錄下。

  2. 如果 hive 的 metestore 使用的是 mysql 數據庫,那麼需要將 mysql 的 jdbc 驅動包放到 spark 的 jars 目錄下。

  3. 可以通過 spark-sql 或者 spark-shell 來進行 sql 的查詢。完成和 hive 的連接。

這就是 hive 裏面的表

六、Spark SQL 的數據源

1、輸入

對於 Spark SQL 的輸入需要使用 sparkSession.read 方法

  1. 通用模式 sparkSession.read.format("json").load("path") 支持類型:parquet、json、text、csv、orc、jdbc

  2. 專業模式 sparkSession.read.json、 csv 直接指定類型。

2、輸出

對於 Spark SQL 的輸出需要使用 sparkSession.write 方法

  1. 通用模式 dataFrame.write.format("json").save("path") 支持類型:parquet、json、text、csv、orc

  2. 專業模式 dataFrame.write.csv("path") 直接指定類型

  3. 如果你使用通用模式,spark 默認 parquet 是默認格式、sparkSession.read.load 加載的默認是 parquet 格式 dataFrame.write.save 也是默認保存成 parquet 格式。

  4. 如果需要保存成一個 text 文件,那麼需要 dataFrame 裏面只有一列(只需要一列即可)。

七、Spark SQL 實戰

1、數據說明

這裏有三個數據集,合起來大概有幾十萬條數據,是關於貨品交易的數據集。

2、任務

這裏有三個需求:

  1. 計算所有訂單中每年的銷售單數、銷售總額

  2. 計算所有訂單每年最大金額訂單的銷售額

  3. 計算所有訂單中每年最暢銷貨品

3、步驟

1. 加載數據

tbStock.txt

#代碼
case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
val tbStockRdd=spark.sparkContext.textFile("file:///root/dataset/tbStock.txt")
val tbStockDS=tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS.show()

tbStockDetail.txt

case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable
val tbStockDetailRdd=spark.sparkContext.textFile("file:///root/dataset/tbStockDetail.txt")
val tbStockDetailDS=tbStockDetailRdd.map(_.split(",")).map(attr=>tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS
tbStockDetailDS.show()

tbDate.txt

case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable
val tbDateRdd=spark.sparkContext.textFile("file:///root/dataset/tbDate.txt")
val tbDateDS=tbDateRdd.map(_.split(",")).map(attr=>tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS
tbDateDS.show()

2. 註冊表

tbStockDS.createOrReplaceTempView("tbStock")
tbDateDS.createOrReplaceTempView("tbDate")
tbStockDetailDS.createOrReplaceTempView("tbStockDetail")

3. 解析表

  1. 計算所有訂單中每年的銷售單數、銷售總額
#sql語句
select c.theyear,count(distinct a.ordernumber),sum(b.amount)
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear
order by c.theyear

  1. 計算所有訂單每年最大金額訂單的銷售額

a、先統計每年每個訂單的銷售額

select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber

b、計算最大金額訂單的銷售額

select d.theyear,c.SumOfAmount as SumOfAmount 
from 
(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount 
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber  
group by a.dateid,a.ordernumber) c  
join tbDate d on c.dateid=d.dateid  
group by d.theyear
order by theyear desc

  1. 計算所有訂單中每年最暢銷貨品

a、求出每年每個貨品的銷售額

select c.theyear,b.itemid,sum(b.amount) as SumOfAmount 
from tbStock a 
join tbStockDetail b on a.ordernumber=b.ordernumber 
join tbDate c on a.dateid=c.dateid 
group by c.theyear,b.itemid

b、在 a 的基礎上,統計每年單個貨品的最大金額

select d.theyear,max(d.SumOfAmount) as MaxOfAmount 
from 
(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount 
from tbStock a 
join tbStockDetail b on a.ordernumber=b.ordernumber 
join tbDate c on a.dateid=c.dateid 
group by c.theyear,b.itemid) d 
group by theyear

c、用最大銷售額和統計好的每個貨品的銷售額 join,以及用年 join,集合得到最暢銷貨品那一行信息

select distinct e.theyear,e.itemid,f.maxofamount 
from 
(select c.theyear,b.itemid,sum(b.amount) as sumofamount 
from tbStock a 
join tbStockDetail b on a.ordernumber=b.ordernumber 
join tbDate c on a.dateid=c.dateid 
group by c.theyear,b.itemid) e 
join 
(select d.theyear,max(d.sumofamount) as maxofamount 
from 
(select c.theyear,b.itemid,sum(b.amount) as sumofamount 
from tbStock a 
join tbStockDetail b on a.ordernumber=b.ordernumber 
join tbDate c on a.dateid=c.dateid 
group by c.theyear,b.itemid) d 
group by d.theyear) f on e.theyear=f.theyear 
and e.sumofamount=f.maxofamount order by e.theyear

轉自:大數據真好玩

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/mBMkkOSz4h-Qkf3m7IH3EA