当前位置:   article > 正文

spark aggregate函数

java spark seq aggregate 2.12

aggregate函数将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

def aggregate[U : ClassTag](zeroValue : U)(seqOp : (U, T) = > U, combOp : (U, U) = > U) : U
 
注意:
1.每个分区开始聚合第一个元素都是zeroValue
2.分区之间的聚合,zeroValue也参与运算
 
scala> val rdd = sc.parallelize(List(18,28,7,66,-19,100,29,55,4),3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

//先对分区内的元素进行聚合;
scala> def InnerCom(a:Int, b:Int) : Int = {
     | println("InnerCom: " + a + " : " + b)
     | math.min(a,b)
     | }
InnerCom: (a: Int, b: Int)Int

//对聚合后的分区之间进行聚合
scala> def partitionCom(a:Int, b:Int): Int = {
     | println("partitionCom: " + a + " : " + b)
     | a + b
     | }
partitionCom: (a: Int, b: Int)Int

//3个分区,min(分区1)=7,min(分区2)=-19,min(分区1)=4
//50 + 7 + -19 + 4 = 42
scala> rdd.aggregate(50)(InnerCom,partitionCom)
InnerCom: 50 : 18
InnerCom: 18 : 28
InnerCom: 18 : 7
partitionCom: 50 : 7
InnerCom: 50 : 66
InnerCom: 50 : -19
InnerCom: -19 : 100
partitionCom: 57 : -19
InnerCom: 50 : 29
InnerCom: 29 : 55
InnerCom: 29 : 4
partitionCom: 38 : 4
res5: Int = 42

 

转载于:https://www.cnblogs.com/gaohuajie/p/7494894.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/736382
推荐阅读
相关标签
  

闽ICP备14008679号