
Spark (2) - Basic Transform Operators (Part 2)

1. Operator List

No.  Operator
19   repartitionAndSortWithinPartitions
20   sortBy
21   sortByKey
22   repartition
23   coalesce
24   cogroup
25   join
26   leftOuterJoin
27   rightOuterJoin
28   fullOuterJoin
29   intersection
30   subtract
31   cartesian

2. Code Example

package sparkCore

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object basic_transform_03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 19. repartitionAndSortWithinPartitions: repartitions the data with the given partitioner and sorts the
    // records within each partition by the specified ordering. Under the hood it uses a ShuffledRDD configured
    // with the given partitioner and key ordering. (A small verification sketch follows after this listing.)
    println("*********19.repartitionAndSortWithinPartitions*****")
    val lst1: List[(String, Int)] = List(
      ("spark", 1), ("spark", 1), ("Hive", 1),
      ("Mysql", 1), ("Java", 1), ("Python", 1),
      ("Mysql", 1), ("kafka", 1), ("flink", 1)
    )
    val rdd19: RDD[(String, Int)] = sc.parallelize(lst1, 4)
    val partitioner: HashPartitioner = new HashPartitioner(rdd19.partitions.length)
    // rdd19 is repartitioned with the given partitioner, and within each partition the records are sorted
    // by key ("spark", "Hive", etc.) in lexicographic order
    val ReSortwithinPatitioner_rdd: RDD[(String, Int)] = rdd19.repartitionAndSortWithinPartitions(partitioner)
    println(ReSortwithinPatitioner_rdd.collect().toList)

    // 20. sortBy
    println("*********20.sortBy*************")
    val lst2: List[String] = List(
      "maple", "maple", "kelly", "Avery",
      "kelly", "Jacky", "Paul", "Avery",
      "maple", "maple", "Paul", "Avery"
    )
    val rdd20: RDD[String] = sc.parallelize(lst2)
    val words: RDD[String] = rdd20.flatMap(_.split(" "))
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by word count, from highest to lowest
    val sorted_rdd: RDD[(String, Int)] = reduced.sortBy(_._2, false)
    println(sorted_rdd.collect().toList)

    // 21. sortByKey
    println("*********21.sortByKey*************")
    val sortedByKey_rdd: RDD[(Int, (String, Int))] = reduced.map(t => (t._2, t)).sortByKey(false)
    println(sortedByKey_rdd.collect().toList)

    // 22. repartition: redistributes the data into new partitions and always shuffles, i.e. the data is scattered across the cluster.
    println("*********22.repartition*************")
    val rdd22: RDD[String] = sc.parallelize(lst2, 3)
    // repartition always shuffles,
    // regardless of whether the number of partitions increases, decreases, or stays the same.
    // Internally, repartition calls coalesce with shuffle = true.
    val rep_rdd: RDD[String] = rdd22.repartition(3)

    // 23. coalesce: may or may not shuffle; if the number of partitions is reduced and shuffle = false, partitions are simply merged.
    println("*********23.coalesce*************")
    val rdd23: RDD[String] = sc.parallelize(lst2, 4)
    // Like repartition, this will definitely shuffle and redistribute the data
    val coalesce_rdd1: RDD[String] = rdd23.coalesce(4, true)
    println("coalesce_rdd1:", coalesce_rdd1.collect().toList)
    // Fewer partitions with shuffle = false: partitions are merged without a shuffle
    val coalesce_rdd2: RDD[String] = rdd23.coalesce(2, false)
    println("coalesce_rdd2:", coalesce_rdd2.collect().toList)

    // 24. cogroup: using the same partitioner (HashPartitioner), records with the same key from multiple RDDs
    // are sent over the network into the same partition on the same machine.
    // Note: both RDDs must contain key-value tuples, and the key types must be identical.
    println("*********24.cogroup*************")
    val rdd24_1: RDD[(String, Int)] = sc.parallelize(List(("tom", 1), ("tom", 2), ("Jerry", 3), ("Paul", 3)))
    val rdd24_2: RDD[(String, Int)] = sc.parallelize(List(("tom", 10), ("tom", 20), ("Jacky", 3), ("Avery", 30)))
    val cogroup_rdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd24_1.cogroup(rdd24_2)
    println(cogroup_rdd.collect().toList)

    // 25. join: equivalent to an inner join in SQL
    println("*********25.join*************")
    val rdd25_1: RDD[(String, Int)] = sc.parallelize(List(("tom", 1), ("tom", 2), ("Jerry", 3), ("Paul", 3)))
    val rdd25_2: RDD[(String, Int)] = sc.parallelize(List(("tom", 10), ("tom", 20), ("Jacky", 3), ("Avery", 30)))
    val join_rdd: RDD[(String, (Int, Int))] = rdd25_1.join(rdd25_2)
    println(join_rdd.collect().toList)

    // 26. leftOuterJoin: equivalent to a left outer join in SQL
    println("*********26.leftOuterJoin*************")
    val rdd26_1: RDD[(String, Int)] = sc.parallelize(List(("tom", 1), ("tom", 2), ("Jerry", 3), ("Paul", 3)))
    val rdd26_2: RDD[(String, Int)] = sc.parallelize(List(("tom", 10), ("tom", 20), ("Jacky", 3), ("Avery", 30)))
    val leftJoin_rdd: RDD[(String, (Int, Option[Int]))] = rdd26_1.leftOuterJoin(rdd26_2)
    println(leftJoin_rdd.collect().toList)

    // 27. rightOuterJoin: equivalent to a right outer join in SQL
    println("*********27.rightOuterJoin*************")
    val rdd27_1: RDD[(String, Int)] = sc.parallelize(List(("tom", 1), ("tom", 2), ("Jerry", 3), ("Paul", 3)))
    val rdd27_2: RDD[(String, Int)] = sc.parallelize(List(("tom", 10), ("tom", 20), ("Jacky", 3), ("Avery", 30)))
    val rightJoin_rdd: RDD[(String, (Option[Int], Int))] = rdd27_1.rightOuterJoin(rdd27_2)
    println(rightJoin_rdd.collect().toList)

    // 28. fullOuterJoin: equivalent to a full outer join in SQL
    println("*********28.fullOuterJoin*************")
    val rdd28_1: RDD[(String, Int)] = sc.parallelize(List(("tom", 1), ("tom", 2), ("Jerry", 3), ("Paul", 3)))
    val rdd28_2: RDD[(String, Int)] = sc.parallelize(List(("tom", 10), ("tom", 20), ("Jacky", 3), ("Avery", 30)))
    val fullOutJoin_rdd: RDD[(String, (Option[Int], Option[Int]))] = rdd28_1.fullOuterJoin(rdd28_2)
    println(fullOutJoin_rdd.collect().toList)

    // 29. intersection: the common elements of two RDDs
    println("*********29.intersection*************")
    val rdd29_1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    val rdd29_2: RDD[Int] = sc.parallelize(List(3, 4, 5, 6, 7))
    val rdd29_inter: RDD[Int] = rdd29_1.intersection(rdd29_2)
    println("rdd29_inter:", rdd29_inter.collect().toList)
    // The underlying implementation uses cogroup
    val rdd29_11: RDD[(Int, Null)] = rdd29_1.map((_, null))
    val rdd29_22: RDD[(Int, Null)] = rdd29_2.map((_, null))
    val rdd29_co: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd29_11.cogroup(rdd29_22)
    // List((1,(CompactBuffer(null),CompactBuffer())), (2,(CompactBuffer(null),CompactBuffer())), (3,(CompactBuffer(null),CompactBuffer(null))), (4,(CompactBuffer(null),CompactBuffer(null))), (5,(CompactBuffer(null),CompactBuffer(null))), (6,(CompactBuffer(),CompactBuffer(null))), (7,(CompactBuffer(),CompactBuffer(null))))
    println("rdd29_co:", rdd29_co.collect().toList)
    // Keep only the keys whose value iterables are non-empty on both sides
    val rdd29_co_inter = rdd29_co.filter(it => it._2._1.nonEmpty && it._2._2.nonEmpty).keys
    print("rdd29_co_inter", rdd29_co_inter.collect().toList)

    // 30. subtract: the difference of two RDDs; elements of the first RDD that also appear in the second RDD are removed.
    println("*********30.subtract*************")
    val rdd30_1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    val rdd30_2: RDD[Int] = sc.parallelize(List(3, 4, 5, 6, 7))
    val substract_rdd: RDD[Int] = rdd30_1.subtract(rdd30_2)
    println(substract_rdd.collect().toList)

    // 31. cartesian: Cartesian product
    println("*********31.cartesian*************")
    val rdd31_1: RDD[String] = sc.parallelize(List("Maple", "Kelly", "Avery"))
    val rdd31_2: RDD[String] = sc.parallelize(List("Jerry", "Maple", "Tom"))
    val cartesian_rdd: RDD[(String, String)] = rdd31_1.cartesian(rdd31_2)
    println(cartesian_rdd.collect().toList)
  }
}
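
The per-partition ordering claimed for repartitionAndSortWithinPartitions can be checked directly. Below is a minimal sketch that reuses ReSortwithinPatitioner_rdd from the listing above and prints each partition's index together with its records; the variable names and formatting are only illustrative.

// Minimal sketch: inspect each partition of ReSortwithinPatitioner_rdd (from the listing above).
// Each partition's records should come out already sorted by key.
val byPartition: RDD[(Int, List[(String, Int)])] =
  ReSortwithinPatitioner_rdd.mapPartitionsWithIndex { (idx, it) =>
    Iterator((idx, it.toList)) // one (partitionIndex, records) pair per partition
  }
byPartition.collect().foreach { case (idx, records) =>
  println(s"partition $idx -> $records")
}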

3. Diagrams for Selected Operators

1. coalesce operator with the shuffle parameter set to false
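
The behavior this diagram illustrates can also be observed in code: with shuffle = false, coalesce only merges existing parent partitions, so records never move between the remaining partitions. A minimal sketch, assuming the SparkContext sc from the listing above; the sample data is made up for illustration.

// Minimal sketch: coalesce with shuffle = false merges parent partitions instead of shuffling.
val src = sc.parallelize(1 to 8, 4)                    // start with 4 partitions
println(src.glom().collect().map(_.toList).toList)     // contents of each of the 4 partitions
val merged = src.coalesce(2, false)                    // 4 partitions merged down to 2, no shuffle
println(merged.getNumPartitions)                       // 2
println(merged.glom().collect().map(_.toList).toList)  // each new partition is a union of parent partitions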

2. cogroup
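
For the two RDDs used in the listing, cogroup collects, for every key, the values from both sides into a pair of iterables; an inner join can then be expressed as a flatMap over those iterables, which is also how Spark's join is built on top of cogroup. A minimal sketch, assuming rdd24_1 and rdd24_2 from the listing above:

// Minimal sketch: derive an inner join from cogroup (keys present on only one side are dropped).
val grouped: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd24_1.cogroup(rdd24_2)
val joinedViaCogroup: RDD[(String, (Int, Int))] = grouped.flatMapValues {
  case (lefts, rights) => for (l <- lefts; r <- rights) yield (l, r)
}
println(joinedViaCogroup.collect().toList) // same pairs as rdd24_1.join(rdd24_2)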

 
