当前位置:   article > 正文

使用SparkSQL的电影分析_spark统计出:每个类别的 平均电影评分。

spark统计出:每个类别的 平均电影评分。

项目介绍

数据集介绍

使用MovieLens的名称为ml-25m.zip的数据集,使用的文件时movies.csvratings.csv,上述文件的下载地址为:

http://files.grouplens.org/datasets/movielens/ml-25m.zip
  • movies.csv

该文件是电影数据,对应的为维表数据,大小为2.89MB,包括6万多部电影,其数据格式为[movieId,title,genres],分别对应[电影id,电影名称,电影所属分类],样例数据如下所示:逗号分隔

1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
  • ratings.csv

该文件为定影评分数据,对应为事实表数据,大小为646MB,其数据格式为:[userId,movieId,rating,timestamp],分别对应[用户id,电影id,评分,时间戳],样例数据如下所示:逗号分隔

1,296,5,1147880044

项目代码结构

图片

需求分析

  • 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分

图片

  • 需求2:查找每个电影类别及其对应的平均评分

图片

  • 需求3:查找被评分次数较多的前十部电影

图片

代码讲解

  • DemoMainApp

该类是程序执行的入口,主要是获取数据源,转换成DataFrame,并调用封装好的业务逻辑类。

  1. object DemoMainApp {
  2.   // 文件路径
  3.   private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"
  4.   private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"
  5.   def main(args: Array[String]): Unit = {
  6.     // 创建spark session
  7.     val spark = SparkSession
  8.       .builder
  9.       .master("local[4]")
  10.       .getOrCreate
  11.     // schema信息
  12.     val schemaLoader = new SchemaLoader
  13.     // 读取Movie数据集
  14.     val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)
  15.     // 读取Rating数据集
  16.     val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)
  17.     // 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分
  18.     val bestFilmsByOverallRating = new BestFilmsByOverallRating
  19.     //bestFilmsByOverallRating.run(movieDF, ratingDF, spark)
  20.     // 需求2:查找每个电影类别及其对应的平均评分
  21.     val genresByAverageRating = new GenresByAverageRating
  22.     //genresByAverageRating.run(movieDF, ratingDF, spark)
  23.     // 需求3:查找被评分次数较多的前十部电影
  24.     val mostRatedFilms = new MostRatedFilms
  25.     mostRatedFilms.run(movieDF, ratingDF, spark)
  26.     spark.close()
  27.   }
  28.   /**
  29.     * 读取数据文件,转成DataFrame
  30.     *
  31.     * @param spark
  32.     * @param path
  33.     * @param schema
  34.     * @return
  35.     */
  36.   def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {
  37.     val dataSet = spark.read
  38.       .format("csv")
  39.       .option("header""true")
  40.       .schema(schema)
  41.       .load(path)
  42.     dataSet
  43.   }
  44. }
  • Entry

该类为实体类,封装了数据源的样例类和结果表的样例类

  1. class Entry {
  2. }
  3. case class Movies(
  4.                    movieId: String// 电影的id
  5.                    title: String// 电影的标题
  6.                    genres: String // 电影类别
  7.                  )
  8. case class Ratings(
  9.                     userId: String// 用户的id
  10.                     movieId: String// 电影的id
  11.                     rating: String// 用户评分
  12.                     timestamp: String // 时间戳
  13.                   )
  14. // 需求1MySQL结果表
  15. case class tenGreatestMoviesByAverageRating(
  16.                                              movieId: String// 电影的id
  17.                                              title: String// 电影的标题
  18.                                              avgRating: String // 电影平均评分
  19.                                            )
  20. // 需求2MySQL结果表
  21. case class topGenresByAverageRating(
  22.                                      genres: String//电影类别
  23.                                      avgRating: String // 平均评分
  24.                                    )
  25. // 需求3MySQL结果表
  26. case class tenMostRatedFilms(
  27.                               movieId: String// 电影的id
  28.                               title: String// 电影的标题
  29.                               ratingCnt: String // 电影被评分的次数
  30.                             )
  • SchemaLoader

