当前位置:   article > 正文

flink UDF/UDAF/UDTF详解_flink udaf

flink udaf

总结出UDF/UDAF/UDTF三种函数的写法,并配以代码和注释说明。

一、UDF
介绍:自定义标量函数(User Defined Scalar Function),一行输入,一行输出。

import org.apache.flink.table.functions.ScalarFunction;

/**
 *   功能:字符串转小写
 *   注册函数:create function DemoUDF as 'packagename.DemoUDF';
 *   使用案例:select DemoUDF('Abc')
 */
public class DemoUDF extends ScalarFunction {

    public DemoUDF() {}

    public String eval(String str) {
        return str.toLowerCase();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

二、UDAF
介绍:自定义聚合函数,多行输入,一行输出。

import org.apache.flink.table.functions.AggregateFunction;

/**
 -   功能:求和
 -   注册函数:create function DemoUDAF as 'packagename.DemoUDAF';
 -   使用案例:
         select student_id,DemoUDAF(score)
         from tablename
         group by student_id
 */
// AggregateFunction<聚合的最终结果类型,聚合期间的中间结果类型>
public class DemoUDAF extends AggregateFunction<Long, DemoUDAF.SumAccumulator> {

    //定义一个累加器,存放聚合的中间结果
    public static class SumAccumulator{
        public long sumPrice;
    }

    //初始化累加器
    @Override
    public SumAccumulator createAccumulator() {
        SumAccumulator sumAccumulator = new SumAccumulator();
        sumAccumulator.sumPrice=0;
        return sumAccumulator;
    }

    //根据输入,更新累加器
    @Override
    public void accumulate(SumAccumulator accumulator,Long input){
        accumulator.sumPrice += input;
    }

    //返回聚合的最终结果
    @Override
    public Long getValue(SumAccumulator accumulator) {
        return accumulator.sumPrice;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

三、UDTF
介绍:自定义<表函数>,这类函数是作用于表,而不是字段。
UDTF可以细分为3类:

  • 一行输入,多行输出
  • 一列输入,多列输出
  • 一行输入,多行多列输出

UDTF之一行输入,多行输出

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


/**
 *   功能:根据指定的分隔符,将一行变成多行
 *   注册函数:create function DemoRowMore as 'packagename.DemoRowMore';
 *   使用案例:
         select *,word
         from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word)
 */

public class DemoRowMore extends TableFunction<Row> {

    public DemoRowMore() {
    }

    // 使用注解指定输出数据的名称和类型
    // 没有返回值,使用collect收集数据,可以收集多次
    @FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
    public void eval(String data,String split){
        String[] arr = data.split(split);
        for (String s : arr) {
            collect(Row.of(s));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
数据样例:
a,b,c
d,e

Flink SQL>
select word
from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word);

Flink SQL>
a
b
c
d
e
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

UDTF之一列输入,多列输出

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 *   功能:根据给定的分隔符,将一列变成两列
 *   注册函数:create function DemoColumnMore as 'packagename.DemoColumnMore';
 *   使用案例:
        select *,name,city
        from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city)
 */
public class DemoColumnMore extends TableFunction<Row> {

    public DemoColumnMore() {
    }

    // 使用注解指定输出数据的名称和类型
    @FunctionHint(output = @DataTypeHint("ROW<name STRING,city STRING>"))
    public void eval(String data, String split){
        String[] arr = data.split(split);
        Row row = new Row(2);
        row.setField(0,arr[0]);
        row.setField(1,arr[1]);
        collect(row);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
数据样例:
'张三,上海'

Flink SQL>
select name,city
from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city)

Flink SQL>
'张三','上海'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

创作不易,希望对你有所帮助!

一健三联就是对我最大的鼓励,笔芯~

交流加微wex997520707~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/753820
推荐阅读
相关标签
  

闽ICP备14008679号