当前位置:   article > 正文

【Spark】RDD入门编程实践(完整版)_rddfffffrrrdfffdrdrrrrddrdd4rrrrdrrrrrddrdrdrr4drd

rddfffffrrrdfffdrdrrrrddrdd4rrrrdrrrrrddrdrdrr4drdrrrrdtf$familry

1 RDD创建

1.1 从文件系统中加载数据

Spark的SparkContext通过textFile()读取数据生成内存中的RDD

1、支持的数据类型:

  • 本地文件系统
  • 分布式文件系统HDFS
  • Amazon S3等

2、从本地文件系统加载

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:27
  • 1
  • 2
  • 3

从执行结果反馈信息可以看出,lines是一个String类型的RDD,或者我们以后可以简单称为RDD[String],也就是说,这个RDD[String]里面的元素都是String类型。

3、从HDFS中加载数据三种方式

前提:当前登录的系统用户名为hadoop

scala> val lines = sc.textFile("hdfs://node:9000/user/hadoop/word.txt")
  • 1
scala> val lines = sc.textFile("/user/hadoop/word.txt")
  • 1
scala> val lines = sc.textFile("word.txt")
  • 1

1.2 通过并行集合

可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala>val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29
  • 1
  • 2
  • 3
  • 4
  • 5

从执行结果信息可以看出,rdd是一个Int类型的RDD。上面使用数组来创建或者也可以从列表中创建:

scala>val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala>val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
  • 1
  • 2
  • 3
  • 4
  • 5

从执行结果信息可以看出,rdd是一个Int类型的RDD

2 RDD的操作

1、RDD被创建好以后,在后续使用过程中一般会发生两种操作:

a) 转换(Transformation): 基于现有的数据集合创建一个新的数据集。

b)行动(Action):在数据集上进行运算,返回计算值。

2.1 转换

1、对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。

2、转换得到的RDD是惰性求值的,也就是说整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

3、下面列出一些常见的转换操作(Transformation API)

2.1.1 filter

筛选出满足函数func的元素,并返回一个新的数据集;

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[16] at textFile at <console>:27

scala> lines.filter(line => line.contains("Spark")).count()
res1: Long = 2  //这是执行返回的结果
  • 1
  • 2
  • 3
  • 4
  • 5

上面的代码中,lines就是一个RDD。lines.filter()会遍历lines中的每行文本,并对每行文本执行括号中的匿名函数,也就是执行Lamda表达式:line => line.contains(“Spark”),在执行Lamda表达式时,会把当前遍历到的这行文本内容赋值给参数line,然后,执行处理逻辑line.contains(“Spark”),也就是只有当该行文本包含“Spark”才满足条件,才会被放入到结果集中。最后,等到lines集合遍历结束后,就会得到一个新结果集,这个结果集中包含了所有包含“Spark”的行。最后,对这个结果集调用count(),这是一个行动操作,会计算出结果集中的元素个数。

2.1.2 map

将每个元素传递到函数func中,并将结果返回为一个新的数据集;

scala > data = Array(1,2,3,4,5)

scala > val rdd1 = sc.parallelize(data)

scala > val rdd2 = rdd1.map(x=>x+10)
  • 1
  • 2
  • 3
  • 4
  • 5

将每一行按照空格进行拆分,保存在array中

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

scala> lines.map(line => line.split(" "))
  • 1
  • 2
  • 3

在这里插入图片描述

2.1.3 flatMap

与map()相似,但每个输入元素都可以映射到0或多个输出结果;

scala > val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

scala > val words = lines.flatMap(line=>line.split(""))
  • 1
  • 2
  • 3

在这里插入图片描述

2.1.4 groupByKey

应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

2.1.5 reduceByKey

应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

  • 经过分组获得<key,value-list>
  • 根据传入的函数对value-list进行操作

2.2 行动

下面列出一些常见的行动操作(Action API)

2.2.1 count

返回数据集中的元素个数

2.2.2 collect

以数组的形式返回数据集中的所有元素

2.2.3 first

返回数据集中的第一个元素

2.2.4 take(n)

以数组的形式返回数据集中的前n个元素

2.2.5 reduce(func)

