赞
踩
Spark练习题的数据可以通过URL链接下载,均是免费!
然后在项目中新建一个data目录,将数据放在data目录下
pom.xml文件需要导入的依赖
<dependencies> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>2.11.12</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> <version>2.4.5</version> </dependency> </dependencies> <build> <plugins> <!-- Scala Compiler --> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
1.统计年级排名前十学生各科的分数 [学号,姓名,班级,科目,分数]
2.统计总分大于年级平均分的学生 [学号,姓名,班级,总分]
3.统计每科都及格的学生 [学号,姓名,班级,科目,分数]
4.统计每个班级的前三名 [学号,姓名,班级,分数]
5.统计偏科最严重的前100名学生 [学号,姓名,班级,科目,分数]
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.junit.{After, Before, Test}

/**
 * Spark RDD practice exercises over three comma-separated text files:
 * `data/students.txt`, `data/score.txt` and `data/subject.txt`.
 *
 * Each JUnit test answers one exercise question and prints its result to
 * standard output. A fresh local `SparkContext` is created before every test
 * and stopped after it.
 */
class Demo24_Student {
  var stuRDD: RDD[Stu] = _
  var scoRDD: RDD[Sco] = _
  var subRDD: RDD[Sub] = _
  var sc: SparkContext = _

  /**
   * Prints one line `id,name,clazz,subjectName,score` for every score record
   * belonging to one of the given students, ordered by student id.
   *
   * @param ids ids of the students whose per-subject scores should be printed
   */
  def filterWithIdListAndPrint(ids: List[String]): Unit = {
    // Broadcast the ids as a Set so the per-record membership test is a hash
    // lookup rather than a linear scan of a List.
    val broIds: Broadcast[Set[String]] = sc.broadcast(ids.toSet)

    // Students keyed by id, restricted to the requested ids before joining.
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broIds.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    // Subjects keyed by subject id.
    val subjectKVRDD: RDD[(String, String)] =
      subRDD.map(sub => (sub.subId, sub.subName))

    // Filter the scores first, then join with students and subjects.
    scoRDD
      .filter(sco => broIds.value.contains(sco.id))
      .map(sco => (sco.id, sco))
      .join(studentKVRDD)
      .map { case (id, (sco, (name, clazz))) =>
        (sco.subId, (id, name, clazz, sco.score))
      }
      .join(subjectKVRDD)
      .map { case (_, ((id, name, clazz, score), subName)) =>
        (id, s"$id,$name,$clazz,$subName,$score")
      }
      .sortBy(_._1) // order by student id so the output is easy to read
      .map(_._2)
      .collect() // bring rows to the driver so the sorted order is preserved
      .foreach(println)
  }

  /** Creates the local SparkContext and parses the three input files into RDDs. */
  @Before
  def init(): Unit = {
    sc = new SparkContext(
      new SparkConf()
        .setAppName("Demo_Student")
        .setMaster("local")
    )

    stuRDD = sc.textFile("data/students.txt")
      .map { line =>
        val splits: Array[String] = line.split(",")
        Stu(splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
      }

    scoRDD = sc.textFile("data/score.txt")
      .map { line =>
        val splits: Array[String] = line.split(",")
        Sco(splits(0), splits(1), splits(2).toInt)
      }

    subRDD = sc.textFile("data/subject.txt")
      .map { line =>
        val splits: Array[String] = line.split(",")
        Sub(splits(0), splits(1), splits(2).toInt)
      }
  }

  /**
   * Stops the SparkContext created by [[init]]. Without this, the context
   * leaks and the next test's `init` fails because only one SparkContext may
   * be active per JVM.
   */
  @After
  def stop(): Unit = {
    Option(sc).foreach(_.stop())
    sc = null
  }

  /** Smoke test: prints the first ten records of each input RDD. */
  @Test
  def printRDD(): Unit = {
    stuRDD.take(10).foreach(println)
    scoRDD.take(10).foreach(println)
    subRDD.take(10).foreach(println)
  }

  /**
   * Question 1: per-subject scores of the ten students with the highest total
   * score. Output columns: id, name, class, subject, score.
   */
  @Test
  def question1(): Unit = {
    val top10Ids: List[String] = scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _) // total score per student
      .sortBy(kv => kv._2, ascending = false) // highest total first
      .map(kv => kv._1) // keep only the id
      .take(10)
      .toList

    filterWithIdListAndPrint(top10Ids)
  }

  /**
   * Question 2: students whose total score is greater than the average total
   * score. Output columns: id, name, class, total score.
   */
  @Test
  def question2(): Unit = {
    val sumScoreRDD: RDD[(String, Int)] = scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _)
    // Reused for both the average and the filter below.
    sumScoreRDD.cache()

