当前位置:   article > 正文

详解 Flink Table API 和 Flink SQL 之函数

详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数

1. 比较函数

API函数表达式示例
Table API===,>,<,!=,>=,<=id===1001,age>18
SQL=,>,<,!=,>=,<=id=‘1001’,age>18

2. 逻辑函数

API函数表达式示例
Table API&&,||,!,.isFalse1>1 && 2>1,true.isFalse
SQLand,or,is false,not1>1 and 2>1,true is false

3. 算术函数

API函数表达式示例
Table API+,-,*,/,n1.power(n2)1 + 1,2.power(2)
SQL+,-,*,/,power(n1,n2)1 + 1,power(2,2)

4. 字符串函数

API函数表达式示例
Table APIstring1 + string2,.upperCase(),.lowerCase(),.charLength()‘a’ + ‘b’,
‘hello’.upperCase()
SQLstring1 || string2,upper(),lower(),char_length()‘a’ || ‘b’,upper(‘hello’)

5. 时间函数

API函数表达式示例
Table API.toDate,.toTimestamp,currentTime(),n.days,n.minutes‘20230107’.toDate,
2.days,10.minutes
SQLDate string,Timestamp string,current_time,interval string rangeDate ‘20230107’,interval ‘2’ hour/second/minute/day

6. 聚合函数

API函数表达式示例
Table API.count,.sum,.sum0id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0
SQLcount(),sum(),rank(),row_number()count(*),sum(age)

二、用户自定义函数(UDF)

UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用

1. 标量函数

Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出

/**
	用户自定义标量函数步骤:
	1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法
	2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
	3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        
        DataStream<SensorReading> dataStream = InputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        
        tableEnv.createTemporaryView("sensor", dataTable);
        
        //使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值
        //2.创建自定义标量函数实例
        HashCode hashCode = new HashCode(0.8);
        
        //3.在环境中注册函数
        tableEnv.registerFunction("hashCode", hashCode);
        
        //4.使用
        //4.1 Table API
        Table resultTable = sensorTable.select("id, ts, hashCode(id)");
        
        //4.2 SQL
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");
        
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
        
        env.execute();
    }
    
    //1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法
    public static class HashCode extends ScalarFunction {
        private int factor;
        
        public HashCode(int factor) {
            this.factor = factor;
        }
        
        //该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义
        public int eval(String str) {
            return str.hashCode() * factor;
        }
        
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

2. 表函数

Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出

/**
	用户自定义表函数步骤:
	1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法
	2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
	3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        
        DataStream<SensorReading> dataStream = InputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        
        tableEnv.createTemporaryView("sensor", dataTable);
        
        //使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出
        //2.创建自定义表函数实例
        Split split = new Split("_");
        
        //3.在环境中注册函数
        tableEnv.registerFunction("split", split);
        
        //4.使用
        //4.1 Table API
        //一进多出函数要配合 joinLateral 使用,侧写表
        Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");
        
        //4.2 SQL
        //一进多出函数要进行 lateral table 的关联,类似于 lateral view
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");
        
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
        
        env.execute();
    }
    
    //1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>
    public static class Split extends TableFunction<Tuple2<String, Integer>> {
        private String separator = ",";
        
        public Split(String separator) {
            this.separator = separator;
        }
        
        //必须定义一个 public 的返回值为 void 的 eval 方法
        public void eval(String str) {
            for(String s : str.split(separator)) {
                //使用 collect() 方法输出结果
                collect(new Tuple2<>(s, s.length()));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

3. 聚合函数

Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出

/**
	用户自定义聚合函数步骤:
	1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法
	2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
	3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        
        DataStream<SensorReading> dataStream = InputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        
        tableEnv.createTemporaryView("sensor", dataTable);
        
        //使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值
        //2.创建自定义聚合函数实例
        AvgTemp avgTemp = new AvgTemp();
        
        //3.在环境中注册函数
        tableEnv.registerFunction("avgTemp", avgTemp);
        
        //4.使用
        //4.1 Table API
        //聚合函数必须在 groupBy 后的 aggregate 方法使用
        Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");
        
        //4.2 SQL
        Table resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");
        
        tableEnv.toRetractStream(resultTable, Row.class).print("result");
        tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");
        
        env.execute();
    }
    
    //1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法
    public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {
        
        @Override
        public Double getValue(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        
        //必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 void
        public void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {
            accumulator.f0 += temp;
            accumulator.f1 += 1;
        }
        
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

4. 表聚合函数

Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出

/**
	用户自定义表聚合函数步骤:
	1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法
	2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
	3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        DataStream<String> inputStream = env.readTextFile("./sensor.txt");
        
        DataStream<SensorReading> dataStream = InputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        
        tableEnv.createTemporaryView("sensor", dataTable);
        
        //使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值
        //2.创建自定义表聚合函数实例
        Top2Temp top2Temp = new Top2Temp();
        
        //3.在环境中注册函数
        tableEnv.registerFunction("top2Temp", top2Temp);
        
        //4.使用
        //Table API
        //表聚合函数必须在 groupBy 后的 flatAggregate 方法使用
        Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");
        
        //表聚合函数目前不能在 SQL 中使用
        
        tableEnv.toRetractStream(resultTable, Row.class).print("result");
        
        env.execute();
    }
    
    //定义一个 Accumulator
    public static class Top2TempAcc {
        private Double highestTemp = Double.MIN_VALUE;
        private Double secondHighestTemp = Double.MIN_VALUE;
        
        public Double getHighestTemp() {
            return highestTemp;
        }
        
        public void setHighestTemp(Double highestTemp) {
            this.highestTemp=highestTemp;
        }
        
        public Double getSecondHighestTemp() {
            return secondHighestTemp;
        }
        
        public void setSecondHighestTemp(Double secondHighestTemp) {
            this.secondHighestTemp=secondHighestTemp;
        }
    }
    
    //1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法
    public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {
        
        @Override
        public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {
            out.collect(new Tuple2<>(acc.getHighestTemp(), 1));
            out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));
        }
        
        @Override
        public Top2TempAcc createAccumulator() {
            return new Top2TempAcc();
        }
        
        //必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 void
        public void accumulate(Top2TempAcc acc, Double temp) {
            if(acc.getHighestTemp() < temp) {
                acc.setSecondHighestTemp(acc.getHighestTemp());
                acc.setHighestTemp(temp);
            } else if(acc.getSecondHighestTemp() < temp) {
                acc.setSecondHighestTemp(temp);
            }
        }
        
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/716296
推荐阅读
相关标签
  

闽ICP备14008679号