当前位置:   article > 正文

FlinkSQL学习笔记(四)常见表查询详解与用户自定义函数

FlinkSQL学习笔记(四)常见表查询详解与用户自定义函数

写在前面

1、本篇只列举一些特殊的查询方式,掌握这些查询语句的基本使用概念即可,实际用到的时候进行查询即可。
参考目录:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/overview/
2、通过对这些例子的编写,感觉Flink相比hive中常见的查询方式,更多地从时间角度进行了更新迭代,需要注意Lookup Join和Temporal Joins区别
3、自定义函数,大致了解就行,后续用到直接套模板

1、Windowing table-valued functions (Windowing TVFs)

即:窗口表值函数
参考连接:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/queries/window-tvf/

注:代码测试的时候使用的版本为1.14,这里不支持窗口函数的独立使用,具体使用见窗口聚合函数
以滑动窗口为例:

SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES));
  • 1
  • 2

error:
在这里插入图片描述

2、Window TVF Aggregation

即:窗口聚合

  1. 数据准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

DataStreamSource<String> s1 = env.socketTextStream("123.56.100.37", 9999);

SingleOutputStreamOperator<Bid> s2 = s1.map(s -> {
    String[] split = s.split(",");
    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
    return new Bid(LocalDateTime.parse(split[0],df),Double.valueOf(split[1]),split[2],split[3]);
});

//注:这里列声明的顺序和Bid一致,watermark才生效,测试过程中需注意
tableEnvironment.createTemporaryView("Bid",s2, Schema.newBuilder()
        .column("item", DataTypes.STRING())
        .column("bidtime",DataTypes.TIMESTAMP(3))
        .column("price",DataTypes.DOUBLE())
        .column("supplier_id",DataTypes.STRING())
        .watermark("bidtime","bidtime - interval '1' second")
        .build());

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  1. hopping window aggregation
    注:每分钟,计算最近5分钟的订单交易总额
SELECT window_start, window_end, SUM(price) AS sum_price
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
  GROUP BY window_start, window_end;
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述
其他:
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)):每隔10分钟计算最近10分钟的price总和
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)):每隔10分钟,计算一个窗口里面每2分钟累积的price综合

  1. GROUPING SETS
SELECT window_start, window_end, supplier_id, SUM(price) as price
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

3、Window Top-N

语法要义:在 TVF上使用 row_number() over()

  1. Window Top-N follows after Window Aggregation
    注:类似分组TopN,不过这里【window_start、window_end】就充当了组的定义
SELECT * 
  FROM (
		SELECT window_start, 
				window_end, 
				supplier_id, 
		SUM(price) as price, 
			COUNT(*) as cnt,
		ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY SUM(price) DESC) as rownum
		FROM TABLE(
			TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
		GROUP BY window_start, window_end, supplier_id
	)
	WHERE rownum <= 3;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在这里插入图片描述

  1. Window Top-N follows after Windowing TVF
    注:直接跟在TVF后面,详细查看TVF使用error的说明
SELECT *
  FROM (
    SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM TABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  ) WHERE rownum <= 3;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在这里插入图片描述

4、Window Join

参考连接:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-join/

  1. 测试环境构建核心环境
DataStreamSource<String> left1 = env.socketTextStream("123.56.100.37", 9999);

SingleOutputStreamOperator<DataTest> left2 = left1.map(s -> {
    String[] split = s.split(",");
    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
    return new DataTest(LocalDateTime.parse(split[0],df),Integer.valueOf(split[1]),split[2]);
});


DataStreamSource<String> right1 = env.socketTextStream("123.56.100.37", 9998);

