Spark SQL 重點知識總結
一、Spark SQL 的概念理解
Spark SQL 是 spark 套件中一個模板,它將數據的計算任務通過 SQL 的形式轉換成了 RDD 的計算,類似於 Hive 通過 SQL 的形式將數據的計算任務轉換成了 MapReduce。
Spark SQL 的特點:
-
和 Spark Core 的無縫集成,可以在寫整個 RDD 應用的時候,配置 Spark SQL 來完成邏輯實現。
-
統一的數據訪問方式,Spark SQL 提供標準化的 SQL 查詢。
-
Hive 的繼承,Spark SQL 通過內嵌的 hive 或者連接外部已經部署好的 hive 案例,實現了對 hive 語法的繼承和操作。
-
標準化的連接方式,Spark SQL 可以通過啓動 thrift Server 來支持 JDBC、ODBC 的訪問,將自己作爲一個 BI Server 使用
Spark SQL 數據抽象:
-
RDD(Spark1.0)->DataFrame(Spark1.3)->DataSet(Spark1.6)
-
Spark SQL 提供了 DataFrame 和 DataSet 的數據抽象
-
DataFrame 就是 RDD+Schema,可以認爲是一張二維表格,劣勢在於編譯器不進行表格中的字段的類型檢查,在運行期進行檢查
-
DataSet 是 Spark 最新的數據抽象,Spark 的發展會逐步將 DataSet 作爲主要的數據抽象,弱化 RDD 和 DataFrame.DataSet 包含了 DataFrame 所有的優化機制。除此之外提供了以樣例類爲 Schema 模型的強類型
-
DataFrame=DataSet[Row]
-
DataFrame 和 DataSet 都有可控的內存管理機制,所有數據都保存在非堆上,都使用了 catalyst 進行 SQL 的優化。
Spark SQL 客戶端查詢:
-
可以通過 Spark-shell 來操作 Spark SQL,spark 作爲 SparkSession 的變量名,sc 作爲 SparkContext 的變量名
-
可以通過 Spark 提供的方法讀取 json 文件,將 json 文件轉換成 DataFrame
-
可以通過 DataFrame 提供的 API 來操作 DataFrame 裏面的數據。
-
可以通過將 DataFrame 註冊成爲一個臨時表的方式,來通過 Spark.sql 方法運行標準的 SQL 語句來查詢。
二、Spark SQL 查詢方式
DataFrame 查詢方式
- DataFrame 支持兩種查詢方式:一種是 DSL 風格,另外一種是 SQL 風格
(1)、DSL 風格:
需要引入 import spark.implicit. _ 這個隱式轉換,可以將 DataFrame 隱式轉換成 RDD
(2)、SQL 風格:
a、需要將 DataFrame 註冊成一張表格,如果通過 CreateTempView 這種方式來創建,那麼該表格 Session 有效,如果通過 CreateGlobalTempView 來創建,那麼該表格跨 Session 有效,但是 SQL 語句訪問該表格的時候需要加上前綴 global_temp
b、需要通過 sparkSession.sql 方法來運行你的 SQL 語句
DataSet 查詢方式
- 定義一個 DataSet,先定義一個 Case 類
三、DataFrame、Dataset 和 RDD 互操作
RDD->DataFrame
-
普通方式:例如 rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF("name","age")
-
通過反射來設置 schema,例如:
#註冊成一張臨時表
peopleDF.createOrReplaceTempView("persons")
val teen=spark.sql("select name,age from persons where age between 13 and 29")
teen.show
這時 teen 是一張表,每一行是一個 row 對象,如果需要訪問 Row 對象中的每一個元素,可以通過下標 row(0);你也可以通過列名 row.getAs[String]("name")
也可以使用 getAs 方法:
3、通過編程的方式來設置 schema,適用於編譯器不能確定列的情況
val peopleRDD=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt")
val schemaString="name age"
val filed=schemaString.split(" ").map(filename=> org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))
val schema=org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(",")).map(para=>org.apache.spark.sql.Row(para(0).trim,para(1).trim))
val peopleDF=spark.createDataFrame(res6,schema)
peopleDF.show
DataFrame->RDD
dataFrame.rdd
RDD->DataSet
rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS
DataSet->DataSet
dataSet.rdd
DataFrame -> DataSet
dataFrame.to[Person]
DataSet -> DataFrame
dataSet.toDF
四、用戶自定義函數
用戶自定義 UDF 函數
通過 spark.udf 功能用戶可以自定義函數
自定義 udf 函數:
-
通過 spark.udf.register(name,func) 來註冊一個 UDF 函數,name 是 UDF 調用時的標識符,fun 是一個函數,用於處理字段。
-
需要將一個 DF 或者 DS 註冊爲一個臨時表
-
通過 spark.sql 去運行一個 SQL 語句,在 SQL 語句中可以通過 name(列名) 方式來應用 UDF 函數
用戶自定義聚合函數
1. 弱類型用戶自定義聚合函數
- 新建一個 Class 繼承 UserDefinedAggregateFunction ,然後複寫方法:
//聚合函數需要輸入參數的數據類型
override def inputSchema: StructType = ???
//可以理解爲保存聚合函數業務邏輯數據的一個數據結構
override def bufferSchema: StructType = ???
// 返回值的數據類型
override def dataType: DataType = ???
// 對於相同的輸入一直有相同的輸出
override def deterministic: Boolean = true
//用於初始化你的數據結構
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
//用於同分區內Row對聚合函數的更新操作
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
//用於不同分區對聚合結果的聚合。
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
//計算最終結果
override def evaluate(buffer: Row): Any = ???
-
你需要通過 spark.udf.resigter 去註冊你的 UDAF 函數。
-
需要通過 spark.sql 去運行你的 SQL 語句,可以通過 select UDAF(列名) 來應用你的用戶自定義聚合函數。
2、強類型用戶自定義聚合函數
- 新建一個 class,繼承 Aggregator[Employee, Average, Double],其中 Employee 是在應用聚合函數的時候傳入的對象,Average 是聚合函數在運行的時候內部需要的數據結構,Double 是聚合函數最終需要輸出的類型。這些可以根據自己的業務需求去調整。複寫相對應的方法:
//用於定義一個聚合函數內部需要的數據結構
override def zero: Average = ???
//針對每個分區內部每一個輸入來更新你的數據結構
override def reduce(b: Average, a: Employee): Average = ???
//用於對於不同分區的結構進行聚合
override def merge(b1: Average, b2: Average): Average = ???
//計算輸出
override def finish(reduction: Average): Double = ???
//用於數據結構他的轉換
override def bufferEncoder: Encoder[Average] = ???
//用於最終結果的轉換
override def outputEncoder: Encoder[Double] = ???
- 新建一個 UDAF 實例,通過 DF 或者 DS 的 DSL 風格語法去應用。
五、Spark SQL 和 Hive 的繼承
1、內置 Hive
-
Spark 內置有 Hive,Spark2.1.1 內置的 Hive 是 1.2.1。
-
需要將 core-site.xml 和 hdfs-site.xml 拷貝到 spark 的 conf 目錄下。如果 Spark 路徑下發現 metastore_db,需要刪除【僅第一次啓動的時候】。
-
在你第一次啓動創建 metastore 的時候,你需要指定 spark.sql.warehouse.dir 這個參數, 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
-
注意,如果你在 load 數據的時候,需要將數據放到 HDFS 上。
2、外部 Hive(這裏主要使用這個方法)
-
需要將 hive-site.xml 拷貝到 spark 的 conf 目錄下。
-
如果 hive 的 metestore 使用的是 mysql 數據庫,那麼需要將 mysql 的 jdbc 驅動包放到 spark 的 jars 目錄下。
-
可以通過 spark-sql 或者 spark-shell 來進行 sql 的查詢。完成和 hive 的連接。
這就是 hive 裏面的表
六、Spark SQL 的數據源
1、輸入
對於 Spark SQL 的輸入需要使用 sparkSession.read 方法
-
通用模式 sparkSession.read.format("json").load("path") 支持類型:parquet、json、text、csv、orc、jdbc
-
專業模式 sparkSession.read.json、 csv 直接指定類型。
2、輸出
對於 Spark SQL 的輸出需要使用 sparkSession.write 方法
-
通用模式 dataFrame.write.format("json").save("path") 支持類型:parquet、json、text、csv、orc
-
專業模式 dataFrame.write.csv("path") 直接指定類型
-
如果你使用通用模式,spark 默認 parquet 是默認格式、sparkSession.read.load 加載的默認是 parquet 格式 dataFrame.write.save 也是默認保存成 parquet 格式。
-
如果需要保存成一個 text 文件,那麼需要 dataFrame 裏面只有一列(只需要一列即可)。
七、Spark SQL 實戰
1、數據說明
這裏有三個數據集,合起來大概有幾十萬條數據,是關於貨品交易的數據集。
2、任務
這裏有三個需求:
-
計算所有訂單中每年的銷售單數、銷售總額
-
計算所有訂單每年最大金額訂單的銷售額
-
計算所有訂單中每年最暢銷貨品
3、步驟
1. 加載數據
tbStock.txt
#代碼
case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
val tbStockRdd=spark.sparkContext.textFile("file:///root/dataset/tbStock.txt")
val tbStockDS=tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS.show()
tbStockDetail.txt
case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable
val tbStockDetailRdd=spark.sparkContext.textFile("file:///root/dataset/tbStockDetail.txt")
val tbStockDetailDS=tbStockDetailRdd.map(_.split(",")).map(attr=>tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS
tbStockDetailDS.show()
tbDate.txt
case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable
val tbDateRdd=spark.sparkContext.textFile("file:///root/dataset/tbDate.txt")
val tbDateDS=tbDateRdd.map(_.split(",")).map(attr=>tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS
tbDateDS.show()
2. 註冊表
tbStockDS.createOrReplaceTempView("tbStock")
tbDateDS.createOrReplaceTempView("tbDate")
tbStockDetailDS.createOrReplaceTempView("tbStockDetail")
3. 解析表
- 計算所有訂單中每年的銷售單數、銷售總額
#sql語句
select c.theyear,count(distinct a.ordernumber),sum(b.amount)
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear
order by c.theyear
- 計算所有訂單每年最大金額訂單的銷售額
a、先統計每年每個訂單的銷售額
select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber
b、計算最大金額訂單的銷售額
select d.theyear,c.SumOfAmount as SumOfAmount
from
(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber) c
join tbDate d on c.dateid=d.dateid
group by d.theyear
order by theyear desc
- 計算所有訂單中每年最暢銷貨品
a、求出每年每個貨品的銷售額
select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid
b、在 a 的基礎上,統計每年單個貨品的最大金額
select d.theyear,max(d.SumOfAmount) as MaxOfAmount
from
(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by theyear
c、用最大銷售額和統計好的每個貨品的銷售額 join,以及用年 join,集合得到最暢銷貨品那一行信息
select distinct e.theyear,e.itemid,f.maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) e
join
(select d.theyear,max(d.sumofamount) as maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by d.theyear) f on e.theyear=f.theyear
and e.sumofamount=f.maxofamount order by e.theyear
轉自:大數據真好玩
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/mBMkkOSz4h-Qkf3m7IH3EA