该类封装了数据集的schema信息,主要用于读取数据源是指定schema信息

  1. class SchemaLoader {
  2.   // movies数据集schema信息
  3.   private val movieSchema = new StructType()
  4.     .add("movieId", DataTypes.StringType, false)
  5.     .add("title", DataTypes.StringType, false)
  6.     .add("genres", DataTypes.StringType, false)
  7.  // ratings数据集schema信息
  8.   private val ratingSchema = new StructType()
  9.     .add("userId", DataTypes.StringType, false)
  10.     .add("movieId", DataTypes.StringType, false)
  11.     .add("rating", DataTypes.StringType, false)
  12.     .add("timestamp", DataTypes.StringType, false)
  13.   def getMovieSchema: StructType = movieSchema
  14.   def getRatingSchema: StructType = ratingSchema
  15. }
  • JDBCUtil

该类封装了连接MySQL的逻辑,主要用于连接MySQL,在业务逻辑代码中会使用该工具类获取MySQL连接,将结果数据写入到MySQL中。

  1. object JDBCUtil {
  2.   val dataSource = new ComboPooledDataSource()
  3.   val user = "root"
  4.   val password = "123qwe"
  5.   val url = "jdbc:mysql://localhost:3306/mydb"
  6.   dataSource.setUser(user)
  7.   dataSource.setPassword(password)
  8.   dataSource.setDriverClass("com.mysql.jdbc.Driver")
  9.   dataSource.setJdbcUrl(url)
  10.   dataSource.setAutoCommitOnClose(false)
  11. // 获取连接
  12.   def getQueryRunner(): Option[QueryRunner]={
  13.     try {
  14.       Some(new QueryRunner(dataSource))
  15.     }catch {
  16.       case e:Exception =>
  17.         e.printStackTrace()
  18.         None
  19.     }
  20.   }
  21. }

需求1实现

  • BestFilmsByOverallRating

需求1实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。

  1. /**
  2.   * 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分
  3.   */
  4. class BestFilmsByOverallRating extends Serializable {
  5.   def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
  6.     import spark.implicits._
  7.     // 将moviesDataset注册成表
  8.     moviesDataset.createOrReplaceTempView("movies")
  9.     // 将ratingsDataset注册成表
  10.     ratingsDataset.createOrReplaceTempView("ratings")
  11.     // 查询SQL语句
  12.     val ressql1 =
  13.       """
  14.          |WITH ratings_filter_cnt AS (
  15.          |SELECT
  16.          |     movieId,
  17.          |     count( * ) AS rating_cnt,
  18.          |     avg( rating ) AS avg_rating
  19.          |FROM
  20.          |     ratings
  21.          |GROUP BY
  22.          |     movieId
  23.          |HAVING
  24.          |     count( * ) >= 5000
  25.          |),
  26.          |ratings_filter_score AS (
  27.          |SELECT
  28.          |     movieId, -- 电影id
  29.          |     avg_rating -- 电影平均评分
  30.          |FROM ratings_filter_cnt
  31.          |ORDER BY avg_rating DESC -- 平均评分降序排序
  32.          |LIMIT 10 -- 平均分较高的前十部电影
  33.          |)
  34.          |SELECT
  35.          |    m.movieId,
  36.          |    m.title,
  37.          |    r.avg_rating AS avgRating
  38.          |FROM
  39.          |   ratings_filter_score r
  40.          |JOIN movies m ON m.movieId = r.movieId
  41.       """.stripMargin
  42.     val resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]
  43.     // 打印数据
  44.     resultDS.show(10)
  45.     resultDS.printSchema()
  46.     // 写入MySQL
  47.     resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
  48.   }
  49.   /**
  50.     * 获取连接,调用写入MySQL数据的方法
  51.     *
  52.     * @param res
  53.     */
  54.   private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {
  55.     lazy val conn = JDBCUtil.getQueryRunner()
  56.     conn match {
  57.       case Some(connection) => {
  58.         upsert(res, connection)
  59.       }
  60.       case None => {
  61.         println("Mysql连接失败")
  62.         System.exit(-1)
  63.       }
  64.     }
  65.   }
  66.   /**
  67.     * 封装将结果写入MySQL的方法
  68.     * 执行写入操作
  69.     *
  70.     * @param r
  71.     * @param conn
  72.     */
  73.   private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {
  74.     try {
  75.       val sql =
  76.         s"""
  77.            |REPLACE INTO `ten_movies_averagerating`(
  78.            |movieId,
  79.            |title,
  80.            |avgRating
  81.            |)
  82.            |VALUES
  83.            |(?,?,?)
  84.        """.stripMargin
  85.       // 执行insert操作
  86.       conn.update(
  87.         sql,
  88.         r.movieId,
  89.         r.title,
  90.         r.avgRating
  91.       )
  92.     } catch {
  93.       case e: Exception => {
  94.         e.printStackTrace()
  95.         System.exit(-1)
  96.       }
  97.     }
  98.   }
  99. }

