赞
踩
目录
三、RDD的Transformation算子(面试开发重点)
3.1.3 mapPartitionsWithIndex(func) 案例
3.1.9 sample(withReplacement, fraction, seed) 案例
3.1.10 distinct([numTasks])) 案例
3.1.11 coalesce(numPartitions) 案例
3.1.12 repartition(numPartitions) 案例
3.1.13 coalesce和repartition的区别
3.1.14 sortBy(func,[ascending], [numTasks]) 案例
3.1.15 pipe(command, [envVars]) 案例
3.2.2 subtract (otherDataset) 案例
3.2.3 intersection(otherDataset) 案例
3.2.4 cartesian(otherDataset) 案例
3.3.3 reduceByKey(func, [numTasks]) 案例
3.3.4 reduceByKey和groupByKey的区别
3.3.8 sortByKey([ascending], [numTasks]) 案例
3.3.10 join(otherDataset, [numTasks]) 案例
3.3.11 cogroup(otherDataset, [numTasks]) 案例
在上一篇文章中我们介绍了什么是RDD,以及RDD的属性、特征、依赖关系和缓存机制等等:
那么,本文继续重点介绍一下RDD两类算子的使用。
在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,如下图所示。Driver中定义了一个或多个RDD,并调用RDD上的action,Worker则执行RDD分区计算任务。
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。
从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
1)使用parallelize()从集合创建
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2)使用makeRDD()从集合创建
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,我们会在第4章详细介绍。
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
RDD整体上分为Value类型和Key-Value类型
1. 作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
2. 需求:创建一个1-10数组的RDD,将所有元素*2形成新的RDD
(1)创建
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2)打印
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)将所有元素*2
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4)打印最终结果
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
1. 作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次, 一个函数一次处理所有分区。
2. 需求:创建一个RDD,使每个元素*2组成新的RDD
(1)创建一个RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每个元素*2组成新的RDD
scala> rdd.mapPartitions(x=>x.map(_*2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27
(3)打印新的RDD
scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8)
1. 作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U];
2. 需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD
(1)创建一个RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每个元素跟所在分区形成一个元组组成一个新的RDD
scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26
(3)打印新的RDD
scala> indexRdd.collect
res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
1. 作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
2. 需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元素的2倍(2,4,6,8,10)
(1)创建
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
(2)打印
scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)
(3)根据原RDD创建新RDD(1->1,2->1,2……5->1,2,3,4,5)
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
(4)打印新RDD
scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
1. map():每次处理一条数据。
2. mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
3. 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
1. 作用:将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
2. 需求:创建一个4个分区的RDD,并将每个分区的数据放到一个数组
(1)创建
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2)将每个分区的数据放到一个数组并收集到Driver端打印
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
1. 作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
2. 需求:创建一个RDD,按照元素模以2的值进行分组。
(1)创建
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2)按照元素模以2的值进行分组
scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
(3)打印结果
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
1. 作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。
2. 需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串)
(1)创建
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2)打印
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
(3)过滤出含” xiao”子串的形成一个新的RDD
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
(4)打印新RDD
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
1. 作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。
2. 需求:创建一个RDD(1-10),从中选择放回和不放回抽样
(1)创建RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
(2)打印
scala> rdd.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)放回抽样
scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26
(4)打印放回抽样结果
scala> sample1.collect()
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
(5)不放回抽样
scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26
(6)打印不放回抽样结果
scala> sample2.collect()
res17: Array[Int] = Array(1, 9)
1. 作用:对源RDD进行去重后返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。
2. 需求:创建一个RDD,使用distinct()对其去重。
(1)创建一个RDD
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。