赞
踩
/**
 * Prints three summary statistics of the comma-separated log at
 * `data/contest_log.txt` (columns are 0-indexed):
 *  1. the number of distinct values in column 3,
 *  2. the number of distinct values in column 1,
 *  3. the number of records per 7-character prefix of column 5.
 *
 * NOTE(review): the original variable names suggest column 3 = user,
 * column 1 = page and column 5 = a timestamp — confirm against the log format.
 *
 * @param utils supplies the SparkContext used to read the log
 */
def contest(utils: Utils): Unit = {
  val logLines = utils.sc.textFile("data/contest_log.txt")

  // Distinct values of column 3.
  println(logLines.map(_.split(',')(3)).distinct().count())

  // Distinct values of column 1.
  println(logLines.map(_.split(',')(1)).distinct().count())

  // Record count per 7-character prefix of column 5.
  val countsByPrefix = logLines
    .map { line => (line.split(',')(5).substring(0, 7), 1) }
    .reduceByKey(_ + _)
    .collect()
  println(countsByPrefix.mkString("\n", "\n", "\n"))
}
/**
 * Quest 2: for each sex, print the 10 movies with the highest AVERAGE rating
 * given by users of that sex, one per line as `sex:title:averageRating`
 * (women first, then men).
 *
 * Only movies rated at least `minRatings` times by that sex are ranked, so a
 * handful of 5-star ratings cannot dominate the list. A previously
 * commented-out draft of this method applied a similar threshold of 50.
 *
 * @param utils      supplies the users, ratings and movies RDDs
 * @param minRatings minimum number of ratings (per sex) a movie needs to be ranked
 */
def solveQuest2(utils: Utils, minRatings: Int = 50): Unit = {
  // (userID, sex)
  val userID_sex = utils.usersRdd.map(u => (u._1, u._2))
  // (userID, (movieID, rating))
  val userID_movieID_rating = utils.ratingsRdd.map(r => (r._1, (r._2, r._3.toDouble)))
  // (movieID, title)
  val movieID_title = utils.movieRdd.map(m => (m._1, m._2))

  // ((sex, movieID), (ratingSum, ratingCount))
  val sexMovie_sumCount = userID_sex.join(userID_movieID_rating)
    .map { case (_, (sex, (movieID, rating))) => ((sex, movieID), (rating, 1)) }
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

  // (sex, title, averageRating) for movies that meet the threshold
  val sex_title_avg = sexMovie_sumCount
    .filter { case (_, (_, count)) => count >= minRatings }
    .map { case ((sex, movieID), (sum, count)) => (movieID, (sex, sum / count)) }
    .join(movieID_title)
    .map { case (_, ((sex, avg), title)) => (sex, title, avg) }
    .cache() // scanned once per sex below

  for (sex <- Seq("F", "M")) {
    sex_title_avg
      .filter(_._1 == sex)
      .top(10)(Ordering.by(_._3))
      .foreach { case (s, title, avg) => println(s + ":" + title + ":" + avg) }
  }
  sex_title_avg.unpersist()
}
/**
 * Quest 3: for each sex, print the 10 movies rated most often by users of
 * that sex, as `((movieID, title), ratingCount)`: men first, then women.
 *
 * @param utils supplies the users, ratings and movies RDDs
 */
def solveQuest3(utils: Utils): Unit = {
  val userID_sex = utils.usersRdd.map(x => (x._1, x._2))
  val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))

  // (sex, (movieID, title, 1)): one entry per rating
  val sex_movieID_times_movie = utils.ratingsRdd
    .map(x => (x._1, (x._2, 1)))
    .join(userID_sex)
    .map { case (_, ((movieID, one), sex)) => (movieID, (sex, one)) }
    .join(movieID_movie)
    .map { case (movieID, ((sex, one), title)) => (sex, (movieID, title, one)) }
    .cache() // scanned once per sex below

  // Each sex gets its own header; the female section used to be labelled "male".
  for ((sex, label) <- Seq("M" -> "male", "F" -> "female")) {
    // ((movieID, title), ratingCount) among ratings by users of this sex
    val movie_times = sex_movieID_times_movie
      .filter(_._1 == sex)
      .map { case (_, (movieID, title, one)) => ((movieID, title), one) }
      .reduceByKey(_ + _)
    println(label + " rating times top10")
    movie_times.top(10)(Ordering.by(_._2)).foreach(println(_))
  }
  sex_movieID_times_movie.unpersist()
}
/**
 * Quest 4: print the 10 movies rated most often by male users whose age code
 * is "18" (the "18-24" bracket, per the printed header), one per line as
 * `((movieID, title), ratingCount)`.
 *
 * @param utils supplies the users, ratings and movies RDDs
 */
