基於 SparkSQL 的電影分析項目實戰

在本篇分享中,將介紹一個完整的項目案例,該案例會真實還原企業中 SparkSQL 的開發流程,手把手教你構建一個基於 SparkSQL 的分析系統。爲了講解方便,我會對代碼進行拆解,完整的代碼已上傳至 GitHub。

https://github.com/jiamx/spark_project_practise

項目介紹

數據集介紹

使用 MovieLens 的名稱爲 ml-25m.zip 的數據集,使用的文件時 movies.csvratings.csv,上述文件的下載地址爲:

http://files.grouplens.org/datasets/movielens/ml-25m.zip

該文件是電影數據,對應的爲維表數據,大小爲 2.89MB,包括 6 萬多部電影,其數據格式爲 [movieId,title,genres], 分別對應 [電影 id,電影名稱,電影所屬分類],樣例數據如下所示:逗號分隔

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy

該文件爲定影評分數據,對應爲事實表數據,大小爲 646MB,其數據格式爲:[userId,movieId,rating,timestamp], 分別對應 [用戶 id,電影 id,評分,時間戳],樣例數據如下所示:逗號分隔

1,296,5,1147880044

項目代碼結構

需求分析

代碼講解

該類是程序執行的入口,主要是獲取數據源,轉換成 DataFrame,並調用封裝好的業務邏輯類。

object DemoMainApp {
  // 文件路徑
  private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"
  private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"

  def main(args: Array[String])Unit = {
    // 創建spark session
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    // schema信息
    val schemaLoader = new SchemaLoader
    // 讀取Movie數據集
    val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
    // 讀取Rating數據集
    val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)

    // 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
    val bestFilmsByOverallRating = new BestFilmsByOverallRating
    //bestFilmsByOverallRating.run(movieDF, ratingDF, spark)

    // 需求2:查找每個電影類別及其對應的平均評分
    val genresByAverageRating = new GenresByAverageRating
    //genresByAverageRating.run(movieDF, ratingDF, spark)

    // 需求3:查找被評分次數較多的前十部電影
    val mostRatedFilms = new MostRatedFilms
    mostRatedFilms.run(movieDF, ratingDF, spark)

    spark.close()

  }
  /**
    * 讀取數據文件,轉成DataFrame
    *
    * @param spark
    * @param path
    * @param schema
    * @return
    */
  def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {

    val dataSet = spark.read
      .format("csv")
      .option("header""true")
      .schema(schema)
      .load(path)
    dataSet
  }
}

該類爲實體類,封裝了數據源的樣例類和結果表的樣例類

class Entry {

}

case class Movies(
                   movieId: String, // 電影的id
                   title: String, // 電影的標題
                   genres: String // 電影類別
                 )

case class Ratings(
                    userId: String, // 用戶的id
                    movieId: String, // 電影的id
                    rating: String, // 用戶評分
                    timestamp: String // 時間戳
                  )

// 需求1MySQL結果表
case class tenGreatestMoviesByAverageRating(
                                             movieId: String, // 電影的id
                                             title: String, // 電影的標題
                                             avgRating: String // 電影平均評分
                                           )

// 需求2MySQL結果表
case class topGenresByAverageRating(
                                     genres: String, //電影類別
                                     avgRating: String // 平均評分
                                   )

// 需求3MySQL結果表
case class tenMostRatedFilms(
                              movieId: String, // 電影的id
                              title: String, // 電影的標題
                              ratingCnt: String // 電影被評分的次數
                            )

該類封裝了數據集的 schema 信息,主要用於讀取數據源是指定 schema 信息

class SchemaLoader {
  // movies數據集schema信息
  private val movieSchema = new StructType()
    .add("movieId", DataTypes.StringType, false)
    .add("title", DataTypes.StringType, false)
    .add("genres", DataTypes.StringType, false)
 // ratings數據集schema信息
  private val ratingSchema = new StructType()
    .add("userId", DataTypes.StringType, false)
    .add("movieId", DataTypes.StringType, false)
    .add("rating", DataTypes.StringType, false)
    .add("timestamp", DataTypes.StringType, false)

  def getMovieSchema: StructType = movieSchema

  def getRatingSchema: StructType = ratingSchema
}

該類封裝了連接 MySQL 的邏輯,主要用於連接 MySQL,在業務邏輯代碼中會使用該工具類獲取 MySQL 連接,將結果數據寫入到 MySQL 中。

object JDBCUtil {
  val dataSource = new ComboPooledDataSource()
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/mydb"

