当前位置:   article > 正文

spark_Sql 基于用户对电影的一系列评分数据进行离线任务分析_本关任务:计算happiness is in the field (1995)这部电影的评分次数,要

本关任务:计算happiness is in the field (1995)这部电影的评分次数,要求显示电影

在这里插入图片描述在这里插入图片描述

package com.briup.spark.sc.movies_analyse

import com.briup.Utils.{getSC, getSS}
import org.apache.log4j.Level
import org.apache.spark.{SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, RelationalGroupedDataset, Row, SparkSession}

object MovieAnalyseCase {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  //读取三张表的数据
  private val spark: SparkSession = getSS

  import spark.implicits._

  //电影movies.dat字段组成:  电影id::名字::类别
  private val movies: DataFrame = spark.read.format("csv")
    .option("sep", "::")
    .option("inferSchema", "true")
    .load("data/movies.dat").toDF("moviesid", "name", "type")

  //评分表ratings.dat字段组成:用户id::电影id::评分::时间戳
  private val rats: DataFrame = spark.read.format("csv")
    .option("sep", "::")
    .option("inferSchema", "true")
    .load("data/ratings.dat").toDF("user_id", "moviesid", "score", "timestamp")

  //user.dat数据组成:   用户id::性别::年龄::职业代码::邮编
  private val users: DataFrame = spark.read.format("csv")
    .option("sep", "::")
    .option("inferSchema", "true")
    .load("data/users.dat").toDF("user_id", "gender", "age", "dept", "postcode")


  /**
   * 男女用户的比例
   * user.dat数据组成:
   * 用户id::性别::年龄::职业代码::邮编
   */
  def sexAnalyse: Unit = {
    val sc: SparkContext = getSC
    val userRDD: RDD[String] = sc.textFile("data/users.dat")

    val groupRDD: RDD[(String, Iterable[String])] = userRDD.map(_.split("::")(1)).groupBy(x => x)

    //    groupRDD.foreach(println)
    val resultRDD: RDD[(String, Int)] = groupRDD.map(x => (x._1, x._2.size))

    //得到男女人数的数组
    val res: Array[(String, Int)] = resultRDD.collect()

    //计算男女比例    男/女
    var finalRes: Double = res(0)._2.toDouble / res(1)._2.toDouble

    println(finalRes)
  }

  /**
   * 统计每个用户的平均评分中,排名前十和最后十名的用户及其评分分别是多少
   * 评分表ratings.dat字段组成:
   * 用户id::电影id::评分::时间戳
   */
  def ratingScore(): Unit = {
    val sc: SparkContext = getSC

    //textFile默认为2个分区
    val rdd: RDD[String] = sc.textFile("data/ratings.dat")

    //得到(用户id,评分)的rdd
    val rdd1: RDD[(String, Int)] = rdd.map(x => {
      val strs: Array[String] = x.split("::")
      (strs(0), strs(2).toInt)
    })

    //    rdd1.foreach(println)

    //得到(用户id,总评分)的数据集rdd
    val rdd2: RDD[(String, Int)] = rdd1.reduceByKey(_ + _)

    //分组得到(用户id,评分集合)的数据集rdd
    val rdd3: RDD[(String, Iterable[Int])] = rdd1.groupByKey()

    //求每个用户的平均评分
    val resultAvg: RDD[(String, Int)] = rdd3.map(x => {
      val avg: Int = x._2.reduce(_ + _) / x._2.size
      (x._1, avg)
    })

    //求出每个用户的平均评分中,排名前十和最后十名的用户及其评分分别是多少
    //  1.先对resultAvg进行升序排序
    val sortRes: RDD[(String, Int)] = resultAvg.sortBy(_._2)

    //得到每个用户的平均评分中,最后十名的用户及其评分
    val tail10: Array[(String, Int)] = sortRes.take(10)
    //    tail10.foreach(println)

    //得到每个用户的平均评分中,排名前十的用户及其评分
    //  2.先对resultAvg进行降序排序
    val top10: Array[(String, Int)] = resultAvg.sortBy(_._2, false).take(10)
    top10.foreach(println)

  }

  /**
   * 按性别计算每部电源的平均得分
   * 评分表ratings.dat字段组成:
   * 用户id::电影id::评分::时间戳
   * 最后结果
   * (男,电影id,平均评分)
   */
  def sexScoreAnalyse(): Unit = {
    val sc = getSC

    //1. 首先得到(用户id,电影id,评分)
    //获取评分表的数据  (用户id,电影id,评分)
    val rates = sc.textFile("data/ratings.dat").map(x => {
      val strs: Array[String] = x.split("::")
      (strs(0), strs(1), strs(2))
    })


    //2. 基于(用户id, 电源id)求电源评分的平均值
    val rdd: RDD[(String, (String, Double))] = rates.map(x => (x._1, (x._2, x._3.toDouble)))
    //    rdd.foreach(println)

    //**************************************************
    //基于用户表得到 (用户id:性别)
    val users: RDD[(String, String)] = sc.textFile("data/users.dat").map(x => {
      val strs: Array[String] = x.split("::")
      (strs(0), strs(1))
    })


    //(男,电影id,平均评分)
    //得到(用户id,(电影id,评分),性别))
    val joinRDD: RDD[(String, ((String, Double), Option[String]))] = rdd.leftOuterJoin(users)
    //    joinRDD.foreach(println)

    //得到((性别,电源id),电源评分)
    val sexRDD: RDD[((String, String), Double)] = joinRDD.map(x => {
      val moviesAndScore: (String, Double) = x._2._1
      val sex: String = x._2._2.get
      ((sex, moviesAndScore._1), moviesAndScore._2)
    })

    val groupSexRDD: RDD[((String, String), Iterable[Double])] = sexRDD.groupByKey()

    val avgSexRDD: RDD[((String, String), Double)] = groupSexRDD.mapValues(x => {
      val sum: Double = x.reduce(_ + _)
      val avg: Double = sum / x.size
      avg
    })

    avgSexRDD.foreach(println)

  }