def solveQuest4(utils: Utils): Unit = {
  // (userID, ageCode) for men in the "18" age bracket
  val youngMen = utils.usersRdd
    .filter(u => u._2 == "M" && u._3 == "18")
    .map(u => (u._1, u._3))
  // (movieID, title)
  val titles = utils.movieRdd.map(m => (m._1, m._2))

  // ((movieID, title), ratingCount) over ratings given by those users
  val ratingCounts = utils.ratingsRdd
    .map(r => (r._1, (r._2, 1)))
    .join(youngMen)
    .map { case (_, ((movieId, one), _)) => (movieId, one) }
    .join(titles)
    .map { case (movieId, (one, title)) => ((movieId, title), one) }
    .reduceByKey(_ + _)

  println("age in 18-24 male rating times top10")
  val mostRated = ratingCounts.top(10)(Ordering.by(_._2))
  mostRated.foreach(println(_))
}
/**
 * Quest 5: print the average rating a single movie received from each age
 * code among its raters, as `(ageCode, averageRating)` sorted by age code
 * (as strings).
 *
 * The result is collected to the driver before printing. `RDD.foreach(println)`
 * executes on the executors, so on a cluster the lines would never reach the
 * driver's console, and even locally their order was arbitrary.
 *
 * @param utils   supplies the users and ratings RDDs
 * @param movieID the movie to analyse; defaults to "2116"
 */
def solveQuest5(utils: Utils, movieID: String = "2116"): Unit = {
  // (userID, rating) for the requested movie
  val userID_rating = utils.ratingsRdd.filter(_._2 == movieID).map(x => (x._1, x._3.toDouble))
  // (userID, ageCode)
  val userID_age = utils.usersRdd.map(x => (x._1, x._3))

  val age_avg = userID_age.join(userID_rating)
    .map { case (_, (age, rating)) => (age, (rating, 1)) }
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .map { case (age, (sum, count)) => (age, sum / count) }
    .collect()
    .sortBy(_._1)

  println("movie " + movieID + " in every age avg")
  age_avg.foreach(println(_))
}
/**
 * Quest 6: find the woman who rated the most movies, take the 10 movies she
 * rated highest, and print each as `(viewerID, title, averageRating)`, where
 * the average is over ALL users' ratings of that movie (as the quest asks).
 * Lines follow her own ranking, highest first.
 *
 * Prints a notice instead of failing when no female user has rated anything.
 *
 * @param utils supplies the users, ratings and movies RDDs
 */
def solveQuest6(utils: Utils): Unit = {
  // (userID, sex) restricted to women
  val femaleUsers = utils.usersRdd.filter(_._2 == "F").map(x => (x._1, x._2))
  // (movieID, title)
  val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))

  // (userID, ratingCount) for every woman with at least one rating.
  // No join with the movies table is needed just to count ratings.
  val femaleRatingCounts = utils.ratingsRdd
    .map(x => (x._1, 1))
    .join(femaleUsers)
    .map { case (userID, (one, _)) => (userID, one) }
    .reduceByKey(_ + _)

  femaleRatingCounts.top(1)(Ordering.by(_._2)).headOption match {
    case None =>
      println("no ratings by female users")
    case Some((uid, _)) =>
      // (movieID, herRating) for the 10 movies she rated highest
      val herTop10 = utils.ratingsRdd
        .filter(_._1 == uid)
        .map(x => (x._2, x._3.toDouble))
        .top(10)(Ordering.by(_._2))
      val herTop10Ids = herTop10.map(_._1).toSet

      // movieID -> (title, average rating over all users)
      val titleAndAvg = utils.ratingsRdd
        .filter(x => herTop10Ids.contains(x._2))
        .map(x => (x._2, (x._3.toDouble, 1)))
        .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
        .map { case (mid, (sum, count)) => (mid, sum / count) }
        .join(movieID_movie)
        .map { case (mid, (avg, title)) => (mid, (title, avg)) }
        .collectAsMap()

      println("movie fav F highest raing top 10")
      herTop10.foreach { case (mid, _) =>
        titleAndAvg.get(mid).foreach { case (title, avg) => println((uid, title, avg)) }
      }
  }
}
/**
 * Quest 7: among movies whose average rating is >= 4.0, find the release year
 * with the most such movies. Then print that year's 10 highest-average movies
 * as `(title, averageRating)`.
 *
 * Titles are assumed to end in " (YYYY)": the year is the 4 characters before
 * the final ')', and the display title drops the last 7 characters.
 *
 * @param utils supplies the ratings and movies RDDs
 */
