// Singleton approach: create the SparkContext from a shared object.
/** Singleton holder that builds a SparkContext for a fixed local setup. */
object getSC {
  // SparkConf setters return `this`, so the configuration can be chained:
  // local master, application name "demo".
  val conf = new SparkConf().setMaster("local").setAppName("demo")

  /** Construct a fresh SparkContext from the shared configuration. */
  def getsc() = new SparkContext(conf)
}
// Case-class approach: build the SparkContext from constructor parameters.
/**
 * Boilerplate case class that configures Spark from its constructor
 * parameters.
 *
 * @param masterName Spark master URL, e.g. "local[2]"
 * @param appName    application name shown in the Spark UI
 */
case class GetSparkContext(masterName: String, appName: String) {
  // Apply master and app name in one chained expression; SparkConf
  // setters mutate and return the same instance.
  val conf = new SparkConf().setMaster(masterName).setAppName(appName)

  /** Create a SparkContext from this configuration on demand. */
  def getsc() = new SparkContext(conf)
}
// BUG FIX: `val sc` was defined twice in the same scope (once from each
// factory), which does not compile. Keep the case-class factory and leave
// the singleton variant as a commented alternative.
// val sc = getSC.getsc()
val sc = GetSparkContext("local[2]", "demo").getsc()

// Transformation pipeline: map -> sortBy -> filter.
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
// Double every element, then sort ascending.
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true) // ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
// Keep only elements >= 5.
val rdd3 = rdd2.filter(_ >= 5)
println(rdd3.collect().toBuffer) // ArrayBuffer(6, 8, 10, 12, 14, 16, 18, 20)
// flatMap: split each line on spaces and flatten all tokens into one RDD.
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
val res = rdd1.flatMap(line => line.split(" ")).collect
println(res.toBuffer) // ArrayBuffer(a, b, c, d, e, f, h, i, j)
// Set-style operations on two RDDs of Int.
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
// Union keeps duplicates (3 and 4 appear on both sides).
val rdd3 = rdd1.union(rdd2)
// Intersection keeps only elements present in both RDDs.
val rdd4 = rdd1.intersection(rdd2)
// Deduplicate the union before printing; element order is not guaranteed.
println(rdd3.distinct.collect.toBuffer) // ArrayBuffer(4, 6, 2, 1, 3, 5)
println(rdd4.collect.toBuffer) // ArrayBuffer(4, 3)
// Key-value operations: join, union, groupByKey.
val rdd1 = sc.makeRDD(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.makeRDD(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
// Inner join on key: only keys present in BOTH RDDs survive.
val rdd3 = rdd1.join(rdd2)
println(rdd3.collect.toBuffer) // ArrayBuffer((tom,(1,1)), (jerry,(3,2)))
// Union of the two pair RDDs (keeps every pair from both sides).
val rdd4 = rdd1.union(rdd2)
println(rdd4.collect.toBuffer) // ArrayBuffer((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
// Group all values under each key; note it keeps keys from either side,
// unlike join.
val rdd5 = rdd4.groupByKey()
println(rdd5.collect.toBuffer)
// ArrayBuffer((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
// cogroup: called on RDDs of (K, V) and (K, W); returns an RDD of
// (K, (Iterable[V], Iterable[W])).
// cogroup keeps every key from either RDD; an absent side yields an
// empty Iterable.
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
val rdd3 = rdd1.cogroup(rdd2)
// Contrast with groupByKey: cogroup keeps each side's values in a
// SEPARATE Iterable instead of merging them.
println(rdd3.collect.toBuffer)
// ArrayBuffer((jim,(CompactBuffer(),CompactBuffer(2))), (tom,(CompactBuffer(1, 2),
// CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
// reduce: an action that aggregates all elements of the RDD pairwise with
// the given binary function, returning a single value (not an RDD).
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
// reduce sums every element down to one Int.
val rdd2 = rdd1.reduce((a, b) => a + b) // 15
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
// Sum the values for each key.
val rdd4 = rdd3.reduceByKey((a, b) => a + b)
println(rdd4.collect.toBuffer) // ArrayBuffer((tom,4), (jerry,5), (shuke,3), (kitty,7))
// Sort by value descending: swap to (value, key), sort on the key,
// then swap back.
val rdd5 = rdd4
  .map { case (name, count) => (count, name) }
  .sortByKey(false)
  .map { case (count, name) => (name, count) }
println(rdd5.collect.toBuffer) // ArrayBuffer((kitty,7), (jerry,5), (tom,4), (shuke,3))
// Start with 3 partitions.
val rdd1 = sc.parallelize(1 to 10, 3)
// repartition always shuffles and can both shrink and grow the count.
rdd1.repartition(2).partitions.size // shrink: 2
rdd1.repartition(4).partitions.size // grow: 4
// coalesce defaults to shuffle = false, which can only SHRINK the
// partition count; asking for more partitions is then a no-op.
rdd1.coalesce(2).partitions.size
// Note: to grow partitions with coalesce, pass shuffle = true
// (repartition(n) is exactly coalesce(n, shuffle = true)).
// Source material copyright © 2003-2013 www.wpsshop.cn. All rights reserved.