package com.briup.spark.sc.movies_analyse

import com.briup.Utils.{getSC, getSS}
import org.apache.log4j.Level
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, RelationalGroupedDataset, Row, SparkSession}

object MovieAnalyseCase {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  // Load the three tables.
  private val spark: SparkSession = getSS
  import spark.implicits._

  // movies.dat fields: movie id::title::genres
  private val movies: DataFrame = spark.read.format("csv")
    .option("sep", "::")
    .option("inferSchema", "true")
    .load("data/movies.dat").toDF("moviesid", "name", "type")

  // ratings.dat fields: user id::movie id::score::timestamp
  private val rats: DataFrame = spark.read.format("csv")
    .option("sep", "::")
    .option("inferSchema", "true")
    .load("data/ratings.dat").toDF("user_id", "moviesid", "score", "timestamp")

  // users.dat fields: user id::gender::age::occupation code::zip code
  private val users: DataFrame = spark.read.format("csv")
    .option("sep", "::")
    .option("inferSchema", "true")
    .load("data/users.dat").toDF("user_id", "gender", "age", "dept", "postcode")
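  // Assumed input layout (the post never shows the raw files; this is the
  // standard MovieLens 1M format, which matches the column names above):
  //   movies.dat  : 1::Toy Story (1995)::Animation|Children's|Comedy
  //   ratings.dat : 1::1193::5::978300760
  //   users.dat   : 1::F::1::10::48067
  // With inferSchema=true the id, score, age and timestamp columns are read
  // as integers, which matters for the typed row accesses in seven() below.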
  /**
   * Ratio of male to female users.
   * users.dat fields: user id::gender::age::occupation code::zip code
   */
  def sexAnalyse: Unit = {
    val sc: SparkContext = getSC
    val userRDD: RDD[String] = sc.textFile("data/users.dat")
    val groupRDD: RDD[(String, Iterable[String])] = userRDD.map(_.split("::")(1)).groupBy(x => x)
    // groupRDD.foreach(println)
    val resultRDD: RDD[(String, Int)] = groupRDD.map(x => (x._1, x._2.size))
    // Collect the (gender, count) pairs into a map; collect() gives no
    // ordering guarantee, so look the genders up by key, not by index.
    val res: Map[String, Int] = resultRDD.collect().toMap
    // Male-to-female ratio: M / F
    val finalRes: Double = res("M").toDouble / res("F")
    println(finalRes)
  }

  /**
   * Among every user's average score, find the top ten and the bottom ten
   * users together with their scores.
   * ratings.dat fields: user id::movie id::score::timestamp
   */
  def ratingScore(): Unit = {
    val sc: SparkContext = getSC
    // textFile defaults to 2 partitions
    val rdd: RDD[String] = sc.textFile("data/ratings.dat")
    // Build an RDD of (user id, score)
    val rdd1: RDD[(String, Int)] = rdd.map(x => {
      val strs: Array[String] = x.split("::")
      (strs(0), strs(2).toInt)
    })
    // rdd1.foreach(println)
    // Group to get (user id, collection of scores)
    val rdd3: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    // Average score per user; use Double so the average is not truncated
    val resultAvg: RDD[(String, Double)] = rdd3.map(x => {
      val avg: Double = x._2.sum.toDouble / x._2.size
      (x._1, avg)
    })
    // 1. Sort ascending: the first ten rows are the bottom ten users
    val sortRes: RDD[(String, Double)] = resultAvg.sortBy(_._2)
    val tail10: Array[(String, Double)] = sortRes.take(10)
    // tail10.foreach(println)
    // 2. Sort descending: the first ten rows are the top ten users
    val top10: Array[(String, Double)] = resultAvg.sortBy(_._2, false).take(10)
    top10.foreach(println)
  }

  /**
   * Average score of each movie, split by gender.
   * ratings.dat fields: user id::movie id::score::timestamp
   * Result rows look like ((gender, movie id), average score).
   */
  def sexScoreAnalyse(): Unit = {
    val sc: SparkContext = getSC
    // 1. From the ratings file, build (user id, movie id, score)
    val rates = sc.textFile("data/ratings.dat").map(x => {
      val strs: Array[String] = x.split("::")
      (strs(0), strs(1), strs(2))
    })
    // 2. Key the ratings by user id so they can be joined with the users
    val rdd: RDD[(String, (String, Double))] = rates.map(x => (x._1, (x._2, x._3.toDouble)))
    // rdd.foreach(println)
    // From the users file, build (user id, gender)
    val users: RDD[(String, String)] = sc.textFile("data/users.dat").map(x => {
      val strs: Array[String] = x.split("::")
      (strs(0), strs(1))
    })
    // Join to get (user id, ((movie id, score), Option[gender]))
    val joinRDD: RDD[(String, ((String, Double), Option[String]))] = rdd.leftOuterJoin(users)
    // joinRDD.foreach(println)
    // Re-key as ((gender, movie id), score)
    val sexRDD: RDD[((String, String), Double)] = joinRDD.map(x => {
      val moviesAndScore: (String, Double) = x._2._1
      val sex: String = x._2._2.get
      ((sex, moviesAndScore._1), moviesAndScore._2)
    })
    // Group by (gender, movie id) and average the scores
    val groupSexRDD: RDD[((String, String), Iterable[Double])] = sexRDD.groupByKey()
    val avgSexRDD: RDD[((String, String), Double)] = groupSexRDD.mapValues(x => x.sum / x.size)
    avgSexRDD.foreach(println)
  }

  /**
   * Drop movies with fewer than 250 ratings, then compute each remaining
   * movie's average score by gender.
   * Ratings table: user id, movie id, score, timestamp
   */
  def filterMovies: Unit = {
    // Step 1: find the movies with at least 250 ratings
    val df: DataFrame = spark.read.format("csv")
      .option("sep", "::")
      .option("inferSchema", "true")
      .load("data/ratings.dat")
      .toDF("user_id", "moviesid", "score", "timestamp")
    // df.show(20, false)
    df.createOrReplaceTempView("rating")
    // spark.sql("select user_id, moviesid from rating").show()
    // Movie ids (with their rating counts) that have at least 250 ratings
    val rats_over250: Dataset[Row] = df.groupBy("moviesid").count().filter($"count" >= 250)
    // rats_over250.show(false)
    // Filtered ratings table
    val rats_filter: DataFrame = rats_over250.join(rats, Seq("moviesid"), "left_outer")
    // Equi-join the filtered ratings with the users table
    val user_rats: DataFrame = rats_filter.join(users, "user_id")
    user_rats.printSchema()
    val moviesInfo: RelationalGroupedDataset = user_rats.groupBy("moviesid", "gender")
    val result: DataFrame = moviesInfo.avg("score")
      .join(movies, "moviesid")
      .toDF("moviesid", "gender", "scores", "name", "type")
    result.show(false)
    // user_rats.show(false)
  }

  /**
   * The ten favourite movies of female and of male viewers.
   */
  def five: Unit = {
    val rats_user: DataFrame = rats.join(users, "user_id")
    val group: RelationalGroupedDataset = rats_user.groupBy("moviesid", "gender")
    val genderMovie: DataFrame = group.avg("score")
      .join(movies, "moviesid")
      .toDF("moviesid", "gender", "score", "name", "type")
    println("Top ten movies for female and for male viewers:")
    val f: Dataset[Row] = genderMovie.filter($"gender" === "F").sort($"score".desc, $"moviesid".asc).limit(10)
    val m: Dataset[Row] = genderMovie.filter($"gender" === "M").sort($"score".desc, $"moviesid".asc).limit(10)
    f.show()
    m.show()
  }

  /**
   * 6. The ten movies whose average scores differ most between male and
   * female viewers.
   * movies: "moviesid", "name", "type"
   * rats:   "user_id", "moviesid", "score", "timestamp"
   * users:  "user_id", "gender", "age", "dept", "postcode"
   */
  def six: Unit = {
    // Equi-join the ratings and users tables on user_id
    val rats_users: DataFrame = rats.join(users, "user_id")
    rats_users.groupBy("moviesid", "gender").avg("score")
      .sort($"moviesid")
      .toDF("moviesid", "gender", "avg_score")
      // Pivot on gender: one row per movie with an F and an M column
      .groupBy("moviesid").pivot("gender")
      .max("avg_score")
      .sort("moviesid")
      // .show()
      .selectExpr("moviesid", "F", "M", "abs(F - M) df")
      .sort($"df".desc, $"moviesid")
      .limit(10)
      .join(movies, "moviesid")
      .select("moviesid", "name", "F", "M", "df")
      .sort($"df".desc, $"moviesid")
      .show(false)
  }
  /**
   * 7. The ten most divisive movies (the larger the standard deviation of
   * a movie's scores, the larger the disagreement).
   * movies: "moviesid", "name", "type"
   * rats:   "user_id", "moviesid", "score", "timestamp"
   * users:  "user_id", "gender", "age", "dept", "postcode"
   */
  def seven(): Unit = {
    /*
      Approach: group rats by moviesid and compute the standard deviation
      of each movie's scores, then equi-join the result with movies.
    */
    // Attach each movie's average score to every one of its rating rows
    val baseData = rats.groupBy("moviesid").agg("score" -> "avg")
      .join(rats, Seq("moviesid"), "right_outer")
    // Squared deviation of each rating from the movie's average:
    // stddev = sqrt(1/N * ((score1-avg)^2 + ... + (scoreN-avg)^2))
    val mData = baseData.map(row => {
      val avg = row.getDouble(1)
      val score = row.getAs[Int]("score")
      (row.getInt(0), Math.pow(score - avg, 2))
    }).toDF("moviesid", "score")
    // Sum and count of the squared deviations per movie
    val mData2 = mData.groupBy("moviesid")
      .agg("score" -> "sum", "score" -> "count")
      .toDF("moviesid", "sum", "count")
    val result = mData2.map(row => {
      val sum = row.getDouble(1)
      val count = row.getLong(2)
      (row.getInt(0), Math.sqrt(sum / count))
    }).toDF("moviesid", "result")
      .join(movies, Seq("moviesid"), "left_outer")
      .sort($"result".desc, $"moviesid".asc)
      .limit(10)
    result.show(false)
  }

  def main(args: Array[String]): Unit = {
    seven()
  }
}
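The manual two-pass computation in seven (average first, then squared deviations) matches the population standard deviation, which Spark also offers as a built-in aggregate. A minimal alternative sketch, assuming the rats and movies DataFrames defined above:

import org.apache.spark.sql.functions.{col, stddev_pop}

rats.groupBy("moviesid")
  .agg(stddev_pop(col("score")).as("result"))   // population stddev, i.e. sqrt(sum/N)
  .join(movies, Seq("moviesid"), "left_outer")
  .sort(col("result").desc, col("moviesid").asc)
  .limit(10)
  .show(false)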
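com.briup.Utils is imported at the top but never shown in the post. A minimal sketch of what getSC and getSS presumably provide; the local master and app name are assumptions:

package com.briup

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object Utils {
  // Assumption: one local SparkSession backs both helpers, so the RDD
  // methods and the DataFrame methods share a single SparkContext.
  private lazy val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("MovieAnalyseCase")
    .getOrCreate()

  def getSS: SparkSession = spark
  def getSC: SparkContext = spark.sparkContext
}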