RDD (Resilient Distributed Dataset) is the most basic data (computation) abstraction in Spark. In the code it is an abstract class, and it represents an immutable, partitionable collection whose elements can be computed in parallel.
Properties of an RDD
Characteristics of RDDs
An RDD is a read-only, partitioned dataset. The only way to "change" an RDD is through a transformation, which derives a new RDD from an existing one; the new RDD carries all the information needed to derive it from its parents. RDDs therefore depend on one another, and an RDD is evaluated lazily along its lineage. If the lineage becomes too long, it can be cut by persisting an intermediate RDD.
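As a quick illustration, the lineage built up by a chain of transformations can be printed from the shell. A minimal sketch, assuming the sc object provided by spark-shell:
val rdd = sc.parallelize(1 to 10).map(_ + 1).filter(_ % 2 == 0)
println(rdd.toDebugString)   // prints the chain of parent RDDs this RDD was derived from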
Partitions
Logically, an RDD is partitioned; the data of each partition exists only abstractly and is produced at computation time by a compute function. If the RDD is built from an existing file system, the compute function reads the data from that file system; if the RDD is derived from another RDD, the compute function applies the transformation logic to the parent RDD's data.
Read-only
As shown in the figure below, an RDD is read-only; to change the data in an RDD you can only create a new RDD on top of the existing one.
Converting one RDD into another is done through a rich set of operators, no longer limited to writing only map and reduce as in MapReduce, as shown in the figure below.
RDD operators fall into two categories: transformations, which turn one RDD into another and build up the RDD lineage, and actions, which trigger the computation of an RDD and either return the result to the application or save the RDD to a file system.
Dependencies
RDDs are derived from one another through operators, and each derived RDD carries the information needed to reconstruct it from its parent RDDs; this lineage maintained between RDDs is also called a dependency. As shown in the figure below, there are two kinds: narrow dependencies, where partitions of the parent and child RDDs correspond one to one, and wide dependencies, where every partition of the downstream RDD may depend on every partition of the upstream (parent) RDD, a many-to-many relationship.
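The two kinds of dependencies can be inspected from the shell. A minimal sketch, assuming the sc object provided by spark-shell:
val mapped = sc.parallelize(1 to 10).map(_ * 2)
mapped.dependencies              // OneToOneDependency: narrow, partitions map one to one
val reduced = mapped.map((_, 1)).reduceByKey(_ + _)
reduced.dependencies             // ShuffleDependency: wide, each partition depends on many parent partitions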
Caching
If the same RDD is used several times in an application, it can be cached. The RDD is then computed from its lineage only the first time; wherever it is used afterwards, its partitions are read from the cache instead of being recomputed from the lineage, which speeds up later reuse. As shown in the figure below, RDD-1 goes through a series of transformations to RDD-n, which is saved to HDFS. RDD-1 produces an intermediate result along the way; if that result is cached in memory, the later transformation from RDD-1 to RDD-m no longer has to recompute RDD-0.
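A minimal caching sketch, assuming the sc object provided by spark-shell (the input path is hypothetical):
val lengths = sc.textFile("input/data.txt").map(_.length)   // hypothetical input file
lengths.cache()       // same as persist(StorageLevel.MEMORY_ONLY); nothing is computed yet
lengths.count()       // the first action computes the lineage and fills the cache
lengths.collect()     // later actions read the cached partitions instead of recomputing them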
CheckPoint
Although RDD lineage naturally provides fault tolerance (when a partition's data fails or is lost, it can be rebuilt from the lineage), for long-running iterative applications the lineage keeps growing as the iterations proceed. If a failure happens in a later iteration, recovery has to walk a very long lineage, which inevitably hurts performance. For this reason, RDDs support checkpointing, which saves the data to persistent storage and cuts the earlier lineage: a checkpointed RDD no longer needs to know its parent RDDs, because it can read its data back from the checkpoint.
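A minimal checkpoint sketch, assuming the sc object provided by spark-shell (the HDFS address and directory are hypothetical):
sc.setCheckpointDir("hdfs://node01:8020/spark/checkpoint")   // hypothetical checkpoint directory
val rdd = sc.parallelize(1 to 10).map(_ * 2)
rdd.cache()        // often combined with caching so the checkpoint job does not recompute the lineage
rdd.checkpoint()   // only marks the RDD; the data is written when the next action runs
rdd.count()        // triggers the job, materializes the checkpoint and truncates the lineage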
Programming model
In Spark an RDD is represented as an object, and transformations are expressed as method calls on that object. After defining an RDD through a series of transformations, you call an action to trigger its computation; an action either returns a result to the application (count, collect, etc.) or saves data to a storage system (saveAsTextFile, etc.). In Spark the computation of an RDD happens only when an action is encountered (lazy evaluation), which lets multiple transformations be pipelined at run time.
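A minimal sketch of this lazy behaviour, assuming the sc object provided by spark-shell:
val nums    = sc.parallelize(1 to 100)   // transformation: only a lineage entry, nothing is computed
val doubled = nums.map(_ * 2)            // still lazy
val large   = doubled.filter(_ > 100)    // still lazy
large.count()                            // the action runs one job in which map and filter are pipelined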
To use Spark, a developer writes a Driver program, which is submitted to the cluster to schedule work on the Workers, as shown in the figure below. The Driver defines one or more RDDs and calls actions on them; the Workers execute the partition-level computation tasks.
Overall, RDD operators fall into value types and key-value types.
map(func) example
(1) Create an RDD
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2) Print it
scala> source.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3) Multiply every element by 2
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
(4) Print the final result
scala> mapadd.collect()
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
mapPartitions(func) example
(1) Create an RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2) Multiply every element by 2 to form a new RDD
scala> rdd.mapPartitions(x=>x.map(_*2))
res2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at mapPartitions at <console>:26
(3) Print the new RDD
scala> res2.collect
res3: Array[Int] = Array(2, 4, 6, 8)
mapPartitionsWithIndex(func) example
(1) Create an RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2) Pair each element with the index of its partition to form a new RDD of tuples
scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:25
(3) Print the new RDD
scala> indexRdd.collect
res4: Array[(Int, Int)] = Array((0,1), (1,2), (2,3), (3,4))
flatMap(func) example
(1) Create an RDD
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
(2) Print it
scala> sourceFlat.collect()
res5: Array[Int] = Array(1, 2, 3, 4, 5)
(3) Create a new RDD from the original one (1 -> 1; 2 -> 1,2; ...; 5 -> 1,2,3,4,5)
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at flatMap at <console>:25
(4) Print the new RDD
scala> flatMap.collect()
res6: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
The difference between map() and mapPartitions()
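map() invokes the function once per element, while mapPartitions() invokes it once per partition and receives an iterator over that partition's elements; the latter saves per-element call overhead but can run out of memory if a single partition is very large. A minimal sketch of the two call patterns, assuming the sc object provided by spark-shell:
val rdd = sc.parallelize(1 to 4, 2)
rdd.map(_ * 2).collect()                                // the function runs once per element
rdd.mapPartitions(iter => iter.map(_ * 2)).collect()    // the function runs once per partition, over an iterator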
glom example
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2) Put the data of each partition into an array, collect it to the Driver, and print it
scala> rdd.glom().collect()
res7: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
groupBy(func) example
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2) Group the elements by their value modulo 2
scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[12] at groupBy at <console>:25
(3) Print the result
scala> group.collect
res8: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
filter(func) example
(1) Create an RDD
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2) Print it
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
(3) Keep only the elements containing the substring "xiao" to form a new RDD
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at filter at <console>:25
(4) Print the new RDD
scala> filter.collect()
res12: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
sample(withReplacement, fraction, seed) example
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24
(2) Print it
scala> rdd.collect()
res13: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3) Sample with replacement
scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[17] at sample at <console>:25
(4) Print the with-replacement sample
scala> sample1.collect()
res14: Array[Int] = Array(7, 10)
(5) Sample without replacement
scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[18] at sample at <console>:25
(6) Print the without-replacement sample
scala> sample2.collect()
res15: Array[Int] = Array(10)
distinct([numTasks]) example
(1) Create an RDD
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2) Deduplicate the RDD (without specifying the parallelism)
scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at distinct at <console>:25
(3) Print the deduplicated RDD
scala> unionRDD.collect()
res16: Array[Int] = Array(1, 9, 5, 6, 2)
(4) Deduplicate the RDD with the parallelism set to 2
scala> val unionRDD = distinctRdd.distinct(2)
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at distinct at <console>:25
(5) Print the deduplicated RDD
scala> unionRDD.collect()
res17: Array[Int] = Array(6, 2, 1, 9, 5)
coalesce(numPartitions) example
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
(2) Check the number of partitions of the RDD
scala> rdd.partitions.size
res20: Int = 4
(3) Repartition the RDD
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[30] at coalesce at <console>:25
(4) Check the number of partitions of the new RDD
scala> coalesceRDD.partitions.size
res21: Int = 3
repartition(numPartitions) example
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
(2) Check the number of partitions of the RDD
scala> rdd.partitions.size
res22: Int = 4
(3) Repartition the RDD
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[35] at repartition at <console>:25
(4) Check the number of partitions of the new RDD
scala> rerdd.partitions.size
res23: Int = 2
The difference between coalesce and repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
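As the source above shows, repartition is simply coalesce with shuffle = true: coalesce merges partitions without a shuffle by default, while repartition always redistributes the data with a full shuffle. A minimal usage sketch, assuming the sc object provided by spark-shell:
val rdd = sc.parallelize(1 to 16, 4)
rdd.coalesce(2).partitions.size      // 2, reached by merging partitions without a shuffle
rdd.repartition(2).partitions.size   // 2 as well, but a full shuffle redistributes the data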
sortBy(func, [ascending], [numTasks]) example
(1) Create an RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
(2) Sort by the elements themselves
scala> rdd.sortBy(x => x).collect()
res24: Array[Int] = Array(1, 2, 3, 4)
(3) Sort by each element modulo 3
scala> rdd.sortBy(x => x%3).collect()
res25: Array[Int] = Array(3, 1, 4, 2)
pipe(command, [envVars]) example
(1) Write a script under /opt/module/Spark/job
Shell script:
[root@node01 job]# vi pipe.sh
#!/bin/sh
echo "AA"
while read LINE; do
echo ">>>"${LINE}
done
Make it executable:
[root@node01 job]# chmod a+x pipe.sh
(2) Create an RDD with a single partition
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[47] at parallelize at <console>:24
(3) Pipe the RDD through the script and print the result
scala> rdd.pipe("/opt/module/Spark/job/pipe.sh").collect()
res27: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
(4) Create an RDD with two partitions
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
(5) Pipe the RDD through the script and print the result
scala> rdd.pipe("/opt/module/Spark/job/pipe.sh").collect()
res28: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
union(otherDataset) example
(1) Create the first RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2) Create the second RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
(3) Compute the union of the two RDDs
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:27
(4) Print the union
scala> rdd3.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
subtract(otherDataset) example
(1) Create the first RDD
scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2) Create the second RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(3) Compute the difference between the first RDD and the second RDD and print it
scala> rdd.subtract(rdd1).collect()
res1: Array[Int] = Array(8, 6, 7)
intersection(otherDataset) example
(1) Create the first RDD
scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24
(2) Create the second RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24
(3) Compute the intersection of the two RDDs
scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at intersection at <console>:27
(4) Print the result
scala> rdd3.collect()
res2: Array[Int] = Array(5, 6, 7)
cartesian(otherDataset) example
(1) Create the first RDD
scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24
(2) Create the second RDD
scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:24
(3) Compute the Cartesian product of the two RDDs and print it
scala> rdd1.cartesian(rdd2).collect()
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))
zip(otherDataset) example
(1) Create the first RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
(2) Create the second RDD (same number of partitions as the first)
scala> val rdd2 = sc.parallelize(Array("a","b","c"),3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24
(3) Zip the first RDD with the second and print the result
scala> rdd1.zip(rdd2).collect
res4: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
(4) Zip the second RDD with the first and print the result
scala> rdd2.zip(rdd1).collect
res5: Array[(String, Int)] = Array((a,1), (b,2), (c,3))
(5) Create a third RDD (with a different number of partitions from the first two)
scala> val rdd3 = sc.parallelize(Array("a","b","c"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24
(6) Zip the first RDD with the third and print the result
scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
... 47 elided
partitionBy example
(1) Create an RDD
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24
(2) Check the number of partitions of the RDD
scala> rdd.partitions.size
res7: Int = 4
(3) Repartition the RDD
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[27] at partitionBy at <console>:25
(4) Check the number of partitions of the new RDD
scala> rdd2.partitions.size
res8: Int = 2
groupByKey example
(1) Create a pair RDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[29] at map at <console>:26
(2) Group the values of each key into a sequence
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:25
(3) Print the result
scala> group.collect()
res9: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
(4) Sum the values of each key
scala> group.map(t => (t._1, t._2.sum))
res10: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[31] at map at <console>:26
(5) Print the result
scala> res10.collect()
res11: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
reduceByKey(func, [numTasks]) example
(1) Create a pair RDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24
(2) Sum the values of each key
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:25
(3) Print the result
scala> reduce.collect()
res12: Array[(String, Int)] = Array((female,6), (male,7))
The difference between reduceByKey and groupByKey
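reduceByKey pre-aggregates the values of each key within every partition (a map-side combine) before the shuffle, so it moves far less data; groupByKey shuffles every value as-is and leaves all aggregation to the reduce side. A minimal sketch of the same aggregation done both ways, assuming the sc object provided by spark-shell:
val pairs = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))
val viaReduce = pairs.reduceByKey(_ + _)              // combines values per key inside each partition before the shuffle
val viaGroup  = pairs.groupByKey().mapValues(_.sum)   // shuffles every value, then sums on the reduce side
viaReduce.collect()   // both yield (a,2) and (b,2); reduceByKey simply transfers less data
viaGroup.collect()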
aggregateByKey example
Parameters: (zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U)
(1) Create a pair RDD
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> rdd.glom.collect()
res15: Array[Array[(String, Int)]] = Array(Array((a,3), (a,2), (c,4)), Array((b,3), (c,6), (c,8)))
(2) Take the maximum value of each key within every partition, then add the per-partition maxima together
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[35] at aggregateByKey at <console>:25
(3) Print the result
scala> agg.collect()
res13: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
foldByKey example
Parameters: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
(1) Create a pair RDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24
scala> rdd.glom.collect()
res16: Array[Array[(Int, Int)]] = Array(Array((1,3), (1,2)), Array((1,4), (2,3)), Array((3,6), (3,8)))
(2) Sum the values of each key
scala> val agg = rdd.foldByKey(0)(_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[40] at foldByKey at <console>:25
(3) Print the result
scala> agg.collect()
res17: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
combineByKey[C] example
Parameters: (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
Figure 2: combineByKey case analysis
(1) Create a pair RDD
scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2) Sum the values of each key and, at the same time, count how many times the key occurs, keeping both in a tuple
scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))
combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[1] at combineByKey at <console>:25
(3) Print the combined result
scala> combine.collect
res0: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
(4) Compute the averages
scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}
result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[2] at map at <console>:25
(5) Print the result
scala> result.collect()
res1: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
sortByKey([ascending], [numTasks]) example
(1) Create a pair RDD
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[3] at parallelize at <console>:24
(2) Sort by key in ascending order
scala> rdd.sortByKey(true).collect()
res2: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
(3) Sort by key in descending order
scala> rdd.sortByKey(false).collect()
res3: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
mapValues example
(1) Create a pair RDD
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2) Append the string "|||" to every value
scala> rdd3.mapValues(_+"|||").collect()
res4: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
join(otherDataset, [numTasks]) example
(1) Create the first pair RDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[12] at parallelize at <console>:24
(2) Create the second pair RDD
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(3) Join the two RDDs and print the result
scala> rdd.join(rdd1).collect()
res5: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
cogroup(otherDataset, [numTasks]) example
(1) Create the first pair RDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24
(2) Create the second pair RDD
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24
(3) cogroup the two RDDs and print the result
scala> rdd.cogroup(rdd1).collect()
res10: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
(1) Create the first pair RDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[29] at parallelize at <console>:24
(2) Create the second pair RDD (this time the keys do not fully match)
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(4,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24
(3) cogroup the two RDDs and print the result
Case 1
scala> rdd.cogroup(rdd1).collect()
res11: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(6))), (1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer())))
Case 2
scala> rdd1.cogroup(rdd).collect()
res12: Array[(Int, (Iterable[Int], Iterable[String]))] = Array((4,(CompactBuffer(6),CompactBuffer())), (1,(CompactBuffer(4),CompactBuffer(a))), (2,(CompactBuffer(5),CompactBuffer(b))), (3,(CompactBuffer(),CompactBuffer(c))))
Hands-on case study
package cn.zut.bigdata

import org.apache.spark.{SparkConf, SparkContext}

object MyPractice {

  /**
   * Data layout: timestamp, province, city, user, ad; fields are separated by spaces
   *
   * Sample data:
   * 1516609143867 6 7 64 16
   * 1516609143869 9 4 75 18
   * 1516609143869 1 7 87 12
   *
   * Requirement: for every province, find the TOP 3 ads by number of clicks
   *
   * @param args
   */
  def main(args: Array[String]): Unit = {
    // Initialize the Spark configuration and create the SparkContext
    val conf = new SparkConf().setMaster("local[*]").setAppName("MyPractice")
    val sc = new SparkContext(conf)

    // Create an "input" folder under the project root
    // Read the data into an RDD: TS, Province, City, User, AD
    val line = sc.textFile("input/agent.log")

    // Map to the finest granularity: ((Province, AD), 1)
    val provinceAdToOne = line.map(x => {
      val fields: Array[String] = x.split(" ")
      ((fields(1), fields(4)), 1)
    })

    // Total clicks of each ad within each province: ((Province, AD), sum)
    val provinceAdToSum = provinceAdToOne.reduceByKey(_ + _)

    // Re-key by province, with (AD, sum) as the value: (Province, (AD, sum))
    val provinceToAdSum = provinceAdToSum.map(x => (x._1._1, (x._1._2, x._2)))

    // Group all ads of the same province together: (Province, List((AD1, sum1), (AD2, sum2), ...))
    val provinceGroup = provinceToAdSum.groupByKey()

    // Sort each province's ads by total click count and take the top 3
    val provinceAdTop3 = provinceGroup.mapValues(x => {
      x.toList.sortWith((x, y) => x._2 > y._2).take(3)
    })

    // Pull the result to the Driver and print it
    provinceAdTop3.collect().foreach(println)
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.zut.bigdata</groupId>
    <artifactId>SparkP</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
    </dependencies>
</project>