def solveQuest7(utils: Utils): Unit = {
  // (movieID, "YYYY") and (movieID, title without the year suffix)
  val yearOf = utils.movieRdd.map(m => (m._1, m._2.substring(m._2.length - 5, m._2.length - 1)))
  val titleOf = utils.movieRdd.map(m => (m._1, m._2.substring(0, m._2.length - 7)))

  // (movieID, averageRating) for "good" movies only
  val goodMovieAvg = utils.ratingsRdd
    .map(r => (r._2, (r._3.toDouble, 1)))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .map { case (mid, (sum, cnt)) => (mid, sum / cnt) }
    .filter { case (_, avg) => avg >= 4.0 }

  // (movieID, (year, averageRating))
  val yearWithAvg = yearOf.join(goodMovieAvg)

  val bestYear = yearWithAvg
    .map { case (_, (year, _)) => (year, 1) }
    .reduceByKey(_ + _)
    .top(1)(Ordering.by(_._2))
    .apply(0)._1

  val bestOfYear = yearWithAvg
    .filter { case (_, (year, _)) => year == bestYear }
    .map { case (mid, (_, avg)) => (mid, avg) }
    .join(titleOf)
    .map { case (_, (avg, title)) => (title, avg) }
    .top(10)(Ordering.by(_._2))

  bestOfYear.foreach(println(_))
}
/**
 * Quest 8: print the 10 highest-average-rated movies from 1997 whose genre
 * string contains "Comedy", as `(movieID, title, averageRating)`.
 *
 * Titles are assumed to end in " (YYYY)" (see the substring arithmetic).
 *
 * @param utils supplies the ratings and movies RDDs
 */
def solveQuest8(utils: Utils): Unit = {
  // (movieID, (title, year, genres)) for 1997 comedies
  val comedies1997 = utils.movieRdd
    .map { case (id, fullTitle, genres) =>
      val title = fullTitle.substring(0, fullTitle.length - 7)
      val year = fullTitle.substring(fullTitle.length - 5, fullTitle.length - 1)
      (id, (title, year, genres))
    }
    .filter { case (_, (_, year, genres)) => year == "1997" && genres.contains("Comedy") }

  // (movieID, averageRating)
  val avgRating = utils.ratingsRdd
    .map(r => (r._2, (r._3.toDouble, 1)))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .map { case (id, (sum, n)) => (id, sum / n) }

  val ranked = comedies1997.join(avgRating)
    .map { case (id, ((title, _, _), avg)) => (id, title, avg) }
    .top(10)(Ordering.by(_._3))
  ranked.foreach(println(_))
}
/**
 * Quest 9: for every genre, print the 5 movies with the highest average
 * rating. Output is a "top5 in : <genre>" header followed by `(title, averageRating)`
 * lines, with genres in alphabetical order.
 *
 * Every genre of a multi-genre movie counts. The previous loop
 * `0 until (length - 1)` skipped each movie's last genre, so single-genre
 * movies were never ranked at all.
 *
 * @param utils supplies the ratings and movies RDDs
 */