通过函数func(输入两个参数并返回一个值)聚合数据集中的元素

2.2.6 foreach(func)

将数据集中的每个元素传递到函数func中运行

2.2.7 惰性机制

这里给出一段简单的代码来解释Spark的惰性机制。

scala> val lines = sc.textFile("word.txt")
scala> val lineLengths = lines.map(s => s.length)
scala> val totalLength = lineLengths.reduce((a, b) => a + b)
  • 1
  • 2
  • 3

上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。

第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。

第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。此时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。

2.4 持久化

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29

scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3

scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

上面代码执行过程中,前后共触发了两次从头到尾的计算

实际上,可以通过持久化(缓存)机制可避免这种重复计算的开销。

可以使用persist() 方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。

2.4.1 persist()

1、调用persist()后并不会马上计算生成RDD并把它持久化

2、遇到动作类型操作才会真正持久化

3、persist()的圆括号中包含的是持久化级别参数

  • persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。
  • persist(MEMORY_AND_DISK):表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。

4、一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)

5、使用.unpersist() 方法手动吧持久化的RDD从缓存中移除

例子如下:

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29

scala> rdd.cache()  //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这时rdd还没有被计算生成

scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
3

scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.5 分区

1、RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。

2、RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目

3、效果

  • 增加并行度,实现分布式计算
  • 减少通信开销

4、对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

a) 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
b) Apache Mesos:默认的分区数为8;
c) Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;

5、设置分区的方法

  • 语法格式:sc.textFile(path,分区数量)
scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala>val rdd = sc.parallelize(array,2) #设置两个分区
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29
  • 1
  • 2
  • 3
  • 4
  • 5
  • 对已经分区的进行重新分区
scala > val data = sc.textFile("file:///....",2)

scala > val rdd = data.repartition(1)//重新分区
  • 1
  • 2
  • 3
  • 自定义分区
    自定义一个Partitioner类,继承org.apache.spark.Partitioner
    覆盖方法:
    numPartitions:Int 返回创建出来的分区数量
    getPartition(key:Any):Int返回给定键的分区编号
    equals() Java判断相等性的标准方法
//根据key的最后一位数字,写到不同文件中
import org.apache.spark.Partitioner

//自定义分区类
class MyPartitioner(numParts:Int)extends Partitioner{
	//覆盖分区数量
	override def numPartitions:Int = numParts
	//覆盖分区号获取函数
	override def  getPartition(Key:Any):Int={
		//key的最后一位,生成区编号
		key.toString.toInt%10
	}
}

object TestPartitioner{
	def main(args:Array[String]){
		val conf = new SparkConf()
		val sc = new SparkContext(conf)
		//模拟5个分区
		val data = sc.parallelize(1 to 10,5)
		//根据尾号转变为10个分区
		data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("file:///partitionrt")
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
//根据尾号转变为10个分区
data.map((_,1)).partitionBy(new MyPartitioner(10)).map(_._1).saveAsTextFile("file:///partitionrt")
  • 1
  • 2

注意:Partitioner自定义分区类只支持键值对
在这里插入图片描述

3 键值对RDD应用编程

3.1 创建

3.1.1 从文件加载

scala> val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")

scala> val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
  • 1
  • 2
  • 3

3.1.2 通过并行集合(数组)

scala> val list = List("Hadoop","Spark","Hive","Spark")

scala> val rdd = sc.parallelize(list)

scala> val pairRDD = rdd.map(word => (word,1))
  • 1
  • 2
  • 3
  • 4
  • 5

3.2 转换操作

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等

3.2.1 reduceByKey(func)

使用func函数合并具有相同key的值

scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)
  • 1
  • 2
  • 3
  • 4

3.2.2 groupByKey()

对具有相同的key进行分组

scala> pairRDD.groupByKey()
  • 1

在这里插入图片描述
在这里插入图片描述

3.2.3 keys

把pair RDD中的key返回形成一个新的RDD

scala> pairRDD.keys

scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.2.4 values

scala> pairRDD.values

scala> pairRDD.values.foreach(println)
1
1
1
1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3.2.5 sortByKey()

返回一个根据键排序的RDD(默认升序)

降序:sortByKey(false)

scala> pairRDD.sortByKey()

scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

根据value排序:sortBy

scala > d.sortBy(_._2,false).collect
  • 1

3.2.6 mapValues(func)

对键值对RDD中的每个value都应用一个函数,key不会发生变化

scala > pairRDD.mapValues(x=>x+1)
  • 1

3.2.7 join()

将几个RDD当中元素key相同的进行连接

scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))

scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))

scala> pairRDD1.join(pairRDD2)

scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4 数据读写

4.1 文件数据读写

4.1.1 本地文件读写

//即使不存在也不会报错(惰性机制)
scala> val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

//指定目录,生成part-00000
scala> textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")

//再次加载,只给出路径名称
scala> val textFile = sc.TextFile("file:///usr/local/spark/mycode/wordcount/writeback")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4.1.2 HDFS读写

scala> val textFile = sc.textFile("hdfs://mnode1:9000/user/hadoop/word.txt")
scala> val textFile = sc.textFile("/user/hadoop/word.txt")
scala> val textFile = sc.textFile("word.txt")

//指定目录,生成part-00000
scala> textFile.saveAsTextFile("writeback")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4.1.3 JSON文件读写

解析:

//可能会失败,成功后会返回Some对象
scala > JSON.parseFull(jsonString:String)函数解析
  • 1
  • 2

读取JSON例子:

val jsonFile = sc.textFile("file:///people.json")
val result = jsonStrs.map(s => JSON.parseFull(s))
result.foreach({r => r match{
			case Some(map:Map[String,Any])=>println(map)
			case None => println("failed")
			case other => println("other)
			 }
			}
			)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4.2 HBase读写

4.2.1 准备工作

新建一个终端,执行下面命令,把HBase的lib目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,需要拷贝的jar文件包括:所有hbase开头的jar文件、guava-11.0.2.jar可以打开一个终端按照以下命令来操作:

cd /usr/local/spark/jars
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava*.jar ./
  • 1
  • 2
  • 3
  • 4
  • 5

只有这样,后面编译和运行过程才不会出错。

4.2.2 读取HBase数据

就需要使用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中

object SparkOperateHBase {
	def main(args: Array[String]) {
	
	    val conf = HBaseConfiguration.create()
	    val sc = new SparkContext(new SparkConf())
	    
	    //设置查询的表名
	    conf.set(TableInputFormat.INPUT_TABLE, "student")
	    //放在stuRDD中
	    val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
	  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
	  classOf[org.apache.hadoop.hbase.client.Result])
	    //检查记录条数
	    val count = stuRDD.count()
	    println("Students RDD Count:" + count)
	    
	    //持久化
	    stuRDD.cache()
	    //遍历输出
	    stuRDD.foreach({ case (_,result) =>
	        val key = Bytes.toString(result.getRow)
	        val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
	        val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
	        val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
	        println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
	    })
	  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在sbt文件中添加

name := "SparkRDD"
version := "0.1"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "2.3.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "2.3.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "2.3.2"
libraryDependencies += "org.apache.hbase" % "hbase-mapreduce" % "2.3.2"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4.2.3 写HBase数据

object SparkWriteHBase {  
  def main(args: Array[String]): Unit = {  
    val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")  
    val sc = new SparkContext(sparkConf) 
    val tablename = "student" 
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
    //设置作业
    val job = new Job(sc.hadoopConfiguration) 
    job.setOutputKeyClass(classOf[ImmutableBytesWritable]) 
    job.setOutputValueClass(classOf[Result]) 
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) 
    
    //生成一个RDD
    val indataRDD = sc.makeRDD(Array("3,Wangwu,M,26","4,Zhaoliu,M,27")) //构建两行记录
    val rdd = indataRDD.map(_.split(',')).map{arr=>{  
    val put = new Put(Bytes.toBytes(arr(0))) //行健的值 
    put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  //info:name列的值
      put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))  //info:gender列的值
      put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))  //info:age列的值
      (new ImmutableBytesWritable, put)
    }}
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  } 
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/749272
推荐阅读
相关标签
  

闽ICP备14008679号