赞
踩
UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。
关于UDAF的一个误区
我们可能下意识的认为UDAF是需要和group by一起使用的,实际上UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:
select max(foo) from foobar group by bar;
表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:
select max(foo) from foobar;
这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。
使用UserDefinedAggregateFunction的套路:
下面写一个计算平均值的UDAF例子,首先定义一个类继承UserDefinedAggregateFunction:
class MyAgeAvgFunction extends UserDefinedAggregateFunction{ // 聚合函数的输入数据结构 override def inputSchema: StructType = { new StructType().add("age",LongType) // StructType(StructField("age",LongType) :: Nil) } // 缓存区数据结构 override def bufferSchema: StructType = { // new StructType().add("sum",LongType) StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil) } // 聚合函数返回值数据结构 override def dataType: DataType = DoubleType // 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出 override def deterministic: Boolean = true // 初始化缓冲区 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0)=0L buffer(1)=0L } // 给聚合函数传入一条新数据进行处理 override def update(buffer: MutableAggregationBuffer, input: Row)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。