  dataSource.setUser(user)
  dataSource.setPassword(password)
  dataSource.setDriverClass("com.mysql.jdbc.Driver")
  dataSource.setJdbcUrl(url)
  dataSource.setAutoCommitOnClose(false)
// 獲取連接
  def getQueryRunner(): Option[QueryRunner]={
    try {
      Some(new QueryRunner(dataSource))
    }catch {
      case e:Exception =>
        e.printStackTrace()
        None
    }
  }
}

需求 1 實現

需求 1 實現的業務邏輯封裝。該類有一個 run() 方法,主要是封裝計算邏輯。

/**
  * 需求1:查找電影評分個數超過5000,且平均評分較高的前十部電影名稱及其對應的平均評分
  */
class BestFilmsByOverallRating extends Serializable {

  def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
    import spark.implicits._

    // 將moviesDataset註冊成表
    moviesDataset.createOrReplaceTempView("movies")
    // 將ratingsDataset註冊成表
    ratingsDataset.createOrReplaceTempView("ratings")

    // 查詢SQL語句
    val ressql1 =
      """
         |WITH ratings_filter_cnt AS (
         |SELECT
         |     movieId,
         |     count( * ) AS rating_cnt,
         |     avg( rating ) AS avg_rating
         |FROM
         |     ratings
         |GROUP BY
         |     movieId
         |HAVING
         |     count( * ) >= 5000
         |),
         |ratings_filter_score AS (
         |SELECT
         |     movieId, -- 電影id
         |     avg_rating -- 電影平均評分
         |FROM ratings_filter_cnt
         |ORDER BY avg_rating DESC -- 平均評分降序排序
         |LIMIT 10 -- 平均分較高的前十部電影
         |)
         |SELECT
         |    m.movieId,
         |    m.title,
         |    r.avg_rating AS avgRating
         |FROM
         |   ratings_filter_score r
         |JOIN movies m ON m.movieId = r.movieId
      """.stripMargin

    val resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]
    // 打印數據
    resultDS.show(10)
    resultDS.printSchema()
    // 寫入MySQL
    resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
  }

  /**
    * 獲取連接,調用寫入MySQL數據的方法
    *
    * @param res
    */
  private def insert2Mysql(res: tenGreatestMoviesByAverageRating)Unit = {
    lazy val conn = JDBCUtil.getQueryRunner()
    conn match {
      case Some(connection) ={
        upsert(res, connection)
      }
      case None ={
        println("Mysql連接失敗")
        System.exit(-1)
      }
    }
  }

  /**
    * 封裝將結果寫入MySQL的方法
    * 執行寫入操作
    *
    * @param r
    * @param conn
    */
  private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner)Unit = {
    try {
      val sql =
        s"""
           |REPLACE INTO `ten_movies_averagerating`(
           |movieId,
           |title,
           |avgRating
           |)
           |VALUES
           |(?,?,?)
       """.stripMargin
      // 執行insert操作
      conn.update(
        sql,
        r.movieId,
        r.title,
        r.avgRating
      )
    } catch {
      case e: Exception ={
        e.printStackTrace()
        System.exit(-1)
      }
    }
  }
}

需求 1 結果

CREATE TABLE `ten_movies_averagerating` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `movieId` int(11) NOT NULL COMMENT '電影id',
  `title` varchar(100) NOT NULL COMMENT '電影名稱',
  `avgRating` decimal(10,2) NOT NULL COMMENT '平均評分',
  `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

平均評分最高的前十部電影如下:

kUd4jp

上述電影評分對應的電影中文名稱爲:

G7QNag

需求 2 實現

需求 2 實現的業務邏輯封裝。該類有一個 run() 方法,主要是封裝計算邏輯。

**
  * 需求2:查找每個電影類別及其對應的平均評分
  */
class GenresByAverageRating extends Serializable {
  def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
    import spark.implicits._
    // 將moviesDataset註冊成表
    moviesDataset.createOrReplaceTempView("movies")
    // 將ratingsDataset註冊成表
    ratingsDataset.createOrReplaceTempView("ratings")

    val ressql2 =
      """
        |WITH explode_movies AS (
        |SELECT
        | movieId,
        | title,
        | category
        |FROM
        | movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category
        |)
        |SELECT
        | m.category AS genres,
        | avg( r.rating ) AS avgRating
        |FROM
        | explode_movies m
        | JOIN ratings r ON m.movieId = r.movieId
        |GROUP BY
        | m.category
        | """.stripMargin

    val resultDS = spark.sql(ressql2).as[topGenresByAverageRating]

    // 打印數據
    resultDS.show(10)
    resultDS.printSchema()
    // 寫入MySQL
    resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))

  }

  /**
    * 獲取連接,調用寫入MySQL數據的方法
    *
    * @param res
    */
  private def insert2Mysql(res: topGenresByAverageRating)Unit = {
    lazy val conn = JDBCUtil.getQueryRunner()
    conn match {
      case Some(connection) ={
        upsert(res, connection)
      }
      case None ={
        println("Mysql連接失敗")
        System.exit(-1)
      }
    }
  }

  /**
    * 封裝將結果寫入MySQL的方法
    * 執行寫入操作
    *
    * @param r
    * @param conn
    */
  private def upsert(r: topGenresByAverageRating, conn: QueryRunner)Unit = {
    try {
      val sql =
        s"""
           |REPLACE INTO `genres_average_rating`(
           |genres,
           |avgRating
           |)
           |VALUES
           |(?,?)
       """.stripMargin
      // 執行insert操作
      conn.update(
        sql,
        r.genres,
        r.avgRating
      )
    } catch {
      case e: Exception ={
        e.printStackTrace()
        System.exit(-1)
      }
    }
  }
}

需求 2 結果

CREATE TABLE genres_average_rating (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
    `genres` VARCHAR ( 100 ) NOT NULL COMMENT '電影類別',
    `avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '電影類別平均評分',
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;