    // (sum of all students' totals, number of students) in one pass.
    // Long accumulators avoid Int overflow; an empty input gives (0, 0).
    val (totalOfSums, studentCnt) = sumScoreRDD
      .map(_._2)
      .aggregate((0L, 0L))(
        (acc, sumScore) => (acc._1 + sumScore, acc._2 + 1),
        (a, b) => (a._1 + b._1, a._2 + b._2)
      )

    // NaN when there are no students, in which case nothing passes the filter.
    val avgSumScore: Double = totalOfSums.toDouble / studentCnt
    println(avgSumScore)

    val passSumScoreRDD: RDD[(String, Int)] = sumScoreRDD
      .filter(kv => kv._2 > avgSumScore)
    // Reused for the id collection and the join below.
    passSumScoreRDD.cache()

    val passIDs: Set[String] = passSumScoreRDD
      .map(kv => kv._1)
      .collect()
      .toSet
    val broadPassIDs: Broadcast[Set[String]] = sc.broadcast(passIDs)

    // Students keyed by id, pre-filtered to shrink the join input.
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broadPassIDs.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    passSumScoreRDD
      .join(studentKVRDD)
      .map { case (id, (sumScore, (name, clazz))) =>
        s"$id,$name,$clazz,$sumScore"
      }
      .collect() // print on the driver
      .foreach(println)

    // Release the cached RDDs.
    sumScoreRDD.unpersist()
    passSumScoreRDD.unpersist()
  }

  /**
   * Question 3: students who passed every subject, where passing means a
   * score of at least 60% of the subject's full mark.
   * Output columns: id, name, class, subject, score.
   */
  @Test
  def question3(): Unit = {
    // Subject id -> full mark of that subject.
    val subjectKVRDD: RDD[(String, Int)] = subRDD
      .map(sub => (sub.subId, sub.subScore))

    // Number of rows in the subject table; a student must pass this many.
    val subjectCount: Long = subRDD.count()

    val passAllSubIds: List[String] = scoRDD
      .map(sco => (sco.subId, sco))
      .join(subjectKVRDD)
      .filter { case (_, (sco, subScore)) => sco.score >= subScore * 0.6 }
      .map { case (_, (sco, _)) => (sco.id, 1L) }
      .reduceByKey(_ + _) // number of passed subjects per student
      .filter(kv => kv._2 == subjectCount) // passed all of them
      .map(_._1)
      .collect()
      .toList

    filterWithIdListAndPrint(passAllSubIds)
  }

  /**
   * Question 4: the three students with the highest total score in each
   * class. Output columns: id, name, class, total score.
   */
  @Test
  def question4(): Unit = {
    // Students keyed by id.
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _) // total score per student
      .join(studentKVRDD)
      .map { case (id, (sumScore, (name, clazz))) =>
        (id, name, clazz, sumScore)
      }
      .groupBy(_._3) // group by class
      .flatMap { case (_, students) =>
        // Highest total first, keep the top three of the class.
        students.toList.sortBy(s => -s._4).take(3)
      }
      .map(t4 => s"${t4._1},${t4._2},${t4._3},${t4._4}")
      .collect() // print on the driver
      .foreach(println)
  }

  /**
   * Question 5: the 100 students whose scores are the most uneven across
   * subjects. Output columns: id, name, class, subject, score.
   *
   * Each score is normalised to a percentage of the subject's full mark; the
   * students are then ranked by the population variance of those percentages.
   */
  @Test
  def question5(): Unit = {
    // Subject id -> full mark of that subject, used for normalisation.
    val subjectKVRDD: RDD[(String, Int)] = subRDD
      .map(sub => (sub.subId, sub.subScore))

    val top100ids: List[String] = scoRDD
      .map(sco => (sco.subId, sco))
      .join(subjectKVRDD)
      .map { case (_, (sco, subScore)) =>
        (sco.id, sco.score * 100.0 / subScore) // score as a percentage
      }
      .groupByKey()
      .map { case (id, scores) =>
        val avgScore: Double = scores.sum / scores.size
        val variance: Double =
          scores.map(score => Math.pow(score - avgScore, 2)).sum / scores.size
        (id, variance)
      }
      .sortBy(_._2, ascending = false) // largest variance first
      .map(_._1)
      .take(100)
      .toList

    filterWithIdListAndPrint(top100ids)
  }
}

/** A student record parsed from `data/students.txt`. */
case class Stu(id: String, name: String, age: Int, gender: String, clazz: String)

/** A score record parsed from `data/score.txt`: one student's score in one subject. */
case class Sco(id: String, subId: String, score: Int)

/** A subject record parsed from `data/subject.txt`; `subScore` is used as the subject's full mark. */
case class Sub(subId: String, subName: String, subScore: Int)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。