def solveQuest9(utils: Utils): Unit = {
  // (movieID, averageRating)
  val mid_avg = utils.ratingsRdd
    .map(x => (x._2, (x._3.toDouble, 1)))
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    .map(x => (x._1, x._2._1 / x._2._2))

  // (genre, (title, averageRating)): one entry per genre of each rated movie
  val genre_title_avg = utils.movieRdd
    .map(x => (x._1, (x._2, x._3.split('|'))))
    .join(mid_avg)
    .flatMap { case (_, ((title, genres), avg)) => genres.map(g => (g, (title, avg))) }

  // Keep a running top 5 per genre, so no genre ever holds more than 5 entries.
  val top5ByGenre = genre_title_avg
    .aggregateByKey(List.empty[(String, Double)])(
      (acc, v) => (v :: acc).sortBy(-_._2).take(5),
      (a, b) => (a ++ b).sortBy(-_._2).take(5))
    .collect()
    .sortBy(_._1)

  top5ByGenre.foreach { case (genre, top5) =>
    println("top5 in : " + genre)
    top5.foreach(println(_))
  }
}
/**
 * Quest 10: for every release year, print the genre whose movies received the
 * highest average rating that year (averaged over individual ratings). Years
 * are printed in ascending numeric order.
 *
 * Changes from the per-year loop:
 *  - everything is computed in one pass instead of one full ratings join per year;
 *  - genre strings such as "Action|Thriller" are split into individual genres
 *    (consistent with Quest 9);
 *  - years whose movies have no ratings are skipped instead of crashing on
 *    `top(1)(0)` of an empty result.
 *
 * Titles are assumed to end in " (YYYY)" (see the substring arithmetic).
 *
 * @param utils supplies the ratings and movies RDDs
 */
