当前位置:   article > 正文

Spark RDD编程初级实践_rdd初级编程实践

rdd初级编程实践

湖工大永远滴神 茂林!!!

RDD(Resilient Distributed Datasets, 弹性分布式数据集)是Spark最为核心的概念,它是一个只读的、可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,可在多次计算间重用。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作,从而实现各种复杂的应用。

RDD编程都是从创建RDD开始的,可以通过多种方式创建得到RDD。例如,从本地文件或者分布式文件系统HDFS中读取数据创建RDD,或者使用parallelize()方法从一个集合中创建得到RDD。

创建得到RDD以后,就可以对RDD执行各种操作,包括转换操作和行动操作。RDD编程主要是对RDD各种操作API的使用,无论多复杂的Spark应用程序,最终都是借助于这些RDD操作来实现的。另外,通过持久化,可以把RDD保存在内存或者磁盘中,避免多次重复计算。通过对RDD进行分区,不仅可以增加程序并行度,而且在一些应用场景中可以降低网络通信开销。

键值对RDD(Pair RDD)是指每个RDD元素都是(key, value)键值对类型,是一种常见的RDD类型,在Spark编程中经常被使用。常用的键值对转换操作包括reduceByKey(func)、groupByKey()、sortByKey()、sortBy()、mapValues(func)、join()、combineByKey()等。

提交异常问题解决

点击右上角的重置代码仓库图标,再重新提交

第一关 数据去重

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object RemDup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RemDup").setMaster("local")
    val sc = new SparkContext(conf)
    //输入文件fileA.txt和fileB.txt已保存在本地文件系统/root/step1_files目录中
    val dataFile = "file:///root/step1_files"
    val data = sc.textFile(dataFile, 2)
    /********** Begin **********/
    //第一步:执行过滤操作,把空行丢弃。
    val rdd1 = data.filter(_.trim().length > 0)
    
    //第二步:执行map操作,取出RDD中每个元素,去除尾部空格并生成一个(key, value)键值对。
    val rdd2 = rdd1.map(line => (line.trim, ""))
    
    //第三步:执行groupByKey操作,把所有key相同的value都组织成一个value-list。
    val rdd3 = rdd2.groupByKey()
    
    //第四步:对RDD进行重新分区,变成一个分区,
    //在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。
    val rdd4 = rdd3.partitionBy(new HashPartitioner(1))
    
    //第五步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。
    val rdd5 = rdd4.sortByKey()
    
    //第六步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。
    val rdd6 = rdd5.keys
    
    //第七步:执行collect操作,以数组的形式返回RDD中所有元素。
    val rdd7 = rdd6.collect()
    
    //第八步:执行foreach操作,并使用println打印出数组中每个元素的值。
    println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
    
    rdd7.foreach(println)
    /********** End **********/
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

第二个 整合排序

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object FileSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileSort").setMaster("local")
    val sc = new SparkContext(conf)
    //输入文件file1.txt、file2.txt和file3.txt已保存在本地文件系统/root/step2_files目录中
    val dataFile = "file:///root/step2_files"
    val data = sc.textFile(dataFile, 3)
    /********** Begin **********/
    //第一步:执行过滤操作,把空行丢弃。
    val rdd1 = data.filter(_.trim().length > 0)
    
    //第二步:执行map操作,取出RDD中每个元素,去除尾部空格并转换成整数,生成一个(key, value)键值对。
    val rdd2 = rdd1.map(line => (line.trim.toInt, ""))
    
    //第三步:对RDD进行重新分区,变成一个分区,
    //在分布式环境下只有把所有分区合并成一个分区,才能让所有元素排序后总体有序。
    val rdd3 = rdd2.partitionBy(new HashPartitioner(1))
    
    //第四步:执行sortByKey操作,对RDD中所有元素都按照key的升序排序。
    val rdd4 = rdd3.sortByKey()
    
    //第五步:执行keys操作,将键值对RDD中所有元素的key返回,形成一个新的RDD。
    val rdd5 = rdd4.keys
    
    //第六步:执行map操作,取出RDD中每个元素,生成一个(key, value)键值对,
    //其中key是整数的排序位次,value是原待排序的整数。
    var index = 0
    val rdd6 = rdd5.map(t => {
      index = index + 1
      (index, t)
    })
    
    //第七步:执行collect操作,以数组的形式返回RDD中所有元素。
    val rdd7 = rdd6.collect()
    
    //第八步:执行foreach操作,依次遍历数组中每个元素,分别取出(key, value)键值对中key和value,
    //按如下格式输出:key value
    println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
    
    rdd7.foreach(t => println(t._1 + " " + t._2))
    /********** End **********/
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

第三关 求平均值

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf


object AvgScore {
  def main(args: Array[String]): Unit = {
  
    val conf = new SparkConf().setAppName("FileSort").setMaster("local")
    val sc = new SparkContext(conf)
    
    //输入文件AlgorithmScore.txt、DataBaseScore.txt和PythonScore.txt已保存在本地文件系统/root/step3_files目录中
    val dataFile = "file:///root/step3_files"
    val data = sc.textFile(dataFile)
    
    /********** Begin **********/
    //第一步:执行过滤操作,把空行丢弃。
    val rdd1 = data.filter(_.trim().length > 0)
    
    //第二步:执行map操作,取出RDD中每个元素(即一行文本),以空格作为分隔符将一行文本拆分成两个字符串,
    //拆分后得到的字符串封装在一个数组对象中,成为新的RDD中一个元素。
    var rdd2 = rdd1.map(line => line.split(" "))
    
    //第三步:执行map操作,取出RDD中每个元素(即字符串数组),取字符串数组中第一个元素去除尾部空格,
    //取字符串数组中第二个元素去除尾部空格并转换成整数,并由这两部分构建一个(key, value)键值对。
    val rdd3 = rdd2.map(t => (t(0).trim, t(1).trim.toInt))
    
    //第四步:执行mapValues操作,取出键值对RDD中每个元素的value,使用x=>(x,1)这个匿名函数进行转换。
    val rdd4 = rdd3.mapValues(x => (x, 1))
    
    //第五步:执行reduceByKey操作,计算出每个学生所有课程的总分数和总课程门数。
    val rdd5 = rdd4.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    
    //第六步:执行mapValues操作,计算出每个学生的平均成绩。
    val rdd6 = rdd5.mapValues(x => (x._1.toDouble / x._2))
    
    //第七步:执行collect操作,以数组的形式返回RDD中所有元素。
    val rdd7 = rdd6.collect()
    
    //第八步:执行foreach操作,按如下格式打印出每个学生的平均成绩:姓名 成绩,其中成绩要求保留两位小数。
    println("") //注意:此行不要修改,否则会影响测试结果,在此行之后继续完成第八步的代码。
    
    rdd7.foreach(t => {
      val x = t._2
      println(t._1 + " " + f"$x%1.2f")
    })
    /********** End **********/
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/749253
推荐阅读
相关标签
  

闽ICP备14008679号