基於 SparkSQL 的電影分析項目實戰
在本篇分享中,將介紹一個完整的項目案例,該案例會真實還原企業中 SparkSQL 的開發流程,手把手教你構建一個基於 SparkSQL 的分析系統。爲了講解方便,我會對代碼進行拆解,完整的代碼已上傳至 GitHub。
https://github.com/jiamx/spark_project_practise
項目介紹
數據集介紹
使用 MovieLens 的名稱爲 ml-25m.zip 的數據集,使用的文件時 movies.csv 和 ratings.csv,上述文件的下載地址爲:
http://files.grouplens.org/datasets/movielens/ml-25m.zip
- movies.csv
該文件是電影數據,對應的爲維表數據,大小爲 2.89MB,包括 6 萬多部電影,其數據格式爲 [movieId,title,genres], 分別對應 [電影 id,電影名稱,電影所屬分類],樣例數據如下所示:逗號分隔
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
- ratings.csv
該文件爲定影評分數據,對應爲事實表數據,大小爲 646MB,其數據格式爲:[userId,movieId,rating,timestamp], 分別對應 [用戶 id,電影 id,評分,時間戳],樣例數據如下所示:逗號分隔
1,296,5,1147880044
項目代碼結構
需求分析
- 需求 1:查找電影評分個數超過 5000, 且平均評分較高的前十部電影名稱及其對應的平均評分
- 需求 2:查找每個電影類別及其對應的平均評分
- 需求 3:查找被評分次數較多的前十部電影
代碼講解
- DemoMainApp
該類是程序執行的入口,主要是獲取數據源,轉換成 DataFrame,並調用封裝好的業務邏輯類。
object DemoMainApp {
// 文件路徑
private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"
private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"
def main(args: Array[String]): Unit = {
// 創建spark session
val spark = SparkSession
.builder
.master("local[4]")
.getOrCreate
// schema信息
val schemaLoader = new SchemaLoader
// 讀取Movie數據集
val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
// 讀取Rating數據集
val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)
// 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
val bestFilmsByOverallRating = new BestFilmsByOverallRating
//bestFilmsByOverallRating.run(movieDF, ratingDF, spark)
// 需求2:查找每個電影類別及其對應的平均評分
val genresByAverageRating = new GenresByAverageRating
//genresByAverageRating.run(movieDF, ratingDF, spark)
// 需求3:查找被評分次數較多的前十部電影
val mostRatedFilms = new MostRatedFilms
mostRatedFilms.run(movieDF, ratingDF, spark)
spark.close()
}
/**
* 讀取數據文件,轉成DataFrame
*
* @param spark
* @param path
* @param schema
* @return
*/
def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
val dataSet = spark.read
.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
dataSet
}
}
- Entry
該類爲實體類,封裝了數據源的樣例類和結果表的樣例類
class Entry {
}
case class Movies(
movieId: String, // 電影的id
title: String, // 電影的標題
genres: String // 電影類別
)
case class Ratings(
userId: String, // 用戶的id
movieId: String, // 電影的id
rating: String, // 用戶評分
timestamp: String // 時間戳
)
// 需求1MySQL結果表
case class tenGreatestMoviesByAverageRating(
movieId: String, // 電影的id
title: String, // 電影的標題
avgRating: String // 電影平均評分
)
// 需求2MySQL結果表
case class topGenresByAverageRating(
genres: String, //電影類別
avgRating: String // 平均評分
)
// 需求3MySQL結果表
case class tenMostRatedFilms(
movieId: String, // 電影的id
title: String, // 電影的標題
ratingCnt: String // 電影被評分的次數
)
- SchemaLoader
該類封裝了數據集的 schema 信息,主要用於讀取數據源是指定 schema 信息
class SchemaLoader {
// movies數據集schema信息
private val movieSchema = new StructType()
.add("movieId", DataTypes.StringType, false)
.add("title", DataTypes.StringType, false)
.add("genres", DataTypes.StringType, false)
// ratings數據集schema信息
private val ratingSchema = new StructType()
.add("userId", DataTypes.StringType, false)
.add("movieId", DataTypes.StringType, false)
.add("rating", DataTypes.StringType, false)
.add("timestamp", DataTypes.StringType, false)
def getMovieSchema: StructType = movieSchema
def getRatingSchema: StructType = ratingSchema
}
- JDBCUtil
該類封裝了連接 MySQL 的邏輯,主要用於連接 MySQL,在業務邏輯代碼中會使用該工具類獲取 MySQL 連接,將結果數據寫入到 MySQL 中。
object JDBCUtil {
val dataSource = new ComboPooledDataSource()
val user = "root"
val password = "123qwe"
val url = "jdbc:mysql://localhost:3306/mydb"
dataSource.setUser(user)
dataSource.setPassword(password)
dataSource.setDriverClass("com.mysql.jdbc.Driver")
dataSource.setJdbcUrl(url)
dataSource.setAutoCommitOnClose(false)
// 獲取連接
def getQueryRunner(): Option[QueryRunner]={
try {
Some(new QueryRunner(dataSource))
}catch {
case e:Exception =>
e.printStackTrace()
None
}
}
}
需求 1 實現
- BestFilmsByOverallRating
需求 1 實現的業務邏輯封裝。該類有一個 run() 方法,主要是封裝計算邏輯。
/**
* 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
*/
class BestFilmsByOverallRating extends Serializable {
def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
import spark.implicits._
// 將moviesDataset註冊成表
moviesDataset.createOrReplaceTempView("movies")
// 將ratingsDataset註冊成表
ratingsDataset.createOrReplaceTempView("ratings")
// 查詢SQL語句
val ressql1 =
"""
|WITH ratings_filter_cnt AS (
|SELECT
| movieId,
| count( * ) AS rating_cnt,
| avg( rating ) AS avg_rating
|FROM
| ratings
|GROUP BY
| movieId
|HAVING
| count( * ) >= 5000
|),
|ratings_filter_score AS (
|SELECT
| movieId, -- 電影id
| avg_rating -- 電影平均評分
|FROM ratings_filter_cnt
|ORDER BY avg_rating DESC -- 平均評分降序排序
|LIMIT 10 -- 平均分較高的前十部電影
|)
|SELECT
| m.movieId,
| m.title,
| r.avg_rating AS avgRating
|FROM
| ratings_filter_score r
|JOIN movies m ON m.movieId = r.movieId
""".stripMargin
val resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]
// 打印數據
resultDS.show(10)
resultDS.printSchema()
// 寫入MySQL
resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
}
/**
* 獲取連接,調用寫入MySQL數據的方法
*
* @param res
*/
private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {
lazy val conn = JDBCUtil.getQueryRunner()
conn match {
case Some(connection) => {
upsert(res, connection)
}
case None => {
println("Mysql連接失敗")
System.exit(-1)
}
}
}
/**
* 封裝將結果寫入MySQL的方法
* 執行寫入操作
*
* @param r
* @param conn
*/
private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {
try {
val sql =
s"""
|REPLACE INTO `ten_movies_averagerating`(
|movieId,
|title,
|avgRating
|)
|VALUES
|(?,?,?)
""".stripMargin
// 執行insert操作
conn.update(
sql,
r.movieId,
r.title,
r.avgRating
)
} catch {
case e: Exception => {
e.printStackTrace()
System.exit(-1)
}
}
}
}
需求 1 結果
- 結果表建表語句
CREATE TABLE `ten_movies_averagerating` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`movieId` int(11) NOT NULL COMMENT '電影id',
`title` varchar(100) NOT NULL COMMENT '電影名稱',
`avgRating` decimal(10,2) NOT NULL COMMENT '平均評分',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`id`),
UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 統計結果
平均評分最高的前十部電影如下:
上述電影評分對應的電影中文名稱爲:
需求 2 實現
- GenresByAverageRating
需求 2 實現的業務邏輯封裝。該類有一個 run() 方法,主要是封裝計算邏輯。
**
* 需求2:查找每個電影類別及其對應的平均評分
*/
class GenresByAverageRating extends Serializable {
def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
import spark.implicits._
// 將moviesDataset註冊成表
moviesDataset.createOrReplaceTempView("movies")
// 將ratingsDataset註冊成表
ratingsDataset.createOrReplaceTempView("ratings")
val ressql2 =
"""
|WITH explode_movies AS (
|SELECT
| movieId,
| title,
| category
|FROM
| movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category
|)
|SELECT
| m.category AS genres,
| avg( r.rating ) AS avgRating
|FROM
| explode_movies m
| JOIN ratings r ON m.movieId = r.movieId
|GROUP BY
| m.category
| """.stripMargin
val resultDS = spark.sql(ressql2).as[topGenresByAverageRating]
// 打印數據
resultDS.show(10)
resultDS.printSchema()
// 寫入MySQL
resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
}
/**
* 獲取連接,調用寫入MySQL數據的方法
*
* @param res
*/
private def insert2Mysql(res: topGenresByAverageRating): Unit = {
lazy val conn = JDBCUtil.getQueryRunner()
conn match {
case Some(connection) => {
upsert(res, connection)
}
case None => {
println("Mysql連接失敗")
System.exit(-1)
}
}
}
/**
* 封裝將結果寫入MySQL的方法
* 執行寫入操作
*
* @param r
* @param conn
*/
private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {
try {
val sql =
s"""
|REPLACE INTO `genres_average_rating`(
|genres,
|avgRating
|)
|VALUES
|(?,?)
""".stripMargin
// 執行insert操作
conn.update(
sql,
r.genres,
r.avgRating
)
} catch {
case e: Exception => {
e.printStackTrace()
System.exit(-1)
}
}
}
}
需求 2 結果
- 結果表建表語句
CREATE TABLE genres_average_rating (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`genres` VARCHAR ( 100 ) NOT NULL COMMENT '電影類別',
`avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '電影類別平均評分',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
- 統計結果
共有 20 個電影分類,每個電影分類的平均評分爲:
電影分類對應的中文名稱爲:
需求 3 實現
-
MostRatedFilms
需求 3 實現的業務邏輯封裝。該類有一個 run() 方法,主要是封裝計算邏輯。
/**
* 需求3:查找被評分次數較多的前十部電影.
*/
class MostRatedFilms extends Serializable {
def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {
import spark.implicits._
// 將moviesDataset註冊成表
moviesDataset.createOrReplaceTempView("movies")
// 將ratingsDataset註冊成表
ratingsDataset.createOrReplaceTempView("ratings")
val ressql3 =
"""
|WITH rating_group AS (
| SELECT
| movieId,
| count( * ) AS ratingCnt
| FROM ratings
| GROUP BY movieId
|),
|rating_filter AS (
| SELECT
| movieId,
| ratingCnt
| FROM rating_group
| ORDER BY ratingCnt DESC
| LIMIT 10
|)
|SELECT
| m.movieId,
| m.title,
| r.ratingCnt
|FROM
| rating_filter r
|JOIN movies m ON r.movieId = m.movieId
|
""".stripMargin
val resultDS = spark.sql(ressql3).as[tenMostRatedFilms]
// 打印數據
resultDS.show(10)
resultDS.printSchema()
// 寫入MySQL
resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
}
/**
* 獲取連接,調用寫入MySQL數據的方法
*
* @param res
*/
private def insert2Mysql(res: tenMostRatedFilms): Unit = {
lazy val conn = JDBCUtil.getQueryRunner()
conn match {
case Some(connection) => {
upsert(res, connection)
}
case None => {
println("Mysql連接失敗")
System.exit(-1)
}
}
}
/**
* 封裝將結果寫入MySQL的方法
* 執行寫入操作
*
* @param r
* @param conn
*/
private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {
try {
val sql =
s"""
|REPLACE INTO `ten_most_rated_films`(
|movieId,
|title,
|ratingCnt
|)
|VALUES
|(?,?,?)
""".stripMargin
// 執行insert操作
conn.update(
sql,
r.movieId,
r.title,
r.ratingCnt
)
} catch {
case e: Exception => {
e.printStackTrace()
System.exit(-1)
}
}
}
}
需求 3 結果
- 結果表創建語句
CREATE TABLE ten_most_rated_films (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`movieId` INT ( 11 ) NOT NULL COMMENT '電影Id',
`title` varchar(100) NOT NULL COMMENT '電影名稱',
`ratingCnt` INT(11) NOT NULL COMMENT '電影被評分的次數',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
- 統計結果
評分次數較多的電影對應的中文名稱爲:
總結
本文主要是基於 SparkSQL 對 MovieLens 數據集進行統計分析,完整實現了三個需求,並給對每個需求都給出了詳細的代碼實現和結果分析。本案例還原了企業使用 SparkSQL 進行實現數據統計的基本流程,通過本文,或許你對 SparkSQL 的應用有了更加深刻的認識,希望本文對你有所幫助。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/vJPgr7hTO6T9KsnvxzxGIQ