共有 20 個電影分類,每個電影分類的平均評分爲:

9mAzMF

電影分類對應的中文名稱爲:

9WY7xH

需求 3 實現

/**
  * 需求3:查找被評分次數較多的前十部電影.
  */
class MostRatedFilms extends Serializable {
   def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {

     import spark.implicits._

     // 將moviesDataset註冊成表
     moviesDataset.createOrReplaceTempView("movies")
     // 將ratingsDataset註冊成表
     ratingsDataset.createOrReplaceTempView("ratings")

val ressql3 =
  """
    |WITH rating_group AS (
    |    SELECT
    |       movieId,
    |       count( * ) AS ratingCnt
    |    FROM ratings
    |    GROUP BY movieId
    |),
    |rating_filter AS (
    |    SELECT
    |       movieId,
    |       ratingCnt
    |    FROM rating_group
    |    ORDER BY ratingCnt DESC
    |    LIMIT 10
    |)
    |SELECT
    |    m.movieId,
    |    m.title,
    |    r.ratingCnt
    |FROM
    |    rating_filter r
    |JOIN movies m ON r.movieId = m.movieId
    |
  """.stripMargin

     val resultDS = spark.sql(ressql3).as[tenMostRatedFilms]
     // 打印數據
     resultDS.show(10)
     resultDS.printSchema()
     // 寫入MySQL
     resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))

  }

  /**
    * 獲取連接,調用寫入MySQL數據的方法
    *
    * @param res
    */
  private def insert2Mysql(res: tenMostRatedFilms)Unit = {
    lazy val conn = JDBCUtil.getQueryRunner()
    conn match {
      case Some(connection) ={
        upsert(res, connection)
      }
      case None ={
        println("Mysql連接失敗")
        System.exit(-1)
      }
    }
  }

  /**
    * 封裝將結果寫入MySQL的方法
    * 執行寫入操作
    *
    * @param r
    * @param conn
    */
  private def upsert(r: tenMostRatedFilms, conn: QueryRunner)Unit = {
    try {
      val sql =
        s"""
           |REPLACE INTO `ten_most_rated_films`(
           |movieId,
           |title,
           |ratingCnt
           |)
           |VALUES
           |(?,?,?)
       """.stripMargin
      // 執行insert操作
      conn.update(
        sql,
        r.movieId,
        r.title,
        r.ratingCnt
      )
    } catch {
      case e: Exception ={
        e.printStackTrace()
        System.exit(-1)
      }
    }
  }

}

需求 3 結果

CREATE TABLE ten_most_rated_films (
    `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
    `movieId` INT ( 11 ) NOT NULL COMMENT '電影Id',
    `title` varchar(100) NOT NULL COMMENT '電影名稱',
    `ratingCnt` INT(11) NOT NULL COMMENT '電影被評分的次數',
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;

GBwv11

評分次數較多的電影對應的中文名稱爲:

gqJbjk

總結

本文主要是基於 SparkSQL 對 MovieLens 數據集進行統計分析,完整實現了三個需求,並給對每個需求都給出了詳細的代碼實現和結果分析。本案例還原了企業使用 SparkSQL 進行實現數據統計的基本流程,通過本文,或許你對 SparkSQL 的應用有了更加深刻的認識,希望本文對你有所幫助。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/vJPgr7hTO6T9KsnvxzxGIQ