SingleOutputStreamOperator<DataTest> right2 = right1.map(s -> {
    String[] split = s.split(",");
    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
    return new DataTest(LocalDateTime.parse(split[0],df),Integer.valueOf(split[1]),split[2]);
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

在这里插入图片描述

  1. INNER/LEFT/RIGHT/FULL OUTER
    测试代码:
SELECT L.num as L_Num, 
       L.id as L_Id, 
	   R.num as R_Num, 
	   R.id as R_Id,
	   COALESCE(L.window_start, R.window_start) as window_start,
	   COALESCE(L.window_end, R.window_end) as window_end
   FROM (
	   SELECT * 
	     FROM 
		TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
   ) L
   FULL JOIN (
	   SELECT * 
	     FROM 
		TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
   ) R
   ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

测试结果:
在这里插入图片描述

  1. SEMI JOIN
    本质上是通过select ... in (...sql...)实现的
    测试代码:
SELECT num,id,row_time,window_start,window_end
  FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L 
  WHERE L.num IN (
		SELECT num FROM 
	   (SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) R 
 WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

测试结果:
在这里插入图片描述

error:
参考连接:

5、Join

常规 join的实现逻辑所在类:
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator

常规 join,flink底层是会对两个参与 join的输入流中的数据进行状态存储的;所以,随着时间的推进,状态中的数据量会持续膨胀,可能会导致过于庞大,从而降低系统的整体效
率;
可以如何去缓解:自己根据自己业务系统数据特性(估算能产生关联的左表数据和右表数据到达的最
大时间差),根据这个最大时间差,去设置 ttl时长

配置参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/config/
在这里插入图片描述

5.1、Regular Joins

执行语句:

SELECT * FROM LeftTable LEFT JOIN RightTable ON LeftTable.id = RightTable.id;
  • 1

执行结果:
在这里插入图片描述

5.2、Lookup Join

在这里插入图片描述
查找联接通常用于使用从外部系统查询的数据来扩充表。联接要求一个表具有处理时间属性,另一个表由查找源连接器提供支持。查找联接使用上述处理时临时联接语法,以及由查找源连接器支持的正确表。

lookup join为了提高性能,lookup的连接器会将查询过的维表数据进行缓存(默认未开启此机制),可以通过参数开启,比如 jdbc-connector的 lookup模式下,有如下参数:

  • lookup.cache.max-rows = (none) 未开启
  • lookup.cache.ttl = (none) ttl缓存清除的时长

执行语句:

  1. 创建SQL表,创建 lookup维表(jdbc connector表)
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://123.56.100.37:3306/flinktest',
  'table-name' = 'customers',
  'username' = 'root',
  'password' = '123456'
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 执行lookup join
SELECT o.order_id, o.total, c.country, c.zip
  FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id
  • 1
  • 2
  • 3
  • 4

执行结果:
在这里插入图片描述

5.3、Interval Joins

举例说明:比如广告曝光流广告观看事件流的 join
对于这种join,目前并没有实际遇到过,因此这里只给出相关限制条件,具体用到后可以参考官网。

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
  • ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND

5.4、Temporal Joins

要义: 左表的数据永远去关联右表数据的对应时间上的版本
注:Flink会根据版本表的记录自动推测在对于位置上的时间,可以推测成功即关联上;这里以Event Time Temporal Join举例说明

  1. 创建order表
tableEnvironment.createTemporaryView("orders",s2, Schema.newBuilder()
        .column("order_id", DataTypes.STRING())
        .column("price", DataTypes.DECIMAL(32,2))
        .column("currency",DataTypes.STRING())
        .column("order_time",DataTypes.TIMESTAMP(3))
        .watermark("order_time","order_time - INTERVAL '0' SECOND")
        .build());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 创建汇率表,即版本表
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '0' SECOND,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
	'connector' = 'mysql-cdc',
	'hostname' = '123.56.100.37',
	'port' = '3306',
	'username' = 'root',
	'password' = '123456',
	'database-name' = 'flinktest',
	'table-name' = 'currency_rates',
	'server-time-zone' = 'Asia/Shanghai'
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  1. join语句
SELECT order_id,	
	   price,
	   orders.currency,
	   conversion_rate,
	   order_time,
       update_time
  FROM orders
  LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. 运行测试
    12:10分的order来的时候,currency_rates的汇率为1.20,时间戳位12:12,因此order对于12:10的记录无法找到对应汇率值;12:14分的order来的时候,currency_rates汇率已经改为1.22,时间戳更新为12:16,因此推测,12:14分order的汇率应该还是1.20
    在这里插入图片描述

5.5、Array Expansion(CROSS JOIN)

为给定数组中的每个元素返回一个新行(数组的行转列)

  1. 测试数据
Table table = tableEnvironment.fromValues(DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("phone_numbers", DataTypes.ARRAY(DataTypes.STRING()))),
        Row.of(1, "zs", Expressions.array("137", "138", "125")),
        Row.of(2, "ls", Expressions.array("159", "126", "199")));
        
tableEnvironment.createTemporaryView("phone_table",table);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. SQL语句
SELECT id,name,phone_numbers,phone_number 
  from phone_table 
 CROSS JOIN UNNEST(phone_numbers) AS t(phone_number)
  • 1
  • 2
  • 3

测试结果:
在这里插入图片描述

6、Over Aggregation

row_number( ) over ( )
flinksql中,over聚合时,指定聚合数据区间有两种方式

  • 方式 1,带时间设定区间
    RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
  • 方式 2,按行设定区间
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

over函数类似在hive中的使用,这里不列举了,给出一个简单的SQL语法格式,详细参考官网

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

7、User-defined Functions

7.1、Scalar Functions

标量函数,特点:每次只接收一行数据,输出结果也是1行1列

  1. 继承ScalarFunction,实现eval方法
public static class MyLower extends ScalarFunction{
    public String eval(String input){
        return  input.toLowerCase();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  1. 注册函数
tableEnvironment.createTemporaryFunction("myLower",MyLower.class);
  • 1
  1. 执行SQL语句
select *,myLower(currency) as myLower  from currency_rates
  • 1
  1. 执行结果
    在这里插入图片描述

7.2、Aggregate Functions

聚合函数,特点:对输入的数据行(一组)进行持续的聚合,最终对每组数据输出一行(多列)结果

  1. 自定义累加器
public static class MyAccumulator{
    public int count;
    public int  sum;
}
  • 1
  • 2
  • 3
  • 4
  1. 继承AggregateFunction,至少实现下面三个方法
  • createAccumulator()
  • accumulate()
  • getValue()
public static class MyAvg extends AggregateFunction<Double,MyAccumulator> {


    // Creates and init the Accumulator for this (table)aggregate function.
    @Override
    public MyAccumulator createAccumulator() {
        MyAccumulator MyAccumulator = new MyAccumulator();
        MyAccumulator.count = 0;
        MyAccumulator.sum = 0;
        return MyAccumulator;
    }



    // rocesses the input values and update the provided accumulator instance
    public void accumulate(MyAccumulator acc, Double rate) {
        acc.sum += rate;
        acc.count += 1;
    }

    // Retracts the input values from the accumulator instance. 
    public void retract(MyAccumulator acc, Double rate) {
        acc.sum -= rate;
        acc.count -= 1;
    }

    //Called every time when an aggregation result should be materialized.
    @Override
    public Double getValue(MyAccumulator accumulator) {
        return accumulator.sum/accumulator.count;
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  1. 执行SQL语句
    注:这里用Flink-CDC提示,必须实现retract函数,因此上面加上了;实际证明这个函数可以帮助我们记录group by结果的变化情况
select place,MyAvg(conversion_rate) as MyAvg  from currency_rates group by place
  • 1
  1. 执行结果
    在这里插入图片描述

7.3、Table Functions

表值函数,特点:运行时每接收一行数据(一个或多个字段),能产出多行、多列的结果;如:explode(), unnest()

注:其实这种方式就相当于产出一张表,Flink中对于数组类型的行转列可以参考5.5、Array Expansion

  1. 自定义表值函数,分割字符串;并注册函数
@FunctionHint(output = @DataTypeHint("ROW<phoneNumber STRING, phoneLength INT>"))
public static class MySplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        for (String phone : str.split(",")) {
            // use collect(...) to emit a row
            collect(Row.of(phone, phone.length()));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 编写SQL语句
SELECT id,name,phone_numbers,phone_number,phone_length
  FROM phone_table 
  LEFT JOIN LATERAL TABLE(MySplitFunction(phone_numbers)) AS T(phone_number, phone_length) ON TRUE
  • 1
  • 2
  • 3

运行结果:
在这里插入图片描述

7.4、Table Aggregate Functions

表值聚合函数,特点:对输入的数据行(一组)进行持续的聚合,最终对每组数据输出一行或多行(多列)结果,目前不支持SQL语法,只能通过tableAPI实现

下面以分组top2为例说明实现过程,相比传统的分组Top2,这种方式可以带出更多的字段

  1. 创建测试数据
Table table = tableEnvironment.fromValues(DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("gender", DataTypes.STRING()),
        DataTypes.FIELD("score", DataTypes.DOUBLE())),
        Row.of(1, "male", "98"),
        Row.of(2, "male", "76"),
        Row.of(3, "male", "99"),
        Row.of(4, "female", "44"),
        Row.of(5, "female", "76"),
        Row.of(6, "female", "86")

);
tableEnvironment.createTemporaryView("stu_score",table);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 自定义Top2函数,即表值聚合函数
public static  class Top2Accum {
    //emitValue()根据Top2Accum数据提交,因此如果要整行数据,Top2Accum需要定义为Row类型
    public @DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>")Row first;
    public @DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>")Row second;
}

//这里groupBy的gender会一直带出来,输出的时候也会检查字段是否重复,需要将和groupBy相关的字段重命名
@FunctionHint(input =@DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>"),output = @DataTypeHint("ROW<id INT, rgender STRING, score DOUBLE,rank INT>"))
public static  class  Top2 extends TableAggregateFunction<Row,Top2Accum>{

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum top2Accum = new Top2Accum();
        top2Accum.first  = null;
        top2Accum.second = null;

        return top2Accum;
    }


    //这些需要封装一整个Row给Top2Accum,因此传入Row
    public void accumulate(Top2Accum acc, Row row) {
        Double score = (Double) row.getField(2);

        if (acc.first == null ||score > (double)acc.first.getField(2)) {
            acc.second = acc.first;
            acc.first = row;
        } else if (acc.second == null || score > (double)acc.second.getField(2)) {
            acc.second = row;
        }
    }

    public void merge(Top2Accum acc, Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accum acc, Collector<Row> out) {
        // emit the value and rank
        if (acc.first != null) {
            out.collect(Row.of(acc.first.getField(0),acc.first.getField(1),acc.first.getField(2),1));
        }
        if (acc.second !=null) {
            out.collect(Row.of(acc.second.getField(0),acc.second.getField(1),acc.second.getField(2),2));
        }
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  1. 通过TableAPI编程
table.groupBy($("gender"))
     .flatAggregate(call(Top2.class, row($("id"), $("gender"), $("score"))))
     .select($("id"),$("rgender"), $("score"),$("rank"))
     .execute().print();
  • 1
  • 2
  • 3
  • 4
  1. 测试结果
    在这里插入图片描述
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/639089
推荐阅读
相关标签
  

闽ICP备14008679号