当前位置:   article > 正文

sparkRDD常用算子练习_spark算子练习

spark算子练习

封装SparkContext
单例
object getSC {
  val conf = new SparkConf()
  conf.setMaster("local")
  conf.setAppName("demo")
  def getsc() = new SparkContext(conf)
}

样板类
case class GetSparkContext(masterName:String, appName: String) {
  val conf = new SparkConf()
  //设置master属性
  conf.setMaster(masterName)
  conf.setAppName(appName)

  //通过conf创建sc
  //  val sc = new SparkContext(conf)
  def getsc() = new SparkContext(conf)
}

val sc = getSC.getsc()
val sc = GetSparkContext("local[2]", "demo").getsc()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
map、filter
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)   // ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
val rdd3 = rdd2.filter(_ >= 5)
println(rdd3.collect().toBuffer)   // ArrayBuffer(6, 8, 10, 12, 14, 16, 18, 20)
  • 1
  • 2
  • 3
  • 4
flatMap
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
val res = rdd1.flatMap(_.split(" ")).collect
println(res.toBuffer)  // ArrayBuffer(a, b, c, d, e, f, h, i, j)
  • 1
  • 2
  • 3
交集、并集
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
println(rdd3.distinct.collect.toBuffer)  // ArrayBuffer(4, 6, 2, 1, 3, 5)
println(rdd4.collect.toBuffer)  // ArrayBuffer(4, 3)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
join、groupByKey
val rdd1 = sc.makeRDD(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.makeRDD(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求join
val rdd3 = rdd1.join(rdd2)
println(rdd3.collect.toBuffer)  // ArrayBuffer((tom,(1,1)), (jerry,(3,2)))
//求并集
val rdd4 = rdd1 union rdd2
println(rdd4.collect.toBuffer)  // ArrayBuffer((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
//按key进行分组
val rdd5=rdd4.groupByKey
println(rdd5.collect.toBuffer)  
// ArrayBuffer((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
cogroup

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))

val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup与groupByKey的区别
println(rdd3.collect.toBuffer)
// ArrayBuffer((jim,(CompactBuffer(),CompactBuffer(2))), (tom,(CompactBuffer(1, 2),
// CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
reduce

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)  // 15
  • 1
  • 2
  • 3
reduceByKey、sortByKey
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
println(rdd4.collect.toBuffer)  // ArrayBuffer((tom,4), (jerry,5), (shuke,3), (kitty,7))
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
println(rdd5.collect.toBuffer)  // ArrayBuffer((kitty,7), (jerry,5), (tom,4), (shuke,3))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
repartition、coalesce
val rdd1 = sc.parallelize(1 to 10,3)
//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.size
//增加分区
rdd1.repartition(4).partitions.size
//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size

// 注意:repartition可以增加和减少rdd中的分区数,coalesce只能减少rdd分区数,增加rdd分区数不会生效。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号