赞
踩
总结出UDF/UDAF/UDTF三种函数的写法,并配以代码和注释说明。
一、UDF
介绍:自定义标量函数(User Defined Scalar Function),一行输入,一行输出。
import org.apache.flink.table.functions.ScalarFunction;
/**
* 功能:字符串转小写
* 注册函数:create function DemoUDF as 'packagename.DemoUDF';
* 使用案例:select DemoUDF('Abc')
*/
public class DemoUDF extends ScalarFunction {
public DemoUDF() {}
public String eval(String str) {
return str.toLowerCase();
}
}
二、UDAF
介绍:自定义聚合函数,多行输入,一行输出。
import org.apache.flink.table.functions.AggregateFunction; /** - 功能:求和 - 注册函数:create function DemoUDAF as 'packagename.DemoUDAF'; - 使用案例: select student_id,DemoUDAF(score) from tablename group by student_id */ // AggregateFunction<聚合的最终结果类型,聚合期间的中间结果类型> public class DemoUDAF extends AggregateFunction<Long, DemoUDAF.SumAccumulator> { //定义一个累加器,存放聚合的中间结果 public static class SumAccumulator{ public long sumPrice; } //初始化累加器 @Override public SumAccumulator createAccumulator() { SumAccumulator sumAccumulator = new SumAccumulator(); sumAccumulator.sumPrice=0; return sumAccumulator; } //根据输入,更新累加器 @Override public void accumulate(SumAccumulator accumulator,Long input){ accumulator.sumPrice += input; } //返回聚合的最终结果 @Override public Long getValue(SumAccumulator accumulator) { return accumulator.sumPrice; } }
三、UDTF
介绍:自定义<表函数>,这类函数是作用于表,而不是字段。
UDTF可以细分为3类:
UDTF之一行输入,多行输出
import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; /** * 功能:根据指定的分隔符,将一行变成多行 * 注册函数:create function DemoRowMore as 'packagename.DemoRowMore'; * 使用案例: select *,word from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word) */ public class DemoRowMore extends TableFunction<Row> { public DemoRowMore() { } // 使用注解指定输出数据的名称和类型 // 没有返回值,使用collect收集数据,可以收集多次 @FunctionHint(output = @DataTypeHint("ROW<word STRING>")) public void eval(String data,String split){ String[] arr = data.split(split); for (String s : arr) { collect(Row.of(s)); } } }
数据样例:
a,b,c
d,e
Flink SQL>
select word
from tablename,LATERAL TABLE(DemoRowMore(fieldname,',')) as T(word);
Flink SQL>
a
b
c
d
e
UDTF之一列输入,多列输出
import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; /** * 功能:根据给定的分隔符,将一列变成两列 * 注册函数:create function DemoColumnMore as 'packagename.DemoColumnMore'; * 使用案例: select *,name,city from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city) */ public class DemoColumnMore extends TableFunction<Row> { public DemoColumnMore() { } // 使用注解指定输出数据的名称和类型 @FunctionHint(output = @DataTypeHint("ROW<name STRING,city STRING>")) public void eval(String data, String split){ String[] arr = data.split(split); Row row = new Row(2); row.setField(0,arr[0]); row.setField(1,arr[1]); collect(row); } }
数据样例:
'张三,上海'
Flink SQL>
select name,city
from tablename,LATERAL TABLE(DemoColumnMore(fieldname,',')) as T(name,city)
Flink SQL>
'张三','上海'
创作不易,希望对你有所帮助!
一健三联就是对我最大的鼓励,笔芯~
交流加微wex997520707~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。