赞
踩
代码
RDD创建
1、sc.parallelize(seq) 把seq这个数据并行化分片到节点
val a = sc.parallelize(List("a","b","c"))
2、sc.makeRDD(seq) 把seq这个数据并行化分片到节点,他的实现就是parallelize
val aaa = sc.makeRDD(List("d","e","f"))
3、sc.makeRDD(seq[(T,seq)] 这种方式可以指定RDD的存放位置
val aa = sc.makeRDD(List((1,List("a","b","c")),(2,List("d","e","f"))))
(RDD中的操作:转换,行动)
RDD转换
map(func):将函数应用于RDD的每一元素,并返回一个新的RDD
val sourceRdd = sc.makeRDD(1 to 10)
sourceRdd.map(_ + 2)
res0.collect
filter(func):通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD
val filter = sc.makeRDD(Array("aa1","bb1","aa2","cc3"))
filter.filter(_.startsWith("aa")).collect
flatMap(func):将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合
val flat = sc.makeRDD(1 to 3)
flat.flatMap((1 to _)).collect
mapPartitions(func):将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。
val person = sc.makeRDD(List(("a","female"),("b","male"),("c","female")))
def partitionsFun(iter: Iterator[(String,String)]):Iterator[String] = {
var woman = List[String]()
while(iter.hasNext){
val next = iter.next()
next match {
case (_,"female") => woman = next._1 :: woman
case _=>
}
}
woman.iterator
}
person.mapPartitions(partitionsFun).collect
mapPartitionsWithIndex(func):将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。
val person = sc.makeRDD(List(("a","female"),("b","male"),("c","female")))
def partitionsFun(index: Int ,iter: Iterator[(String,String)]):Iterator[String] = {
var woman = List[String]()
while(iter.hasNext){
val next = iter.next()
next match {
case (name,"female") => woman = "[" + index.toString + "]" + name :: woman
case _=>
}
}
woman.iterator
}
person.mapPartitions(partitionsFun).collect
sample(withReplacement, fraction, seed):在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。
val sample = sc.makeRDD(1 to 100)
sample.sample(false,0.1,4).collect
union(otherDataset):将两个RDD中的元素进行合并,返回一个新的RDD(合并元素不去重)
val a = sc.makeRDD(1 to 10)
sc.makeRDD(5 to 15).union(a).collect
intersection(otherDataset):将两个RDD做交集,返回一个新的RDD
distinct([numTask]):去重
partitionBy():根据设置的分区器重新将RDD进行分区,返回新的RDD。
val hash = sc.makeRDD(List((1,"a"),(2,"b"),(3,"c")))
hash.partitionBy(new org.apache.spark.HashPartitioner(2))
res8.partitions.size
res8.partitioner
reduceByKey(func):根据Key值将相同Key的元组的值用func进行计算,返回新的RDD
val reduce = sc.makeRDD(List(("female",2),("fe
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。