需求1结果

  • 结果表建表语句

  1. CREATE TABLE `ten_movies_averagerating` (
  2.   `id` int(11NOT NULL AUTO_INCREMENT COMMENT '自增id',
  3.   `movieId` int(11NOT NULL COMMENT '电影id',
  4.   `title` varchar(100NOT NULL COMMENT '电影名称',
  5.   `avgRating` decimal(10,2NOT NULL COMMENT '平均评分',
  6.   `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  7.   PRIMARY KEY (`id`),
  8.   UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
  9. ) ENGINE=InnoDB  DEFAULT CHARSET=utf8;
  • 统计结果

平均评分最高的前十部电影如下:

movieIdtitleavgRating
318Shawshank Redemption, The (1994)4.41
858Godfather, The (1972)4.32
50Usual Suspects, The (1995)4.28
1221Godfather: Part II, The (1974)4.26
527Schindler's List (1993)4.25
2019Seven Samurai (Shichinin no samurai) (1954)4.25
904Rear Window (1954)4.24
120312 Angry Men (1957)4.24
2959Fight Club (1999)4.23
1193One Flew Over the Cuckoo's Nest (1975)4.22

上述电影评分对应的电影中文名称为:

英文名称中文名称
Shawshank Redemption, The (1994)肖申克的救赎
Godfather, The (1972)教父1
Usual Suspects, The (1995)非常嫌疑犯
Godfather: Part II, The (1974)教父2
Schindler's List (1993)辛德勒的名单
Seven Samurai (Shichinin no samurai)  (1954)七武士
Rear Window (1954)后窗
12 Angry Men (1957)十二怒汉
Fight Club (1999)搏击俱乐部
One Flew Over the Cuckoo's Nest (1975)飞越疯人院

需求2实现

  • GenresByAverageRating

需求2实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。

  1. **
  2.   * 需求2:查找每个电影类别及其对应的平均评分
  3.   */
  4. class GenresByAverageRating extends Serializable {
  5.   def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {
  6.     import spark.implicits._
  7.     // 将moviesDataset注册成表
  8.     moviesDataset.createOrReplaceTempView("movies")
  9.     // 将ratingsDataset注册成表
  10.     ratingsDataset.createOrReplaceTempView("ratings")
  11.     val ressql2 =
  12.       """
  13.         |WITH explode_movies AS (
  14.         |SELECT
  15.         | movieId,
  16.         | title,
  17.         | category
  18.         |FROM
  19.         | movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category
  20.         |)
  21.         |SELECT
  22.         | m.category AS genres,
  23.         | avg( r.rating ) AS avgRating
  24.         |FROM
  25.         | explode_movies m
  26.         | JOIN ratings r ON m.movieId = r.movieId
  27.         |GROUP BY
  28.         | m.category
  29.         | """.stripMargin
  30.     val resultDS = spark.sql(ressql2).as[topGenresByAverageRating]
  31.     // 打印数据
  32.     resultDS.show(10)
  33.     resultDS.printSchema()
  34.     // 写入MySQL
  35.     resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
  36.   }
  37.   /**
  38.     * 获取连接,调用写入MySQL数据的方法
  39.     *
  40.     * @param res
  41.     */
  42.   private def insert2Mysql(res: topGenresByAverageRating): Unit = {
  43.     lazy val conn = JDBCUtil.getQueryRunner()
  44.     conn match {
  45.       case Some(connection) => {
  46.         upsert(res, connection)
  47.       }
  48.       case None => {
  49.         println("Mysql连接失败")
  50.         System.exit(-1)
  51.       }
  52.     }
  53.   }
  54.   /**
  55.     * 封装将结果写入MySQL的方法
  56.     * 执行写入操作
  57.     *
  58.     * @param r
  59.     * @param conn
  60.     */
  61.   private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {
  62.     try {
  63.       val sql =
  64.         s"""
  65.            |REPLACE INTO `genres_average_rating`(
  66.            |genres,
  67.            |avgRating
  68.            |)
  69.            |VALUES
  70.            |(?,?)
  71.        """.stripMargin
  72.       // 执行insert操作
  73.       conn.update(
  74.         sql,
  75.         r.genres,
  76.         r.avgRating
  77.       )
  78.     } catch {
  79.       case e: Exception => {
  80.         e.printStackTrace()
  81.         System.exit(-1)
  82.       }
  83.     }
  84.   }
  85. }

需求2结果

  • 结果表建表语句

  1. CREATE TABLE genres_average_rating (
  2.     `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  3.     `genres` VARCHAR ( 100 ) NOT NULL COMMENT '电影类别',
  4.     `avgRating` DECIMAL ( 102 ) NOT NULL COMMENT '电影类别平均评分',
  5.     `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  6. PRIMARY KEY ( `id` ),
  7. UNIQUE KEY `genres_UNIQUE` ( `genres` )
  8. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
  • 统计结果

共有20个电影分类,每个电影分类的平均评分为:

genresavgRating
Film-Noir3.93
War3.79
Documentary3.71
Crime3.69
Drama3.68
Mystery3.67
Animation3.61
IMAX3.6
Western3.59
Musical3.55
Romance3.54
Adventure3.52
Thriller3.52
Fantasy3.51
Sci-Fi3.48
Action3.47
Children3.43
Comedy3.42
(no genres listed)3.33
Horror3.29

电影分类对应的中文名称为:

分类中文名称
Film-Noir黑色电影
War战争
Documentary纪录片
Crime犯罪
Drama历史剧
Mystery推理
Animation动画片
IMAX巨幕电影
Western西部电影
Musical音乐
Romance浪漫
Adventure冒险
Thriller惊悚片
Fantasy魔幻电影
Sci-Fi科幻
Action动作
Children儿童
Comedy喜剧
(no genres listed)未分类
Horror恐怖

需求3实现

  • MostRatedFilms

    需求3实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。

  1. /**
  2.   * 需求3:查找被评分次数较多的前十部电影.
  3.   */
  4. class MostRatedFilms extends Serializable {
  5.    def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {
  6.      import spark.implicits._
  7.      // 将moviesDataset注册成表
  8.      moviesDataset.createOrReplaceTempView("movies")
  9.      // 将ratingsDataset注册成表
  10.      ratingsDataset.createOrReplaceTempView("ratings")
  11. val ressql3 =
  12.   """
  13.     |WITH rating_group AS (
  14.     |    SELECT
  15.     |       movieId,
  16.     |       count( * ) AS ratingCnt
  17.     |    FROM ratings
  18.     |    GROUP BY movieId
  19.     |),
  20.     |rating_filter AS (
  21.     |    SELECT
  22.     |       movieId,
  23.     |       ratingCnt
  24.     |    FROM rating_group
  25.     |    ORDER BY ratingCnt DESC
  26.     |    LIMIT 10
  27.     |)
  28.     |SELECT
  29.     |    m.movieId,
  30.     |    m.title,
  31.     |    r.ratingCnt
  32.     |FROM
  33.     |    rating_filter r
  34.     |JOIN movies m ON r.movieId = m.movieId
  35.     |
  36.   """.stripMargin
  37.      val resultDS = spark.sql(ressql3).as[tenMostRatedFilms]
  38.      // 打印数据
  39.      resultDS.show(10)
  40.      resultDS.printSchema()
  41.      // 写入MySQL
  42.      resultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))
  43.   }
  44.   /**
  45.     * 获取连接,调用写入MySQL数据的方法
  46.     *
  47.     * @param res
  48.     */
  49.   private def insert2Mysql(res: tenMostRatedFilms): Unit = {
  50.     lazy val conn = JDBCUtil.getQueryRunner()
  51.     conn match {
  52.       case Some(connection) => {
  53.         upsert(res, connection)
  54.       }
  55.       case None => {
  56.         println("Mysql连接失败")
  57.         System.exit(-1)
  58.       }
  59.     }
  60.   }
  61.   /**
  62.     * 封装将结果写入MySQL的方法
  63.     * 执行写入操作
  64.     *
  65.     * @param r
  66.     * @param conn
  67.     */
  68.   private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {
  69.     try {
  70.       val sql =
  71.         s"""
  72.            |REPLACE INTO `ten_most_rated_films`(
  73.            |movieId,
  74.            |title,
  75.            |ratingCnt
  76.            |)
  77.            |VALUES
  78.            |(?,?,?)
  79.        """.stripMargin
  80.       // 执行insert操作
  81.       conn.update(
  82.         sql,
  83.         r.movieId,
  84.         r.title,
  85.         r.ratingCnt
  86.       )
  87.     } catch {
  88.       case e: Exception => {
  89.         e.printStackTrace()
  90.         System.exit(-1)
  91.       }
  92.     }
  93.   }
  94. }

需求3结果

  • 结果表创建语句

  1. CREATE TABLE ten_most_rated_films (
  2.     `id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  3.     `movieId` INT ( 11 ) NOT NULL COMMENT '电影Id',
  4.     `title` varchar(100NOT NULL COMMENT '电影名称',
  5.     `ratingCnt` INT(11NOT NULL COMMENT '电影被评分的次数',
  6.     `update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  7. PRIMARY KEY ( `id` ),
  8. UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
  9. ) ENGINE = INNODB DEFAULT CHARSET = utf8;
  • 统计结果

movieIdtitleratingCnt
356Forrest Gump (1994)81491
318Shawshank Redemption, The (1994)81482
296Pulp Fiction (1994)79672
593Silence of the Lambs, The (1991)74127
2571Matrix, The (1999)72674
260Star Wars: Episode IV - A New Hope (1977)68717
480Jurassic Park (1993)64144
527Schindler's List (1993)60411
110Braveheart (1995)59184
2959Fight Club (1999)58773

评分次数较多的电影对应的中文名称为:

英文名称中文名称
Forrest Gump (1994)阿甘正传
Shawshank Redemption, The (1994)肖申克的救赎
Pulp Fiction (1994)低俗小说
Silence of the Lambs, The (1991)沉默的羔羊
Matrix, The (1999)黑客帝国
Star Wars: Episode IV - A New Hope (1977)星球大战
Jurassic Park (1993)侏罗纪公园
Schindler's List (1993)辛德勒的名单
Braveheart (1995)勇敢的心
Fight Club (1999)搏击俱乐部

总结

本文主要是基于SparkSQL对MovieLens数据集进行统计分析,完整实现了三个需求,并给对每个需求都给出了详细的代码实现和结果分析。本案例还原了企业使用SparkSQL进行实现数据统计的基本流程

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/780272
推荐阅读
相关标签
  

闽ICP备14008679号