赞
踩
Flink Table 和 SQL内置了很多SQL中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。
一、系统内置函数
Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。SQL中支持的很多函数,Table API和SQL都已经做了实现,其它还在快速开发扩展中。
以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/functions/systemFunctions.html
1. 比较函数
SQL:
value1 = value2
value1 > value2
Table API:
ANY1 === ANY2
ANY1 > ANY2
2. 逻辑函数
SQL:
boolean1 OR boolean2
boolean IS FALSE
NOT boolean
Table API:
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
3. 算术函数
SQL:
numeric1 + numeric2
POWER(numeric1, numeric2)
Table API:
NUMERIC1 + NUMERIC2
NUMERIC1.power(NUMERIC2)
4. 字符串函数
SQL:
string1 || string2
UPPER(string)
CHAR_LENGTH(string)
Table API:
STRING1 + STRING2
STRING.upperCase()
STRING.charLength()
5. 时间函数
SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
INTERVAL string range
Table API:
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
NUMERIC.minutes
6. 聚合函数
SQL:
COUNT(*)
SUM([ ALL | DISTINCT ] expression)
RANK()
ROW_NUMBER()
Table API:
FIELD.count
FIELD.sum0
二、UDF
用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#table-functions
1. 注册用户自定义函数UDF
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的Table API注册函数。
函数通过调用registerFunction()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它。
2. 标量函数(Scalar Functions)
用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接public声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.读取元素得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流转换为动态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4.不注册函数直接使用
// table.select(call(Mylenth.class,$("id"))).execute().print();
//4.1先注册再使用
tableEnv.createTemporarySystemFunction("MyLenth", Mylenth.class);
//TableAPI
// table.select(call("MyLenth", $("id"))).execute().print();
//SQL
tableEnv.executeSql("select MyLenth(id) from "+table).print();
}
//自定义UDF函数,求数据的长度
public static class Mylenth extends ScalarFunction{
public int eval(String value) {
return value.length();
}
}
3. 表函数(Table Functions)
与用户定义的标量函数类似,用户定义的表函数,可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。
为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval。求值方法的参数类型,决定表函数的所有有效参数。
返回表的类型由TableFunction的泛型类型确定。求值方法使用protected collect(T)方法发出输出行。
在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。
joinLateral算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。
而leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。
在SQL中,则需要使用Lateral Table(<TableFunction>),或者带有ON TRUE条件的左连接。
下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.读取文件得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流转换为动态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4先注册再使用
tableEnv.createTemporarySystemFunction("split", SplitFunction.class);
//TableAPI
/* table
.joinLateral(call("split", $("id")))
.select($("id"),$("word"))
.execute()
.print();
*/
//SQL
tableEnv.executeSql("select id, word from "+table +", lateral table(split(id))").print();
}
//自定义UDTF函数将传入的id按照下划线炸裂成两条数据
//hint暗示,主要作用为类型推断时使用
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public static class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split("_")) {
collect(Row.of(s));
}
}
}
4. 聚合函数(Aggregate Functions)
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。
上图中显示了一个聚合的例子。
假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。
AggregateFunction的工作原理如下。
首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。
随后,对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。
接下来我们写一个自定义AggregateFunction,计算一下每个WaterSensor中VC的平均值。
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.读取文件得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流转换为动态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4先注册再使用
tableEnv.createTemporarySystemFunction("myavg", MyAvg.class);
//TableAPI
table.groupBy($("id"))
.select($("id"),call("myavg",$("vc")))
.execute()
.print();
//SQL
tableEnv.executeSql("select id, myavg(vc) from "+table +" group by id").print();
}
//定义一个类当做累加器,并声明总数和总个数这两个值
public static class MyAvgAccumulator{
public long sum = 0;
public int count = 0;
}
//自定义UDAF函数,求每个WaterSensor中VC的平均值
public static class MyAvg extends AggregateFunction<Double, MyAvgAccumulator> {
//创建一个累加器
@Override
public MyAvgAccumulator createAccumulator() {
return new MyAvgAccumulator();
}
//做累加操作
public void accumulate(MyAvgAccumulator acc, Integer vc) {
acc.sum += vc;
acc.count += 1;
}
//将计算结果值返回
@Override
public Double getValue(MyAvgAccumulator accumulator) {
return accumulator.sum*1D/accumulator.count;
}
}
5. 表聚合函数(Table Aggregate Functions)
用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。
比如现在我们需要找到表中所有WaterSensor的前2个最高水位线,即执行top2()表聚合。
用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。
TableAggregateFunction的工作原理如下。
首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。
随后,对每个输入行调用函数的accumulate()方法来更新累加器。
处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。
AggregationFunction要求必须实现的方法:
除了上述方法之外,还有一些可选择实现的方法。
接下来我们写一个自定义TableAggregateFunction,用来提取每个WaterSensor最高的两个水位值。
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.读取文件得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流转换为动态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4先注册再使用
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
//TableAPI
table.groupBy($("id"))
.flatAggregate(call("Top2", $("vc")).as("top", "rank"))
.select($("id"), $("top"), $("rank"))
.execute()
.print();
}
//定义一个类当做累加器,并声明第一和第二这两个值
public static class vCTop2 {
public Integer first = Integer.MIN_VALUE;
public Integer second = Integer.MIN_VALUE;
}
//自定义UDATF函数(多进多出),求每个WaterSensor中最高的两个水位值
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, vCTop2> {
//创建累加器
@Override
public vCTop2 createAccumulator() {
return new vCTop2();
}
//比较数据,如果当前数据大于累加器中存的数据则替换,并将原累加器中的数据往下(第二)赋值
public void accumulate(vCTop2 acc, Integer value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
//计算(排名)
public void emitValue(vCTop2 acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
总结
Flink提供众多系统内置函数:比较函数、逻辑函数、算术函数、字符串函数、时间函数、聚合函数。对于无法满足的需求,我们可以用UDF自定义函数解决。用户自定义聚合函数(UDA)可以把一个表中的数据,聚合成一个标量值。用户定义的表聚合函数(UDTA),可以把一个表中数据,聚合为具有多行和多列的结果表。跟聚合函数AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。