def solveQuest0(utils: Utils): Unit = {
  // (movieID, (year, genres))
  val movieID_year_genres = utils.movieRdd.map { x =>
    (x._1, (x._2.substring(x._2.length - 5, x._2.length - 1), x._3.split('|')))
  }

  // ((year, genre), (ratingSum, ratingCount))
  val yearGenre_sumCount = utils.ratingsRdd
    .map(x => (x._2, x._3.toDouble))
    .join(movieID_year_genres)
    .flatMap { case (_, (rating, (year, genres))) => genres.map(g => ((year, g), (rating, 1))) }
    .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

  // (year, (bestGenre, averageRating))
  val bestByYear = yearGenre_sumCount
    .map { case ((year, genre), (sum, count)) => (year, (genre, sum / count)) }
    .reduceByKey((a, b) => if (a._2 >= b._2) a else b)
    .collect()
    .sortBy(_._1.toInt)

  bestByYear.foreach { case (year, (genre, avg)) =>
    println("In " + year + ", the highest rating movie type is " + genre + " with average rating as " + avg)
  }
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
 * Holds a local SparkContext and the three MovieLens tables read from
 * `ml-1m/*.dat` (fields separated by `::`), exposed as tuples of strings.
 *
 * The parsed RDDs are cached because every quest in `four` re-reads them.
 */
class Utils {
  val conf: SparkConf = new SparkConf().setAppName("FileReview").setMaster("local")
  // SparkContext shared by all quests.
  val sc: SparkContext = new SparkContext(conf)
  val movie: RDD[String] = sc.textFile("ml-1m/movies.dat")
  val ratings: RDD[String] = sc.textFile("ml-1m/ratings.dat")
  val users: RDD[String] = sc.textFile("ml-1m/users.dat")
  // (movieID, title, genres); the quests assume titles end in " (YYYY)" and genres are '|'-separated
  val movieRdd: RDD[(String, String, String)] =
    movie.map(_.split("::")).map(m => (m(0), m(1), m(2))).cache()
  // (userID, movieID, rating, 4th field — presumably a timestamp; unused by the quests)
  val ratingsRdd: RDD[(String, String, String, String)] =
    ratings.map(_.split("::")).map(r => (r(0), r(1), r(2), r(3))).cache()
  // (userID, sex "M"/"F", age code, 4th/5th fields unused — presumably occupation and zip code)
  val usersRdd: RDD[(String, String, String, String, String)] =
    users.map(_.split("::")).map(u => (u(0), u(1), u(2), u(3), u(4))).cache()
}

object four {

  /**
   * Prints three summary statistics of the comma-separated log at
   * `data/contest_log.txt` (columns 0-indexed): distinct values in column 3,
   * distinct values in column 1, and record counts per 7-character prefix of
   * column 5.
   * NOTE(review): column meanings (user / page / timestamp) are inferred from
   * the original variable names — confirm against the log format.
   */
  def contest(utils: Utils): Unit = {
    val logLines = utils.sc.textFile("data/contest_log.txt")
    println(logLines.map(_.split(',')(3)).distinct().count())
    println(logLines.map(_.split(',')(1)).distinct().count())
    val countsByPrefix = logLines
      .map { line => (line.split(',')(5).substring(0, 7), 1) }
      .reduceByKey(_ + _)
      .collect()
    println(countsByPrefix.mkString("\n", "\n", "\n"))
  }

  /**
   * Quest 1: the 10 most-rated movies, printed as `(title, ratingCount)`,
   * highest count first.
   */
  def solveQuest1(utils: Utils): Unit = {
    // (movieID, ratingCount). Sorting before the join was wasted work, since a join does not preserve order.
    val movieID_times: RDD[(String, Int)] = utils.ratingsRdd.map(x => (x._2, 1)).reduceByKey(_ + _)
    // (movieID, title)
    val movieID_name: RDD[(String, String)] = utils.movieRdd.map(x => (x._1, x._2))
    movieID_times.join(movieID_name)
      .map { case (_, (times, name)) => (name, times) }
      .top(10)(Ordering.by(_._2))
      .foreach(println(_))
  }

  /**
   * Quest 2: for each sex, the 10 movies with the highest average rating from
   * users of that sex, printed as `sex:title:averageRating` (women first).
   * Only movies rated at least `minRatings` times by that sex are ranked.
   */
  def solveQuest2(utils: Utils, minRatings: Int = 50): Unit = {
    val userID_sex = utils.usersRdd.map(u => (u._1, u._2))
    val userID_movieID_rating = utils.ratingsRdd.map(r => (r._1, (r._2, r._3.toDouble)))
    val movieID_title = utils.movieRdd.map(m => (m._1, m._2))

    // ((sex, movieID), (ratingSum, ratingCount))
    val sexMovie_sumCount = userID_sex.join(userID_movieID_rating)
      .map { case (_, (sex, (movieID, rating))) => ((sex, movieID), (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

    // (sex, title, averageRating)
    val sex_title_avg = sexMovie_sumCount
      .filter { case (_, (_, count)) => count >= minRatings }
      .map { case ((sex, movieID), (sum, count)) => (movieID, (sex, sum / count)) }
      .join(movieID_title)
      .map { case (_, ((sex, avg), title)) => (sex, title, avg) }
      .cache()

    for (sex <- Seq("F", "M")) {
      sex_title_avg
        .filter(_._1 == sex)
        .top(10)(Ordering.by(_._3))
        .foreach { case (s, title, avg) => println(s + ":" + title + ":" + avg) }
    }
    sex_title_avg.unpersist()
  }

  /**
   * Quest 3: for each sex, the 10 movies rated most often by users of that
   * sex, printed as `((movieID, title), ratingCount)`: men first, then women.
   */
  def solveQuest3(utils: Utils): Unit = {
    val userID_sex = utils.usersRdd.map(x => (x._1, x._2))
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    // (sex, (movieID, title, 1)): one entry per rating
    val sex_movieID_times_movie = utils.ratingsRdd
      .map(x => (x._1, (x._2, 1)))
      .join(userID_sex)
      .map { case (_, ((movieID, one), sex)) => (movieID, (sex, one)) }
      .join(movieID_movie)
      .map { case (movieID, ((sex, one), title)) => (sex, (movieID, title, one)) }
      .cache()

    for ((sex, label) <- Seq("M" -> "male", "F" -> "female")) {
      val movie_times = sex_movieID_times_movie
        .filter(_._1 == sex)
        .map { case (_, (movieID, title, one)) => ((movieID, title), one) }
        .reduceByKey(_ + _)
      println(label + " rating times top10")
      movie_times.top(10)(Ordering.by(_._2)).foreach(println(_))
    }
    sex_movieID_times_movie.unpersist()
  }

  /**
   * Quest 4: the 10 movies rated most often by men with age code "18"
   * (the "18-24" bracket), printed as `((movieID, title), ratingCount)`.
   */
  def solveQuest4(utils: Utils): Unit = {
    val youngMen = utils.usersRdd
      .filter(u => u._2 == "M" && u._3 == "18")
      .map(u => (u._1, u._3))
    val titles = utils.movieRdd.map(m => (m._1, m._2))
    val ratingCounts = utils.ratingsRdd
      .map(r => (r._1, (r._2, 1)))
      .join(youngMen)
      .map { case (_, ((movieId, one), _)) => (movieId, one) }
      .join(titles)
      .map { case (movieId, (one, title)) => ((movieId, title), one) }
      .reduceByKey(_ + _)
    println("age in 18-24 male rating times top10")
    ratingCounts.top(10)(Ordering.by(_._2)).foreach(println(_))
  }

  /**
   * Quest 5: the average rating of one movie (default "2116") per age code,
   * printed on the driver as `(ageCode, averageRating)` sorted by age code.
   * The result is collected first: `RDD.foreach(println)` runs on executors.
   */
  def solveQuest5(utils: Utils, movieID: String = "2116"): Unit = {
    val userID_rating = utils.ratingsRdd.filter(_._2 == movieID).map(x => (x._1, x._3.toDouble))
    val userID_age = utils.usersRdd.map(x => (x._1, x._3))
    val age_avg = userID_age.join(userID_rating)
      .map { case (_, (age, rating)) => (age, (rating, 1)) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (age, (sum, count)) => (age, sum / count) }
      .collect()
      .sortBy(_._1)
    println("movie " + movieID + " in every age avg")
    age_avg.foreach(println(_))
  }

  /**
   * Quest 6: for the woman with the most ratings, print the 10 movies she
   * rated highest as `(viewerID, title, averageRatingOverAllUsers)`.
   */
  def solveQuest6(utils: Utils): Unit = {
    val femaleUsers = utils.usersRdd.filter(_._2 == "F").map(x => (x._1, x._2))
    val movieID_movie = utils.movieRdd.map(x => (x._1, x._2))
    // (userID, ratingCount) for women
    val femaleRatingCounts = utils.ratingsRdd
      .map(x => (x._1, 1))
      .join(femaleUsers)
      .map { case (userID, (one, _)) => (userID, one) }
      .reduceByKey(_ + _)

    femaleRatingCounts.top(1)(Ordering.by(_._2)).headOption match {
      case None =>
        println("no ratings by female users")
      case Some((uid, _)) =>
        // (movieID, herRating) for the 10 movies she rated highest
        val herTop10 = utils.ratingsRdd
          .filter(_._1 == uid)
          .map(x => (x._2, x._3.toDouble))
          .top(10)(Ordering.by(_._2))
        val herTop10Ids = herTop10.map(_._1).toSet
        // movieID -> (title, average rating over all users)
        val titleAndAvg = utils.ratingsRdd
          .filter(x => herTop10Ids.contains(x._2))
          .map(x => (x._2, (x._3.toDouble, 1)))
          .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
          .map { case (mid, (sum, count)) => (mid, sum / count) }
          .join(movieID_movie)
          .map { case (mid, (avg, title)) => (mid, (title, avg)) }
          .collectAsMap()
        println("movie fav F highest raing top 10")
        herTop10.foreach { case (mid, _) =>
          titleAndAvg.get(mid).foreach { case (title, avg) => println((uid, title, avg)) }
        }
    }
  }

  /**
   * Quest 7: in the year with the most movies averaging >= 4.0, print the 10
   * highest-average movies as `(title, averageRating)`.
   */
  def solveQuest7(utils: Utils): Unit = {
    val yearOf = utils.movieRdd.map(m => (m._1, m._2.substring(m._2.length - 5, m._2.length - 1)))
    val titleOf = utils.movieRdd.map(m => (m._1, m._2.substring(0, m._2.length - 7)))
    val goodMovieAvg = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (mid, (sum, cnt)) => (mid, sum / cnt) }
      .filter { case (_, avg) => avg >= 4.0 }
    // (movieID, (year, averageRating))
    val yearWithAvg = yearOf.join(goodMovieAvg)
    val bestYear = yearWithAvg
      .map { case (_, (year, _)) => (year, 1) }
      .reduceByKey(_ + _)
      .top(1)(Ordering.by(_._2))
      .apply(0)._1
    yearWithAvg
      .filter { case (_, (year, _)) => year == bestYear }
      .map { case (mid, (_, avg)) => (mid, avg) }
      .join(titleOf)
      .map { case (_, (avg, title)) => (title, avg) }
      .top(10)(Ordering.by(_._2))
      .foreach(println(_))
  }

  /**
   * Quest 8: the 10 highest-average 1997 movies whose genres contain
   * "Comedy", printed as `(movieID, title, averageRating)`.
   */
  def solveQuest8(utils: Utils): Unit = {
    val comedies1997 = utils.movieRdd
      .map { case (id, fullTitle, genres) =>
        val title = fullTitle.substring(0, fullTitle.length - 7)
        val year = fullTitle.substring(fullTitle.length - 5, fullTitle.length - 1)
        (id, (title, year, genres))
      }
      .filter { case (_, (_, year, genres)) => year == "1997" && genres.contains("Comedy") }
    val avgRating = utils.ratingsRdd
      .map(r => (r._2, (r._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map { case (id, (sum, n)) => (id, sum / n) }
    comedies1997.join(avgRating)
      .map { case (id, ((title, _, _), avg)) => (id, title, avg) }
      .top(10)(Ordering.by(_._3))
      .foreach(println(_))
  }

  /**
   * Quest 9: for every genre (alphabetically), print "top5 in : <genre>"
   * followed by its 5 highest-average movies as `(title, averageRating)`.
   * Every genre of a multi-genre movie counts, including the last one.
   */
  def solveQuest9(utils: Utils): Unit = {
    val mid_avg = utils.ratingsRdd
      .map(x => (x._2, (x._3.toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .map(x => (x._1, x._2._1 / x._2._2))
    // (genre, (title, averageRating))
    val genre_title_avg = utils.movieRdd
      .map(x => (x._1, (x._2, x._3.split('|'))))
      .join(mid_avg)
      .flatMap { case (_, ((title, genres), avg)) => genres.map(g => (g, (title, avg))) }
    // Running top 5 per genre; no genre ever holds more than 5 entries.
    val top5ByGenre = genre_title_avg
      .aggregateByKey(List.empty[(String, Double)])(
        (acc, v) => (v :: acc).sortBy(-_._2).take(5),
        (a, b) => (a ++ b).sortBy(-_._2).take(5))
      .collect()
      .sortBy(_._1)
    top5ByGenre.foreach { case (genre, top5) =>
      println("top5 in : " + genre)
      top5.foreach(println(_))
    }
  }

  /**
   * Quest 10: for every release year with ratings (ascending), print the
   * individual genre with the highest average rating, computed in one pass.
   */
  def solveQuest0(utils: Utils): Unit = {
    val movieID_year_genres = utils.movieRdd.map { x =>
      (x._1, (x._2.substring(x._2.length - 5, x._2.length - 1), x._3.split('|')))
    }
    val yearGenre_sumCount = utils.ratingsRdd
      .map(x => (x._2, x._3.toDouble))
      .join(movieID_year_genres)
      .flatMap { case (_, (rating, (year, genres))) => genres.map(g => ((year, g), (rating, 1))) }
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
    val bestByYear = yearGenre_sumCount
      .map { case ((year, genre), (sum, count)) => (year, (genre, sum / count)) }
      .reduceByKey((a, b) => if (a._2 >= b._2) a else b)
      .collect()
      .sortBy(_._1.toInt)
    bestByYear.foreach { case (year, (genre, avg)) =>
      println("In " + year + ", the highest rating movie type is " + genre + " with average rating as " + avg)
    }
  }

  /** Runs one quest by name ("contest" or "0"-"9"); unknown names are reported. */
  private def runQuest(utils: Utils, name: String): Unit = name match {
    case "contest" => contest(utils)
    case "0"       => solveQuest0(utils)
    case "1"       => solveQuest1(utils)
    case "2"       => solveQuest2(utils)
    case "3"       => solveQuest3(utils)
    case "4"       => solveQuest4(utils)
    case "5"       => solveQuest5(utils)
    case "6"       => solveQuest6(utils)
    case "7"       => solveQuest7(utils)
    case "8"       => solveQuest8(utils)
    case "9"       => solveQuest9(utils)
    case other     => println("unknown quest: " + other)
  }

  /**
   * Entry point. Each command-line argument names a quest to run, in order.
   * With no arguments nothing runs, as before. The SparkContext is always
   * stopped on exit.
   */
  def main(args: Array[String]): Unit = {
    val utils = new Utils()
    try args.foreach(runQuest(utils, _))
    finally utils.sc.stop()
  }
}
Spark RDD 复杂操作
flatMap 与 reduceByKey 算子的使用问题:查阅 Spark 官方文档解决
暂无
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。