
Spark Practice Exercises with Answers

Data

The data for these exercises can be downloaded for free from the links below.

Create a data directory in your project and put the downloaded files there.

  • Student info table (data/students.txt): https://download.csdn.net/download/holiday0520/86268311
  • Student score table (data/score.txt): https://download.csdn.net/download/holiday0520/86268316
  • Student subject table (data/subject.txt): https://download.csdn.net/download/holiday0520/86268317
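The answers later in the article split each line on commas. The field layouts below are an assumption inferred from the case classes used in the answer code, not from the files themselves; a minimal parsing sketch that runs without Spark:

```scala
// Hypothetical per-line layouts, inferred from the Stu/Sco/Sub case classes
// used in the answers: these are assumptions, check against the real files.
object DataFormatSketch {
  // students.txt: id,name,age,gender,clazz (assumed)
  def parseStudent(line: String): (String, String, Int, String, String) = {
    val f = line.split(",")
    (f(0), f(1), f(2).toInt, f(3), f(4))
  }

  // score.txt: studentId,subjectId,score (assumed)
  def parseScore(line: String): (String, String, Int) = {
    val f = line.split(",")
    (f(0), f(1), f(2).toInt)
  }

  // subject.txt: subjectId,subjectName,fullScore (assumed)
  def parseSubject(line: String): (String, String, Int) = {
    val f = line.split(",")
    (f(0), f(1), f(2).toInt)
  }
}
```

If a download's columns differ, only these parse functions (and the matching case-class constructors in the answers) need to change.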
Dependencies

Add the following to the pom.xml file:

<dependencies>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
Exercises

1. Find the per-subject scores of the top 10 students in the grade [id, name, class, subject, score]

2. Find the students whose total score is above the grade average [id, name, class, total score]

3. Find the students who passed every subject [id, name, class, subject, score]

4. Find the top 3 students in each class [id, name, class, score]

5. Find the 100 students with the most unbalanced subject scores [id, name, class, subject, score]

Answers
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.junit.{Before, Test}

// Exercises
class Demo24_Student {
  var stuRDD: RDD[Stu] = _
  var scoRDD: RDD[Sco] = _
  var subRDD: RDD[Sub] = _
  var sc: SparkContext = _

  def filterWithIdListAndPrint(ids: List[String]): Unit = {
    // Broadcast the id list to all executors
    val broIds: Broadcast[List[String]] = sc.broadcast(ids)

    // Convert stuRDD to KV format for joining
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broIds.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    // Convert subRDD to KV format for joining
    val subjectKVRDD: RDD[(String, String)] = subRDD.map(sub => (sub.subId, sub.subName))

    // Join the score table with the student table (filter before joining)
    scoRDD
      .filter(sco => broIds.value.contains(sco.id))
      .map(sco => (sco.id, sco))
      .join(studentKVRDD)
      .map {
        case (id: String, (sco: Sco, (name: String, clazz: String))) =>
          (sco.subId, (id, name, clazz, sco.score))
      }
      .join(subjectKVRDD)
      .map {
        case (subId: String, ((id: String, name: String, clazz: String, score: Int), subName: String)) =>
          s"$id,$name,$clazz,$subName,$score"
      }
      .sortBy(s => s.split(",")(0)) // sort by id to make the output easier to read
      .foreach(println)
  }

  @Before
  def init(): Unit = {
    // Read the three files, build the matching case class instances, and turn them into RDDs
    sc = new SparkContext(
      new SparkConf()
        .setAppName("Demo_Student")
        .setMaster("local")
    )

    stuRDD = sc.textFile("data/students.txt")
      .map(line => {
        val splits: Array[String] = line.split(",")
        Stu(splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
      })

    scoRDD = sc.textFile("data/score.txt")
      .map(line => {
        val splits: Array[String] = line.split(",")
        Sco(splits(0), splits(1), splits(2).toInt)
      })

    subRDD = sc.textFile("data/subject.txt")
      .map(line => {
        val splits: Array[String] = line.split(",")
        Sub(splits(0), splits(1), splits(2).toInt)
      })

  }

  @Test
  // Smoke test: print a few records from each RDD
  def printRDD(): Unit = {
    stuRDD.take(10).foreach(println)
    scoRDD.take(10).foreach(println)
    subRDD.take(10).foreach(println)
  }

  @Test
  //1. Find the per-subject scores of the top 10 students in the grade [id, name, class, subject, score]
  def question1(): Unit = {
    // Sum each student's scores via scoRDD, sort descending, take the top 10 ids, then join with the student and subject tables
    val top10Ids: List[String] = scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _) // total score per student
      .sortBy(kv => kv._2, ascending = false) // sort by total score, descending
      .map(kv => kv._1) // drop the total, keep only the id
      .take(10) // top 10 students
      .toList

    /*
    // Broadcast top10Ids
    val broadTop10Ids: Broadcast[List[String]] = sc.broadcast(top10Ids)

    // Convert stuRDD to KV format for joining
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broadTop10Ids.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    // Convert subRDD to KV format for joining
    val subjectKVRDD: RDD[(String, String)] = subRDD
      .map(sub => (sub.subId, sub.subName))

    // Join the score table with the student table (filter before joining)
    scoRDD
      .filter(sco => broadTop10Ids.value.contains(sco.id))
      .map(sco => (sco.id, sco))
      .join(studentKVRDD)
      .map {
        case (id: String, (sco: Sco, (name: String, clazz: String))) =>
          (sco.subId, (id, name, clazz, sco.score))
      }
      .join(subjectKVRDD)
      .map {
        case (subId: String, ((id: String, name: String, clazz: String, score: Int), subName: String)) =>
          s"$id,$name,$clazz,$subName,$score"
      }
      .sortBy(s => s.split(",")(0)) // sort by id to make the output easier to read
      .foreach(println)
     */

    // Refactored: the logic commented out above is extracted into filterWithIdListAndPrint
    filterWithIdListAndPrint(top10Ids)
  }

  @Test
  //2. Find the students whose total score is above the grade average [id, name, class, total score]
  def question2(): Unit = {
    // Compute the grade-wide average total score, then filter out students above it
    val sumScoreRDD: RDD[(String, Int)] = scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _)

    // Cache RDDs that are used more than once
    sumScoreRDD.cache()

    val sumScoreAndCnt: (Int, Int) = sumScoreRDD
      .map(kv => (1, kv._2))
      .aggregateByKey((0, 0))(
        (u: (Int, Int), sumScore: Int) => (u._1 + sumScore, u._2 + 1),
        (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2)
      )
      .collect()(0)._2

    // average total score
    val avgSumScore: Double = sumScoreAndCnt._1.toDouble / sumScoreAndCnt._2
    println(avgSumScore)

    // Keep students whose total score is above the average
    val passSumScoreRDD: RDD[(String, Int)] = sumScoreRDD
      .filter(kv => kv._2 > avgSumScore)

    passSumScoreRDD.cache()

    // Collect the ids of those students
    val passIDs: List[String] = passSumScoreRDD
      .map(kv => kv._1)
      .collect()
      .toList

    val broadPassIDs: Broadcast[List[String]] = sc.broadcast(passIDs)

    // Convert stuRDD to KV format for joining
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .filter(stu => broadPassIDs.value.contains(stu.id))
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    passSumScoreRDD
      .join(studentKVRDD)
      .map {
        case (id: String, (sumScore: Int, (name: String, clazz: String))) =>
          s"$id,$name,$clazz,$sumScore"
      }
      .foreach(println)

    // Release the cached RDDs
    sumScoreRDD.unpersist()
    passSumScoreRDD.unpersist()
  }

  @Test
  //3. Find the students who passed every subject [id, name, class, subject, score]
  def question3(): Unit = {
    // Map each subject to its full score (pass mark = 60% of the full score)
    val subjectKVRDD: RDD[(String, Int)] = subRDD
      .map(sub => (sub.subId, sub.subScore))

    // Find the ids of students who passed every subject
    val passAllSubIds: List[String] = scoRDD
      .map(sco => (sco.subId, sco))
      .join(subjectKVRDD)
      .filter {
        case (subId: String, (sco: Sco, subScore: Int)) =>
          sco.score >= subScore * 0.6
      }
      .map {
        case (subId: String, (sco: Sco, subScore: Int)) =>
          (sco.id, 1)
      }
      .reduceByKey(_ + _) // count how many subjects each student passed
      .filter(kv => kv._2 == 6) // keep students who passed all 6 subjects (the data has 6 subjects in total)
      .map(_._1) // keep only the id
      .collect()
      .toList

    filterWithIdListAndPrint(passAllSubIds)
  }

  @Test
  //4. Find the top 3 students in each class [id, name, class, score]
  def question4(): Unit = {
    // Convert stuRDD to KV format for joining
    val studentKVRDD: RDD[(String, (String, String))] = stuRDD
      .map(stu => (stu.id, (stu.name, stu.clazz)))

    scoRDD
      .map(sco => (sco.id, sco.score))
      .reduceByKey(_ + _) // total score per student
      .join(studentKVRDD)
      .map {
        case (id: String, (sumScore: Int, (name: String, clazz: String))) =>
          (id, name, clazz, sumScore)
      }
      .groupBy(t4 => t4._3)
      .flatMap {
        case (clazz: String, t4: Iterable[(String, String, String, Int)]) =>
          // take the top 3 of each class
          t4.toList.sortBy(t4 => -t4._4).take(3)
      }
      .map(t4 => s"${t4._1},${t4._2},${t4._3},${t4._4}")
      .foreach(println)
  }

  @Test
  //5. Find the 100 students with the most unbalanced subject scores [id, name, class, subject, score]
  def question5(): Unit = {
    // Find the ids of the 100 students with the most unbalanced scores
    // Pipeline: normalize -> variance -> sort -> take the top 100

    // Map each subject to its full score, used to normalize raw scores
    val subjectKVRDD: RDD[(String, Int)] = subRDD
      .map(sub => (sub.subId, sub.subScore))

    val top100ids: List[String] = scoRDD
      .map(sco => (sco.subId, sco))
      .join(subjectKVRDD)
      .map {
        case (subId: String, (sco: Sco, subScore: Int)) =>
          (sco.id, sco.score * 100.0 / subScore)
      }
      .groupByKey()
      .map(kv => {
        val id: String = kv._1
        val scores: Iterable[Double] = kv._2
        val avgScore: Double = scores.sum / scores.size
        val variance: Double = scores
          .map(score => {
            Math.pow(score - avgScore, 2)
          }).sum / scores.size
        (id, variance)
      })
      .sortBy(-_._2)
      .map(_._1)
      .take(100)
      .toList

    filterWithIdListAndPrint(top100ids)
  }
}

case class Stu(id: String, name: String, age: Int, gender: String, clazz: String)

case class Sco(id: String, subId: String, score: Int)

case class Sub(subId: String, subName: String, subScore: Int)
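The core of question 5 can be checked without starting Spark. Below is a minimal pure-Scala sketch (the object and function names are illustrative, not from the original) of the same imbalance metric used in question5: normalize each score to a 0-100 scale by its subject's full score, then take the variance of the normalized scores.

```scala
// Sketch of the "subject imbalance" metric from question 5.
// A perfectly balanced student gets variance 0; the more the normalized
// scores spread out, the larger the variance.
object ImbalanceSketch {
  // scores: (rawScore, fullScoreOfSubject) pairs for one student
  def normalizedVariance(scores: Seq[(Int, Int)]): Double = {
    // Normalize each raw score to a 0-100 scale
    val normalized = scores.map { case (raw, full) => raw * 100.0 / full }
    val avg = normalized.sum / normalized.size
    // Population variance of the normalized scores
    normalized.map(s => math.pow(s - avg, 2)).sum / normalized.size
  }
}
```

For example, a student scoring 80/100 and 120/150 normalizes to (80, 80) and gets variance 0, while 100/100 and 0/100 normalizes to (100, 0) and gets variance 2500; sorting students by this value descending and taking the first 100 reproduces the ranking in question5.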