当前位置:   article > 正文

【Spark】(十一)使用UDAF(User Defined Aggregate Function)_userdefinedaggregatefunction

userdefinedaggregatefunction

一、UDAF简介

UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。

关于UDAF的一个误区
我们可能下意识的认为UDAF是需要和group by一起使用的,实际上UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:

select max(foo) from foobar group by bar;
  • 1

表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:

select max(foo) from foobar;
  • 1

这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。

二、UDAF使用

2.1 继承UserDefinedAggregateFunction

使用UserDefinedAggregateFunction的套路:

  1. 自定义类继承UserDefinedAggregateFunction,对每个阶段方法做实现
  2. 在spark中注册UDAF,为其绑定一个名字
  3. 然后就可以在sql语句中使用上面绑定的名字调用

下面写一个计算平均值的UDAF例子,首先定义一个类继承UserDefinedAggregateFunction:

class MyAgeAvgFunction extends UserDefinedAggregateFunction{
   

  // 聚合函数的输入数据结构
  override def inputSchema: StructType = {
   
    new StructType().add("age",LongType)
//    StructType(StructField("age",LongType) :: Nil)
  }

  // 缓存区数据结构
  override def bufferSchema: StructType = {
   
//    new StructType().add("sum",LongType)
    StructType(StructField("sum",LongType) :: StructField("count",LongType) :: Nil)
  }

  // 聚合函数返回值数据结构
  override def dataType: DataType = DoubleType

  // 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
  override def deterministic: Boolean = true

  // 初始化缓冲区
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
   
    buffer(0)=0L
    buffer(1)=0L
  }

  // 给聚合函数传入一条新数据进行处理
  override def update(buffer: MutableAggregationBuffer, input: Row)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/603715
推荐阅读
相关标签
  

闽ICP备14008679号