赞
踩
API | 函数表达式 | 示例 |
---|---|---|
Table API | ===,>,<,!=,>=,<= | id===1001,age>18 |
SQL | =,>,<,!=,>=,<= | id=‘1001’,age>18 |
API | 函数表达式 | 示例 |
---|---|---|
Table API | &&,||,!,.isFalse | 1>1 && 2>1,true.isFalse |
SQL | and,or,is false,not | 1>1 and 2>1,true is false |
API | 函数表达式 | 示例 |
---|---|---|
Table API | +,-,*,/,n1.power(n2) | 1 + 1,2.power(2) |
SQL | +,-,*,/,power(n1,n2) | 1 + 1,power(2,2) |
API | 函数表达式 | 示例 |
---|---|---|
Table API | string1 + string2,.upperCase(),.lowerCase(),.charLength() | ‘a’ + ‘b’, ‘hello’.upperCase() |
SQL | string1 || string2,upper(),lower(),char_length() | ‘a’ || ‘b’,upper(‘hello’) |
API | 函数表达式 | 示例 |
---|---|---|
Table API | .toDate,.toTimestamp,currentTime(),n.days,n.minutes | ‘20230107’.toDate, 2.days,10.minutes |
SQL | Date string,Timestamp string,current_time,interval string range | Date ‘20230107’,interval ‘2’ hour/second/minute/day |
API | 函数表达式 | 示例 |
---|---|---|
Table API | .count,.sum,.sum0 | id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0 |
SQL | count(),sum(),rank(),row_number() | count(*),sum(age) |
UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用
Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出
/** 用户自定义标量函数步骤: 1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法 2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数 3.在 Table API 和 SQL 中使用 */ public class TestScalarFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> inputStream = env.readTextFile("./sensor.txt"); DataStream<SensorReading> dataStream = InputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp"); tableEnv.createTemporaryView("sensor", dataTable); //使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值 //2.创建自定义标量函数实例 HashCode hashCode = new HashCode(0.8); //3.在环境中注册函数 tableEnv.registerFunction("hashCode", hashCode); //4.使用 //4.1 Table API Table resultTable = sensorTable.select("id, ts, hashCode(id)"); //4.2 SQL Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor"); tableEnv.toAppendStream(resultTable, Row.class).print("result"); tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql"); env.execute(); } //1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法 public static class HashCode extends ScalarFunction { private int factor; public HashCode(int factor) { this.factor = factor; } //该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义 public int eval(String str) { return str.hashCode() * factor; } } }
Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出
/** 用户自定义表函数步骤: 1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法 2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数 3.在 Table API 和 SQL 中使用 */ public class TestTableFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> inputStream = env.readTextFile("./sensor.txt"); DataStream<SensorReading> dataStream = InputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp"); tableEnv.createTemporaryView("sensor", dataTable); //使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出 //2.创建自定义表函数实例 Split split = new Split("_"); //3.在环境中注册函数 tableEnv.registerFunction("split", split); //4.使用 //4.1 Table API //一进多出函数要配合 joinLateral 使用,侧写表 Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length"); //4.2 SQL //一进多出函数要进行 lateral table 的关联,类似于 lateral view Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)"); tableEnv.toAppendStream(resultTable, Row.class).print("result"); tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql"); env.execute(); } //1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer> public static class Split extends TableFunction<Tuple2<String, Integer>> { private String separator = ","; public Split(String separator) { this.separator = separator; } //必须定义一个 public 的返回值为 void 的 eval 方法 public void eval(String str) { for(String s : str.split(separator)) { //使用 collect() 方法输出结果 collect(new Tuple2<>(s, s.length())); } } } }
Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出
/** 用户自定义聚合函数步骤: 1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法 2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数 3.在 Table API 和 SQL 中使用 */ public class TestAggregateFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> inputStream = env.readTextFile("./sensor.txt"); DataStream<SensorReading> dataStream = InputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp"); tableEnv.createTemporaryView("sensor", dataTable); //使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值 //2.创建自定义聚合函数实例 AvgTemp avgTemp = new AvgTemp(); //3.在环境中注册函数 tableEnv.registerFunction("avgTemp", avgTemp); //4.使用 //4.1 Table API //聚合函数必须在 groupBy 后的 aggregate 方法使用 Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp"); //4.2 SQL Table resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id"); tableEnv.toRetractStream(resultTable, Row.class).print("result"); tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql"); env.execute(); } //1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法 public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> { @Override public Double getValue(Tuple2<Double, Integer> accumulator) { return accumulator.f0 / accumulator.f1; } @Override public Tuple2<Double, Integer> createAccumulator() { return new Tuple2<>(0.0, 0); } //必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 void public void accumulate(Tuple2<Double, Integer> accumulator, Double temp) { accumulator.f0 += temp; accumulator.f1 += 1; } } }
Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出
/** 用户自定义表聚合函数步骤: 1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法 2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数 3.在 Table API 和 SQL 中使用 */ public class TestAggregateFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> inputStream = env.readTextFile("./sensor.txt"); DataStream<SensorReading> dataStream = InputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp"); tableEnv.createTemporaryView("sensor", dataTable); //使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值 //2.创建自定义表聚合函数实例 Top2Temp top2Temp = new Top2Temp(); //3.在环境中注册函数 tableEnv.registerFunction("top2Temp", top2Temp); //4.使用 //Table API //表聚合函数必须在 groupBy 后的 flatAggregate 方法使用 Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank"); //表聚合函数目前不能在 SQL 中使用 tableEnv.toRetractStream(resultTable, Row.class).print("result"); env.execute(); } //定义一个 Accumulator public static class Top2TempAcc { private Double highestTemp = Double.MIN_VALUE; private Double secondHighestTemp = Double.MIN_VALUE; public Double getHighestTemp() { return highestTemp; } public void setHighestTemp(Double highestTemp) { this.highestTemp=highestTemp; } public Double getSecondHighestTemp() { return secondHighestTemp; } public void setSecondHighestTemp(Double secondHighestTemp) { this.secondHighestTemp=secondHighestTemp; } } //1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法 public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> { @Override public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) { out.collect(new Tuple2<>(acc.getHighestTemp(), 1)); out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2)); } @Override public Top2TempAcc createAccumulator() { return new Top2TempAcc(); } //必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 void public void accumulate(Top2TempAcc acc, Double temp) { if(acc.getHighestTemp() < temp) { acc.setSecondHighestTemp(acc.getHighestTemp()); acc.setHighestTemp(temp); } else if(acc.getSecondHighestTemp() < temp) { acc.setSecondHighestTemp(temp); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。