  /**
   * 过滤掉评分数据不够250条的电影,按性别计算每部电影的平均分
   * 评分表:用户id,电影id,评分,时间戳
   */
  def filterMovies: Unit = {
    org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)
    val spark: SparkSession = getSS
    import spark.implicits._

    // Step1: 过滤掉评分数据不足250条的电影
    val df: DataFrame = spark.read.format("csv")
      .option("sep", "::")
      .load("data/ratings.dat")
      .toDF("user_id", "moviesid", "score", "timestamp")

    //    df.show(20,false)
    df.createOrReplaceTempView("rating")

    //    spark.sql("select user_id,movie_id from rating").show()

    //过滤掉评分数据不足250条的电影id及数量
    val rats_over250: Dataset[Row] = df.groupBy("moviesid").count().filter($"count" >= 250)
    //    rats_over250.show(false)

    //得到过滤之后的评分表
    val rats_fiter: DataFrame = rats_over250.join(rats, Seq("moviesid"), "left_outer")

    //得到过滤之后评分表与用户表进行等值连接
    val user_rats: DataFrame = rats_fiter.join(users, "user_id")

    user_rats.printSchema()
    val moviesInfo: RelationalGroupedDataset = user_rats.groupBy("moviesid", "gender")
    val result: DataFrame = moviesInfo.avg("score").join(movies, "moviesid").toDF("moviesid", "gender", "scores",
      "name", "type")

    result.show(false)
    //    user_rats.show(false)
  }


  /**
   * 男女观众分别最喜欢的前十部电影
   */
  def five: Unit = {
    val rats_user: DataFrame = rats.join(users, "user_id")
    val group: RelationalGroupedDataset = rats_user.groupBy("moviesid", "gender")
    val genderMovie: DataFrame = group.avg("score").join(movies, "moviesid").toDF("moviesid", "gender", "score", "name", "type")
    println("男女观众分别最喜欢的前十部电影:")
    val f: Dataset[Row] = genderMovie.filter($"gender" === "F").sort($"score".desc, $"moviesid".asc).limit(10)
    val m: Dataset[Row] = genderMovie.filter($"gender" === "M").sort($"score".desc, $"moviesid".asc).limit(10)
    f.show()
    m.show()
  }

  /**
   * 6.男女观众评分差别最大的十部电影
   * movies:  "moviesid", "name", "type"
   * rats:  "user_id", "moviesid", "score", "timestamp"
   * users: "user_id", "gender", "age", "dept", "postcode"
   */

  def six: Unit = {
    //uesrs表和rats表基于user_id进行等值连接
    val rats_users: DataFrame = rats.join(users, "user_id")
    rats_users.groupBy("moviesid", "gender").avg("score")
      .sort($"moviesid")
      .toDF("moviesid", "gender", "avg_score")
      .groupBy("moviesid").pivot("gender")
      .max("avg_score")
      .sort("moviesid")
      //      .show()
      .selectExpr("moviesid", "F", "M", "abs(F - M) df")
      .sort($"df".desc, $"moviesid")
      .limit(10)
      .join(movies, "moviesid")
      .select("moviesid", "name", "F", "M", "df")
      .sort($"df".desc, $"moviesid")
      .show(false)

  }

  /**
   * 7.所有观众评分分歧最大的十部电影(电影评分标准差大者,分歧则大)
   * movies:  "moviesid", "name", "type"
   * rats:  "user_id", "moviesid", "score", "timestamp"
   * users: "user_id", "gender", "age", "dept", "postcode"
   */
  def seven(): Unit = {
    /*
    思路:
        基于rats表,对moviesid分组,求每部电影评分的标准差
        然后将得到的表与movies表进行等值连接
     */
    val baseData=rats.groupBy("moviesid").agg("score"->"avg")
      .join(rats,Seq("moviesid"),"right_outer")
    val mData=baseData.map(row=>{
      val avg=row.getDouble(1);
      val score=row.getInt(3);
      //sqrt(1/N*((score1-avg)^2+(score1-avg)^2+(score1-avg)^2))
      val result=Math.pow((score-avg),2);
      (row.getString(0),result);
    }).toDF("moviesid","score");
    val mData2=mData.groupBy("moviesid").agg("score"->"sum","score"->"count").toDF("moviesid","sum","count");
    val result=mData2.map(row=>{
      val sum=row.getDouble(1);
      val count=row.getLong(2);
      val result=Math.sqrt(sum/count);
      (row.getString(0),result)
    }).toDF("moviesid","result").join(movies,Seq("moviesid"),"left_outer")
      .sort($"result".desc,$"moviesid".asc).limit(10);

    result.show(false)
  }

  def main(args: Array[String]): Unit = {
    seven
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/404642?site
推荐阅读
相关标签
  

闽ICP备14008679号