
Advanced Spark Programming


Contest website access log analysis

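  def contest(utils: Utils): Unit = {
    // Each log line is comma-separated; field 1 is the page, field 3 the user, field 5 a timestamp
    val data = utils.sc.textFile("data/contest_log.txt")
    // Number of distinct users
    val users = data.map(line => line.split(',')(3)).distinct()
    println(users.count())
    // Number of distinct pages
    val pages = data.map(line => line.split(',')(1)).distinct()
    println(pages.count())
    // Number of records per timestamp prefix (the first 7 characters of field 5)
    val session_with_time = data.map(line => (line.split(',')(5).substring(0, 7), 1))
    println(session_with_time.reduceByKey(_ + _).collect().mkString("\n", "\n", "\n"))
  }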

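The three log queries can be tried without a cluster. Below is a minimal sketch on plain Scala collections (no Spark); the object name `ContestLogSketch` and any sample lines are hypothetical, and the field positions (1 = page, 3 = user, 5 = timestamp) are simply the ones the Spark code above indexes, since the real log format is not shown. Here `groupBy(identity)` followed by counting plays the role of `map(..., 1).reduceByKey(_ + _)`.

```scala
// Local (non-Spark) sketch of the contest-log queries.
// Assumed layout, taken from the Spark code: field 1 = page, field 3 = user, field 5 = timestamp.
object ContestLogSketch {
  private def fields(line: String): Array[String] = line.split(',')

  // Number of distinct users (field 3), like map(...).distinct().count()
  def distinctUsers(lines: Seq[String]): Int =
    lines.map(l => fields(l)(3)).distinct.size

  // Number of distinct pages (field 1)
  def distinctPages(lines: Seq[String]): Int =
    lines.map(l => fields(l)(1)).distinct.size

  // Records per 7-character timestamp prefix, like map(x => (prefix, 1)).reduceByKey(_ + _)
  def countsByPrefix(lines: Seq[String]): Map[String, Int] =
    lines.map(l => fields(l)(5).substring(0, 7))
      .groupBy(identity)
      .map { case (prefix, hits) => prefix -> hits.size }
}
```

With a `yyyy-MM-dd ...` timestamp (an assumption), the 7-character prefix is the year and month, so `countsByPrefix` gives monthly record counts.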
Movie reviews (MovieLens ml-1m dataset)

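1. Find the 10 most-rated movies and their rating counts (movie title, number of ratings)

  def solveQuest1(utils: Utils): Unit = {
    // (movieID, 1) ---> (movieID, number of ratings)
    val movieID_times: RDD[(String, Int)] = utils.ratingsRdd.map(x => (x._2, 1)).reduceByKey(_ + _)
    // (movieID, movieName)
    val movieID_name: RDD[(String, String)] = utils.movieRdd.map(x => (x._1, x._2))
    // Join to get (movieName, number of ratings), then keep the 10 largest counts
    val result: RDD[(String, Int)] = movieID_times.join(movieID_name).map(x => (x._2._2, x._2._1))
    result.top(10)(Ordering.by(_._2)).foreach(println(_))
  }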

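The same "count, join titles, take the top N" pipeline can be checked on small in-memory data. This is a sketch in plain Scala (no Spark) with a hypothetical `MostRatedSketch` object; ratings are assumed to be `(userID, movieID, rating)` triples like the columns of `ratings.dat`. Unlike `RDD.top`, which does not promise an order among equal counts, it breaks ties by title so the result is deterministic.

```scala
// Local sketch of question 1: most-rated movies with their rating counts.
object MostRatedSketch {
  // ratings: (userID, movieID, rating); titles: movieID -> title
  def mostRated(ratings: Seq[(String, String, Int)], titles: Map[String, String], n: Int): Seq[(String, Int)] = {
    // (movieID, count): the local counterpart of map(x => (x._2, 1)).reduceByKey(_ + _)
    val counts = ratings.groupBy(_._2).map { case (mid, rs) => (mid, rs.size) }
    counts.toSeq
      // Inner join with the titles: movies without a title are dropped
      .flatMap { case (mid, c) => titles.get(mid).map(t => (t, c)) }
      // Largest count first, ties broken by title
      .sortBy { case (t, c) => (-c, t) }
      .take(n)
  }
}
```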
2. For men and women separately, find the 10 highest-rated movies (gender, movie title, rating)

  def solveQuest2(utils: Utils): Unit = {
    // (userID, sex)
    val userID_sex: RDD[(String, String)] = utils.usersRdd.map(x => (x._1, x._2))
    // (userID, (movieID, rating))
    val userID_movieID_rating: RDD[(String, (String, Double))] = utils.ratingsRdd.map(x => (x._1, (x._2, x._3.toDouble)))
    // (movieID, movieName)
    val movieID_movieName: RDD[(String, String)] = utils.movieRdd.map(x => (x._1, x._2))
    // (userID, (sex, (movieID, rating)))  ---> ((sex, movieID), (rating, 1))
    val sex_movieID_rating: RDD[((String, String), (Double, Int))] = userID_sex.join(userID_movieID_rating)
      .map(x => ((x._2._1, x._2._2._1), (x._2._2._2, 1)))
    // ((sex, movieID), (sum, count))  ---> (movieID, (sex, avg)); only movies with more than 50 ratings
    // from that gender are kept, so that a movie with a single 5-star rating cannot top the list
    val movieID_sex_avg: RDD[(String, (String, Double))] = sex_movieID_rating
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .filter(_._2._2 > 50)
      .map { case ((sex, movieID), (sum, cnt)) => (movieID, (sex, sum / cnt)) }
    // (movieID, ((sex, avg), movieName))  ---> (sex, movieName, avg)
    val sex_movieName_avg: RDD[(String, String, Double)] = movieID_sex_avg.join(movieID_movieName)
      .map { case (_, ((sex, avg), movieName)) => (sex, movieName, avg) }
      .cache()
    // Top 10 by average rating, first for women, then for men
    for (sex <- Seq("F", "M")) {
      sex_movieName_avg.filter(_._1 == sex).top(10)(Ordering.by(_._3))
        .foreach { case (s, name, avg) => println(s + ":" + name + ":" + avg) }
    }
  }

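The key step in question 2 is averaging per (gender, movie) and discarding rarely rated movies before ranking. A minimal sketch of that logic on plain Scala collections (no Spark), assuming `(gender, movieID, rating)` triples; `AvgWithThresholdSketch` and its `minCount` parameter are hypothetical names. The test data shows how a single 5-star rating wins when no threshold is applied.

```scala
// Local sketch of question 2: top-N movies by average rating for one gender,
// ignoring movies with minCount ratings or fewer.
object AvgWithThresholdSketch {
  // ratings: (gender, movieID, rating)
  def topAvgByGender(ratings: Seq[(String, String, Double)], gender: String,
                     minCount: Int, n: Int): Seq[(String, Double)] =
    ratings.filter(_._1 == gender)
      .groupBy(_._2)
      .toSeq
      .map { case (mid, rs) => (mid, rs.map(_._3).sum, rs.size) } // (movieID, sum, count)
      .filter(_._3 > minCount)                                      // drop rarely rated movies
      .map { case (mid, sum, cnt) => (mid, sum / cnt) }             // (movieID, average)
      .sortBy { case (mid, avg) => (-avg, mid) }                    // best first, ties by ID
      .take(n)
}
```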
3. For men and women separately, find the 10 movies they rated most often (gender, movie title)

  def solveQuest3(utils: Utils): Unit = {
    val userID_sex = utils.usersRdd.map(x => (x._1, x._2))
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    val userID_movieID_times = utils.ratingsRdd.map(x => (x._1, (x._2, 1)))
    val userID_movieID_times_sexs = userID_movieID_times.join(userID_sex)
    val userID_movieID_times_sexs_movie = userID_movieID_times_sexs.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._2))).join(movieID_movie)
    val sex_movieID_times_movie = userID_movieID_times_sexs_movie.map(x => (x._2._1._3, (x._1, x._2._2, x._2._1._2)))
    val movie_times_M = sex_movieID_times_movie.filter(x => x._1 == "M").map(x => ((x._2._1, x._2._2), x._2._3)).reduceByKey(_ + _)
    println("male rating times top10")
    movie_times_M.top(10)(Ordering.by(_._2)).foreach(println(_))
    val movie_times_F = sex_movieID_times_movie.filter(x => x._1 == "F").map(x => ((x._2._1, x._2._2), x._2._3)).reduceByKey(_ + _)
    println("female rating times top10")
    movie_times_F.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

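Question 3 joins every rating with the users table only to learn the rater's gender. When the users table is small, a lookup in an in-memory map does the same job without a shuffle join; in Spark this idea is usually expressed with a broadcast variable (`sc.broadcast`). The sketch below shows only the lookup-and-count logic on plain Scala collections, with a hypothetical `RatingCountByGenderSketch` object; it is an alternative technique, not the author's code.

```scala
// Local sketch of question 3: rating counts per (gender, movieID),
// resolving gender through an in-memory lookup instead of a join.
object RatingCountByGenderSketch {
  // users: userID -> gender; ratings: (userID, movieID)
  def countsByGender(users: Map[String, String], ratings: Seq[(String, String)]): Map[(String, String), Int] =
    ratings
      .flatMap { case (uid, mid) => users.get(uid).map(g => (g, mid)) } // unknown users are dropped
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
}
```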
4. Find the 10 movies most popular (most often rated) among men aged 18-24

  def solveQuest4(utils: Utils): Unit = {
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    val userID_movieID_times = utils.ratingsRdd.map(x => (x._1, (x._2, 1)))
    // In ml-1m users.dat the age field is a group code; "18" stands for the 18-24 group
    val userID_age = utils.usersRdd.filter(x => x._2 == "M" && x._3 == "18").map(x => (x._1, x._3))
    val userID_movieID_times_age = userID_movieID_times.join(userID_age)
    val userID_movieID_times_age_movie = userID_movieID_times_age.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._2))).join(movieID_movie)
    val movie_times = userID_movieID_times_age_movie.map(x => ((x._1, x._2._2), x._2._1._2)).reduceByKey(_ + _)
    println("age in 18-24 male rating times top10")
    movie_times.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

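Questions 4 and 5 rely on the fact that `users.dat` stores an age-group code rather than an actual age. The mapping below follows the ml-1m README (seven groups); `AgeGroups` is a hypothetical helper, and turning the codes into labels is only a readability aid for printed results.

```scala
// Age-group codes used in ml-1m users.dat, as listed in the dataset README.
object AgeGroups {
  val labels: Map[String, String] = Map(
    "1" -> "Under 18", "18" -> "18-24", "25" -> "25-34", "35" -> "35-44",
    "45" -> "45-49", "50" -> "50-55", "56" -> "56+")

  // Label for a code, or "unknown" for anything not in the README list
  def label(code: String): String = labels.getOrElse(code, "unknown")
}
```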
5. For the movie with movieid = 2116, compute the average rating in each age group (there are only 7 age values, so just group by those 7) (age group, rating)

  def solveQuest5(utils: Utils): Unit = {
    val userID_rating = utils.ratingsRdd.filter(_._2 == "2116").map(x => (x._1, x._3.toDouble))
    val userID_age = utils.usersRdd.map(x => (x._1, x._3))
    val age_rating_times = userID_age.join(userID_rating).map(x => (x._2._1, (x._2._2, 1)))
    val age_avg = age_rating_times.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2))
    println("movie 2116 in every age avg")
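    // Collect to the driver so the output appears in the driver console, sorted by age code
    age_avg.collect().sortBy(_._1.toInt).foreach(println(_))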
  }

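The averages in questions 5, 7, 8 and 9 are computed by reducing `(sum, count)` pairs and dividing once at the end. This matters because `reduceByKey` may combine values in any grouping, and averaging pairwise is not associative. The plain-Scala sketch below (hypothetical `AverageMergeSketch`, no Spark) contrasts the two; note that `reduce` throws on an empty sequence.

```scala
// Why reduceByKey carries (sum, count) instead of averaging pairwise.
object AverageMergeSketch {
  // Merge function of the kind used with reduceByKey: add sums and counts
  def merge(a: (Double, Int), b: (Double, Int)): (Double, Int) = (a._1 + b._1, a._2 + b._2)

  // Correct average: reduce (value, 1) pairs, then divide once
  def average(xs: Seq[Double]): Double = {
    val (sum, cnt) = xs.map(x => (x, 1)).reduce(merge)
    sum / cnt
  }

  // Tempting shortcut that weights later values more heavily: (1, 2, 3) gives 2.25, not 2.0
  def naivePairwise(xs: Seq[Double]): Double = xs.reduce((a, b) => (a + b) / 2)
}
```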
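6. Find the woman who watches the most movies (has the most ratings), then give the average rating of the 10 movies she rated highest (viewer, movie title, rating)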

  def solveQuest6(utils: Utils): Unit = {
    val userID_sex = utils.usersRdd.map(x => (x._1, x._2))
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    // (userID, number of ratings), female users only
    val uid_times = utils.ratingsRdd.map(x => (x._1, 1))
      .join(userID_sex).filter(_._2._2 == "F")
      .map(x => (x._1, x._2._1)).reduceByKey(_ + _)
    val uid = uid_times.top(1)(Ordering.by(_._2))(0)._1

    // IDs of the 10 movies this user rated highest
    val top10Mids = utils.ratingsRdd.filter(_._1 == uid)
      .map(x => (x._2, x._3.toDouble)).top(10)(Ordering.by(_._2)).map(_._1).toSet
    // Average rating of those movies over all users: (movieID, avg)
    val mid_avg = utils.ratingsRdd.filter(x => top10Mids.contains(x._2))
      .map(x => (x._2, (x._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(x => x._1 / x._2)
    println("top 10 movies of the most active female user, with their average rating")
    mid_avg.join(movieID_movie).map { case (_, (avg, name)) => (uid, name, avg) }
      .collect().sortBy(-_._3).foreach(println(_))
  }

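Question 6 is a two-stage query: first pick one user, then aggregate over that user's favourite movies using everybody's ratings. A plain-Scala sketch of both stages, assuming `(userID, movieID, rating)` triples and a set of candidate (female) user IDs; `TopRaterSketch` is hypothetical, and ties (in activity or among equal ratings) are resolved arbitrarily, as with `RDD.top`.

```scala
// Local sketch of question 6.
object TopRaterSketch {
  // The candidate user with the most ratings
  def mostActive(ratings: Seq[(String, String, Double)], candidates: Set[String]): String =
    ratings.filter(r => candidates.contains(r._1)).groupBy(_._1).maxBy(_._2.size)._1

  // Average rating (over all users) of the n movies `uid` rated highest, best first
  def avgOfTopRated(ratings: Seq[(String, String, Double)], uid: String, n: Int): Seq[(String, Double)] = {
    val top = ratings.filter(_._1 == uid).sortBy(r => -r._3).take(n).map(_._2).toSet
    ratings.filter(r => top.contains(r._2))
      .groupBy(_._2)
      .map { case (mid, rs) => (mid, rs.map(_._3).sum / rs.size) }
      .toSeq
      .sortBy(-_._2)
  }
}
```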
7. In the year with the most good movies (average rating >= 4.0), find the 10 best movies (highest average rating)

  def solveQuest7(utils: Utils): Unit = {
    // Titles look like "Toy Story (1995)": strip " (yyyy)" for the name, take the 4 digits for the year
    val mid_movie = utils.movieRdd.map(x => (x._1, x._2.substring(0, x._2.length - 7)))
    val mid_year = utils.movieRdd.map(x => (x._1, x._2.substring(x._2.length - 5, x._2.length - 1)))
    // (movieID, average rating), keeping only good movies (average >= 4.0)
    val mid_rat = utils.ratingsRdd.map(x => (x._2, x._3.toDouble))
    val mid_avg_ge4 = mid_rat.map(x => (x._1, (x._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2)).filter(_._2 >= 4.0)
    // (movieID, (year, avg)) is used twice, so cache it
    val mid_year_avg = mid_year.join(mid_avg_ge4).cache()
    // The year with the most good movies
    val year_times = mid_year_avg.map(x => (x._2._1, 1)).reduceByKey(_ + _)
    val year = year_times.top(1)(Ordering.by(_._2))(0)._1
    // The 10 good movies of that year with the highest average: (movieName, avg)
    val year_mid_avg = mid_year_avg.filter(_._2._1 == year).map(x => (x._1, x._2._2))
    val top10 = year_mid_avg.join(mid_movie).map(x => (x._2._2, x._2._1)).top(10)(Ordering.by(_._2))
    top10.foreach(println(_))
  }

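Questions 7, 8 and 10 cut the title and year out of strings such as `Toy Story (1995)` with fixed offsets, which works as long as every title ends exactly in ` (yyyy)`. A slightly more defensive alternative is a regular expression that also tolerates trailing whitespace and reports titles without a year instead of producing garbage. This is a sketch with a hypothetical `TitleParser`, not what the original code does.

```scala
// Regex-based split of an ml-1m title into (name, year).
object TitleParser {
  // Name, a space, then a 4-digit year in parentheses at the end (trailing spaces allowed)
  private val TitleYear = """^(.*) \((\d{4})\)\s*$""".r

  def parse(title: String): Option[(String, String)] = title match {
    case TitleYear(name, year) => Some((name, year))
    case _                     => None
  }
}
```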
8. Among movies released in 1997, find the 10 highest-rated Comedy movies

  def solveQuest8(utils: Utils): Unit = {
    // (movieID, (movieName, year, genres))
    val mid_movie_year_type = utils.movieRdd.map(x => (x._1, (x._2.substring(0, x._2.length - 7), x._2.substring(x._2.length - 5, x._2.length - 1), x._3)))
    // Comedies released in 1997
    val usem = mid_movie_year_type.filter(x => x._2._2 == "1997" && x._2._3.contains("Comedy"))
    // (movieID, average rating)
    val mid_rat = utils.ratingsRdd.map(x => (x._2, x._3.toDouble))
    val mid_avg = mid_rat.map(x => (x._1, (x._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2))
    // (movieID, movieName, avg) for the 1997 comedies, then the 10 highest averages
    val movie1997_avg = usem.join(mid_avg).map(x => (x._1, x._2._1._1, x._2._2))
    val top10 = movie1997_avg.top(10)(Ordering.by(_._3))
    top10.foreach(println(_))
  }

9. For each movie genre in the dataset, find the 5 highest-rated movies (genre, movie title, average rating)

  def solveQuest9(utils: Utils): Unit = {
    // (movieID, average rating)
    val mid_rat = utils.ratingsRdd.map(x => (x._2, x._3.toDouble))
    val mid_avg = mid_rat.map(x => (x._1, (x._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2))
    // (movieID, (movieName, genres)) joined with the average ---> (genres, (movieName, avg))
    val mrdd_avg = utils.movieRdd.map(x => (x._1, (x._2, x._3.split('|')))).join(mid_avg).map(x => (x._2._1._2, (x._2._1._1, x._2._2)))
    // One (genre, (movieName, avg)) record for every genre of a movie; the old
    // `0 until (length - 1)` loop skipped each movie's last genre
    val type_avg = mrdd_avg.flatMap { case (genres, nameAvg) => genres.map(g => (g, nameAvg)) }
    // Sort each genre's movies by average rating and keep the top 5
    val types_top5 = type_avg.groupByKey().mapValues(_.toSeq.sortBy(-_._2).take(5)).collect()
    types_top5.foreach { case (genre, top5) =>
      println("top5 in : " + genre)
      top5.foreach(println(_))
    }
  }

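The per-genre ranking in question 9 hinges on emitting one record for every genre of a movie. A plain-Scala sketch (no Spark) of the flatten-group-rank pattern, assuming `(title, "Genre1|Genre2", average)` rows; `GenreTopSketch` is hypothetical. `split('|')` on a `Char` splits on the literal bar, so no regex escaping is needed.

```scala
// Local sketch of question 9: top-n movies per genre by average rating.
object GenreTopSketch {
  // movies: (title, "Genre1|Genre2|...", average rating)
  def topPerGenre(movies: Seq[(String, String, Double)], n: Int): Map[String, Seq[(String, Double)]] =
    movies
      // One (genre, (title, avg)) pair per genre, the last genre included
      .flatMap { case (title, genres, avg) => genres.split('|').toSeq.map(g => (g, (title, avg))) }
      .groupBy(_._1)
      // Highest average first, ties broken by title
      .map { case (g, xs) => g -> xs.map(_._2).sortBy { case (t, a) => (-a, t) }.take(n) }
}
```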
10. The highest-rated movie genre of each year (year, genre, rating)

  def solveQuest10(utils: Utils): Unit = {
    // (movieID, (year, genre)), one record per individual genre of each movie
    val mid_year_type = utils.movieRdd.flatMap(x => {
      val year = x._2.substring(x._2.length - 5, x._2.length - 1)
      x._3.split('|').map(t => (x._1, (year, t)))
    })
    // (movieID, (rating, (year, genre)))  ---> ((year, genre), (sum, count))  ---> (year, (genre, avg))
    val year_type_avg = utils.ratingsRdd.map(x => (x._2, x._3.toDouble)).join(mid_year_type)
      .map { case (_, (rating, yearType)) => (yearType, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case ((year, genre), (sum, cnt)) => (year, (genre, sum / cnt)) }
    // Keep the genre with the highest average per year, in a single job instead of one job per year
    val best = year_type_avg.reduceByKey((a, b) => if (a._2 >= b._2) a else b).collect().sortBy(_._1.toInt)
    best.foreach { case (year, (genre, avg)) =>
      println("In " + year + ", the highest rating movie type is " + genre + " with average rating as " + avg)
    }
  }

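Question 10 reduces twice: first to an average per (year, genre), then to the best genre per year. Each rating counts once for every genre of its movie. A plain-Scala sketch of the same two steps, assuming rows of `(year, "Genre1|Genre2", rating)` that have already been joined; `BestGenrePerYearSketch` is hypothetical.

```scala
// Local sketch of question 10: best genre (highest average rating) per year.
object BestGenrePerYearSketch {
  // rows: (year, "Genre1|Genre2|...", rating)
  def bestGenre(rows: Seq[(String, String, Double)]): Map[String, (String, Double)] = {
    // Average per (year, genre)
    val avgs: Map[(String, String), Double] = rows
      .flatMap { case (year, genres, r) => genres.split('|').toSeq.map(g => ((year, g), r)) }
      .groupBy(_._1)
      .map { case (key, xs) => key -> xs.map(_._2).sum / xs.size }
    // Highest-average genre for each year
    avgs.toSeq
      .groupBy { case ((year, _), _) => year }
      .map { case (year, xs) => year -> xs.map { case ((_, g), a) => (g, a) }.maxBy(_._2) }
  }
}
```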
Full code

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

class Utils {
  val conf = new SparkConf().setAppName("FileReview").setMaster("local")
  // Initialise the SparkContext
  val sc = new SparkContext(conf)
  val movie = sc.textFile("ml-1m/movies.dat")
  val ratings = sc.textFile("ml-1m/ratings.dat")
  val users = sc.textFile("ml-1m/users.dat")
  val movieRdd: RDD[(String, String, String)] = movie.map(_.split("::")).map(m => (m(0), m(1), m(2)))
  val ratingsRdd: RDD[(String, String, String, String)] = ratings.map(_.split("::")).map(r => (r(0), r(1), r(2), r(3)))
  val usersRdd: RDD[(String, String, String, String, String)] = users.map(_.split("::")).map(u => (u(0), u(1), u(2), u(3), u(4)))
}

object four {

  def contest(utils: Utils): Unit = {
    val data = utils.sc.textFile("data/contest_log.txt")
    val users = data.map(line => line.split(',')(3)).distinct()
    println(users.count())
    val pages = data.map(line => line.split(',')(1)).distinct()
    println(pages.count())
    val session_with_time = data.map(line => (line.split(',')(5).substring(0, 7), 1))
    println(session_with_time.reduceByKey(_ + _).collect() mkString("\n", "\n", "\n"))
  }

  /*
  * 1. Find the 10 most-rated movies and their rating counts (movie title, number of ratings)
  * */
  def solveQuest1(utils: Utils): Unit = {
    // (movieID, 1) ---> (movieID, number of ratings)
    val movieID_times: RDD[(String, Int)] = utils.ratingsRdd.map(x => (x._2, 1)).reduceByKey(_ + _)
    // (movieID, movieName)
    val movieID_name: RDD[(String, String)] = utils.movieRdd.map(x => (x._1, x._2))
    // Join to get (movieName, number of ratings), then keep the 10 largest counts
    val result: RDD[(String, Int)] = movieID_times.join(movieID_name).map(x => (x._2._2, x._2._1))
    result.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

  /*
  * 2. For men and women separately, find the 10 highest-rated movies (gender, movie title, rating)
  * */
  def solveQuest2(utils: Utils): Unit = {
    // (userID, sex)
    val userID_sex: RDD[(String, String)] = utils.usersRdd.map(x => (x._1, x._2))
    // (userID, (movieID, rating))
    val userID_movieID_rating: RDD[(String, (String, Double))] = utils.ratingsRdd.map(x => (x._1, (x._2, x._3.toDouble)))
    // (movieID, movieName)
    val movieID_movieName: RDD[(String, String)] = utils.movieRdd.map(x => (x._1, x._2))
    // (userID, (sex, (movieID, rating)))  ---> ((sex, movieID), (rating, 1))
    val sex_movieID_rating: RDD[((String, String), (Double, Int))] = userID_sex.join(userID_movieID_rating)
      .map(x => ((x._2._1, x._2._2._1), (x._2._2._2, 1)))
    // ((sex, movieID), (sum, count))  ---> (movieID, (sex, avg)); only movies with more than 50 ratings
    // from that gender are kept, so that a movie with a single 5-star rating cannot top the list
    val movieID_sex_avg: RDD[(String, (String, Double))] = sex_movieID_rating
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .filter(_._2._2 > 50)
      .map { case ((sex, movieID), (sum, cnt)) => (movieID, (sex, sum / cnt)) }
    // (movieID, ((sex, avg), movieName))  ---> (sex, movieName, avg)
    val sex_movieName_avg: RDD[(String, String, Double)] = movieID_sex_avg.join(movieID_movieName)
      .map { case (_, ((sex, avg), movieName)) => (sex, movieName, avg) }
      .cache()
    // Top 10 by average rating, first for women, then for men
    for (sex <- Seq("F", "M")) {
      sex_movieName_avg.filter(_._1 == sex).top(10)(Ordering.by(_._3))
        .foreach { case (s, name, avg) => println(s + ":" + name + ":" + avg) }
    }
  }

  /*
    * 3. For men and women separately, find the 10 movies they rated most often (gender, movie title)
    * */
  def solveQuest3(utils: Utils): Unit = {
    val userID_sex = utils.usersRdd.map(x => (x._1, x._2))
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    val userID_movieID_times = utils.ratingsRdd.map(x => (x._1, (x._2, 1)))
    val userID_movieID_times_sexs = userID_movieID_times.join(userID_sex)
    val userID_movieID_times_sexs_movie = userID_movieID_times_sexs.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._2))).join(movieID_movie)
    val sex_movieID_times_movie = userID_movieID_times_sexs_movie.map(x => (x._2._1._3, (x._1, x._2._2, x._2._1._2)))
    val movie_times_M = sex_movieID_times_movie.filter(x => x._1 == "M").map(x => ((x._2._1, x._2._2), x._2._3)).reduceByKey(_ + _)
    println("male rating times top10")
    movie_times_M.top(10)(Ordering.by(_._2)).foreach(println(_))
    val movie_times_F = sex_movieID_times_movie.filter(x => x._1 == "F").map(x => ((x._2._1, x._2._2), x._2._3)).reduceByKey(_ + _)
    println("female rating times top10")
    movie_times_F.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

  /*
    * 4. Find the 10 movies most popular (most often rated) among men aged 18-24
    * */
  def solveQuest4(utils: Utils): Unit = {
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    val userID_movieID_times = utils.ratingsRdd.map(x => (x._1, (x._2, 1)))
    val userID_age = utils.usersRdd.filter(x => x._2 == "M" && x._3 == "18").map(x => (x._1, x._3))
    val userID_movieID_times_age = userID_movieID_times.join(userID_age)
    val userID_movieID_times_age_movie = userID_movieID_times_age.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._2))).join(movieID_movie)
    val movie_times = userID_movieID_times_age_movie.map(x => ((x._1, x._2._2), x._2._1._2)).reduceByKey(_ + _)
    println("age in 18-24 male rating times top10")
    movie_times.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

  /*
    * 5. For the movie with movieid = 2116, compute the average rating in each age group (there are only 7 age values, so just group by those 7) (age group, rating)
    * */
  def solveQuest5(utils: Utils): Unit = {
    val userID_rating = utils.ratingsRdd.filter(_._2 == "2116").map(x => (x._1, x._3.toDouble))
    val userID_age = utils.usersRdd.map(x => (x._1, x._3))
    val age_rating_times = userID_age.join(userID_rating).map(x => (x._2._1, (x._2._2, 1)))
    val age_avg = age_rating_times.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2))
    println("movie 2116 in every age avg")
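    // Collect to the driver so the output appears in the driver console, sorted by age code
    age_avg.collect().sortBy(_._1.toInt).foreach(println(_))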
  }

  /*
      * 6. Find the woman who watches the most movies (has the most ratings), then give the average rating of the 10 movies she rated highest (viewer, movie title, rating)
      * */
  def solveQuest6(utils: Utils): Unit = {
    val userID_sex = utils.usersRdd.map(x => (x._1, x._2))
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    // (userID, number of ratings), female users only
    val uid_times = utils.ratingsRdd.map(x => (x._1, 1))
      .join(userID_sex).filter(_._2._2 == "F")
      .map(x => (x._1, x._2._1)).reduceByKey(_ + _)
    val uid = uid_times.top(1)(Ordering.by(_._2))(0)._1

    // IDs of the 10 movies this user rated highest
    val top10Mids = utils.ratingsRdd.filter(_._1 == uid)
      .map(x => (x._2, x._3.toDouble)).top(10)(Ordering.by(_._2)).map(_._1).toSet
    // Average rating of those movies over all users: (movieID, avg)
    val mid_avg = utils.ratingsRdd.filter(x => top10Mids.contains(x._2))
      .map(x => (x._2, (x._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(x => x._1 / x._2)
    println("top 10 movies of the most active female user, with their average rating")
    mid_avg.join(movieID_movie).map { case (_, (avg, name)) => (uid, name, avg) }
      .collect().sortBy(-_._3).foreach(println(_))
  }

  /*
    * 7. In the year with the most good movies (average rating >= 4.0), find the 10 best movies (highest average rating)
    * */
  def solveQuest7(utils: Utils): Unit = {
    val mid_movie = utils.movieRdd.map(x => (x._1, x._2.substring(0, x._2.length - 7)))
    val mid_year = utils.movieRdd.map(x => (x._1, x._2.substring(x._2.length - 5, x._2.length - 1)))
    val mid_rat = utils.ratingsRdd.map(x => (x._2, x._3.toDouble))
    val mid_avg_ge4 = mid_rat.map(x => (x._1, (x._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2)).filter(_._2 >= 4.0)
    val year_times = mid_year.join(mid_avg_ge4).map(x => (x._2._1, 1)).reduceByKey(_ + _)
    val year = year_times.top(1)(Ordering.by(_._2))(0)._1
    val year_mid_avg = mid_year.join(mid_avg_ge4).filter(_._2._1 == year).map(x => (x._1, x._2._2))
    val top10 = year_mid_avg.join(mid_movie).map(x => (x._2._2, x._2._1)).top(10)(Ordering.by(_._2))
    top10.foreach(println(_))
  }

  /*
    * 8. Among movies released in 1997, find the 10 highest-rated Comedy movies
    * */
  def solveQuest8(utils: Utils): Unit = {
    val mid_movie_year_type = utils.movieRdd.map(x => (x._1, (x._2.substring(0, x._2.length - 7), x._2.substring(x._2.length - 5, x._2.length - 1), x._3)))
    val usem = mid_movie_year_type.filter(x => x._2._2 == "1997" && x._2._3.contains("Comedy"))
    val mid_rat = utils.ratingsRdd.map(x => (x._2, x._3.toDouble))
    val mid_avg = mid_rat.map(x => (x._1, (x._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2))
    val movie1997_avg = usem.join(mid_avg).map(x => (x._1, x._2._1._1, x._2._2))
    val top10 = movie1997_avg.top(10)(Ordering.by(_._3))
    top10.foreach(println(_))
  }

  /*
    * 9. For each movie genre in the dataset, find the 5 highest-rated movies (genre, movie title, average rating)
    * */
  def solveQuest9(utils: Utils): Unit = {
    // (movieID, average rating)
    val mid_rat = utils.ratingsRdd.map(x => (x._2, x._3.toDouble))
    val mid_avg = mid_rat.map(x => (x._1, (x._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(x => (x._1, x._2._1 / x._2._2))
    // (movieID, (movieName, genres)) joined with the average ---> (genres, (movieName, avg))
    val mrdd_avg = utils.movieRdd.map(x => (x._1, (x._2, x._3.split('|')))).join(mid_avg).map(x => (x._2._1._2, (x._2._1._1, x._2._2)))
    // One (genre, (movieName, avg)) record for every genre of a movie; the old
    // `0 until (length - 1)` loop skipped each movie's last genre
    val type_avg = mrdd_avg.flatMap { case (genres, nameAvg) => genres.map(g => (g, nameAvg)) }
    // Sort each genre's movies by average rating and keep the top 5
    val types_top5 = type_avg.groupByKey().mapValues(_.toSeq.sortBy(-_._2).take(5)).collect()
    types_top5.foreach { case (genre, top5) =>
      println("top5 in : " + genre)
      top5.foreach(println(_))
    }
  }

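  /*
      * 10. The highest-rated movie genre of each year (year, genre, rating)
      * */
  def solveQuest10(utils: Utils): Unit = {
    // (movieID, (year, genre)), one record per individual genre of each movie
    val mid_year_type = utils.movieRdd.flatMap(x => {
      val year = x._2.substring(x._2.length - 5, x._2.length - 1)
      x._3.split('|').map(t => (x._1, (year, t)))
    })
    // (movieID, (rating, (year, genre)))  ---> ((year, genre), (sum, count))  ---> (year, (genre, avg))
    val year_type_avg = utils.ratingsRdd.map(x => (x._2, x._3.toDouble)).join(mid_year_type)
      .map { case (_, (rating, yearType)) => (yearType, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case ((year, genre), (sum, cnt)) => (year, (genre, sum / cnt)) }
    // Keep the genre with the highest average per year, in a single job instead of one job per year
    val best = year_type_avg.reduceByKey((a, b) => if (a._2 >= b._2) a else b).collect().sortBy(_._1.toInt)
    best.foreach { case (year, (genre, avg)) =>
      println("In " + year + ", the highest rating movie type is " + genre + " with average rating as " + avg)
    }
  }

  def main(args: Array[String]): Unit = {
    val utils = new Utils()
    // contest(utils) also needs data/contest_log.txt, so it is not called here
    solveQuest1(utils)
    solveQuest2(utils)
    solveQuest3(utils)
    solveQuest4(utils)
    solveQuest5(utils)
    solveQuest6(utils)
    solveQuest7(utils)
    solveQuest8(utils)
    solveQuest9(utils)
    solveQuest10(utils)
    utils.sc.stop()
  }
}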

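Lab summary and issues

What I learned to use, and for what

Complex Spark RDD operations

Problems encountered and how they were solved

Trouble using the flatMap and reduceByKey operators; solved by reading the official documentation

Problems still unresolved, and their possible causes

None so far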

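Disclaimer: this article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal liability. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/神奇cpp/article/detail/993591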