1. This post only lists some special query types; it is enough to grasp the basic concepts of these statements and look up the details when you actually need them.
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/overview/
2. Writing these examples shows that, compared with the common query patterns in Hive, Flink's additions revolve mostly around the time dimension; pay attention to the difference between Lookup Join and Temporal Join.
3. For user-defined functions, a rough understanding is enough; apply the templates directly when needed later.
That is: windowing table-valued functions (window TVFs)
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/queries/window-tvf/
Note: the code here was tested on Flink 1.14, which does not support using a window TVF standalone; for actual usage see the window aggregation section below.
Taking a tumbling window as an example:
SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz), INTERVAL '10' MINUTES));
Error: (on 1.14 this standalone window TVF query is rejected, as noted above)
That is: window aggregation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
DataStreamSource<String> s1 = env.socketTextStream("123.56.100.37", 9999);
SingleOutputStreamOperator<Bid> s2 = s1.map(s -> {
    String[] split = s.split(",");
    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
    return new Bid(LocalDateTime.parse(split[0], df), Double.valueOf(split[1]), split[2], split[3]);
});
// Note: the watermark only takes effect when the declared column order matches Bid's field order; watch out for this when testing
tableEnvironment.createTemporaryView("Bid", s2, Schema.newBuilder()
        .column("item", DataTypes.STRING())
        .column("bidtime", DataTypes.TIMESTAMP(3))
        .column("price", DataTypes.DOUBLE())
        .column("supplier_id", DataTypes.STRING())
        .watermark("bidtime", "bidtime - interval '1' second")
        .build());
SELECT window_start, window_end, SUM(price) AS sum_price
FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY window_start, window_end;
Other window TVFs:
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
: every 10 minutes, computes the sum of price over the last 10 minutes
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
: every 2 minutes, emits the price total accumulated so far within the current 10-minute window
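The window-assignment semantics of the three TVFs above can be sketched without Flink. This is a minimal plain-Java illustration; the helper names (tumbleStart, hopStarts, cumulateEnds) are mine, and times are plain ticks rather than real timestamps:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssign {

    // TUMBLE: each element falls into exactly one window of fixed size
    static long tumbleStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // HOP(slide, size): each element falls into size/slide overlapping windows
    static List<Long> hopStarts(long ts, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long s = latest; s > ts - size; s -= slide) {
            starts.add(s); // window [s, s + size) contains ts
        }
        return starts;
    }

    // CUMULATE(step, max): the element is counted in every cumulative window
    // of its max-sized window whose end lies after it
    static List<Long> cumulateEnds(long ts, long step, long max) {
        long start = tumbleStart(ts, max);
        List<Long> ends = new ArrayList<>();
        for (long e = start + step; e <= start + max; e += step) {
            if (e > ts) ends.add(e); // window [start, e) contains ts
        }
        return ends;
    }

    public static void main(String[] args) {
        System.out.println(tumbleStart(17, 10));      // 10
        System.out.println(hopStarts(7, 1, 5));       // [7, 6, 5, 4, 3]
        System.out.println(cumulateEnds(13, 2, 10));  // [14, 16, 18, 20]
    }
}
```

For HOP(slide=1, size=5), the element at tick 7 is counted in five overlapping windows, which is exactly why the HOP query above emits a row per (window_start, window_end) pair.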
SELECT window_start, window_end, supplier_id, SUM(price) as price
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
Syntax essentials: apply row_number() over() on top of the window TVF, just like a grouped Top-N, except that here (window_start, window_end) serves as the group definition.
SELECT *
FROM (
SELECT window_start,
window_end,
supplier_id,
SUM(price) as price,
COUNT(*) as cnt,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY SUM(price) DESC) as rownum
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id
)
WHERE rownum <= 3;
SELECT *
FROM (
SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
) WHERE rownum <= 3;
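The per-window Top-N effect of the ROW_NUMBER() filter above can be checked in plain Java (no Flink); this is only a sketch of what rownum <= 3 keeps within one (window_start, window_end) partition, with method names of my own choosing:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class WindowTopN {

    // Prices of one (window_start, window_end) partition, ordered by price DESC,
    // keeping rownum <= n -- the effect of the ROW_NUMBER() filter above.
    static List<Double> topN(List<Double> pricesInWindow, int n) {
        return pricesInWindow.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(topN(Arrays.asList(4.0, 2.0, 5.0, 3.3, 1.0), 3)); // [5.0, 4.0, 3.3]
    }
}
```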
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-join/
DataStreamSource<String> left1 = env.socketTextStream("123.56.100.37", 9999);
SingleOutputStreamOperator<DataTest> left2 = left1.map(s -> {
    String[] split = s.split(",");
    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
    return new DataTest(LocalDateTime.parse(split[0], df), Integer.valueOf(split[1]), split[2]);
});
DataStreamSource<String> right1 = env.socketTextStream("123.56.100.37", 9998);
SingleOutputStreamOperator<DataTest> right2 = right1.map(s -> {
    String[] split = s.split(",");
    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
    return new DataTest(LocalDateTime.parse(split[0], df), Integer.valueOf(split[1]), split[2]);
});
SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id,
       COALESCE(L.window_start, R.window_start) as window_start,
       COALESCE(L.window_end, R.window_end) as window_end
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) L
FULL JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
) R
ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
Test result:
The same join can also be implemented with select ... in (subquery):
SELECT num, id, row_time, window_start, window_end
FROM (SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) L
WHERE L.num IN (
SELECT num FROM
(SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))) R
WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
Test result:
error:
Reference:
The class implementing the regular join logic:
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator
For a regular join, Flink internally keeps the data of both input streams in state. As time goes on, that state keeps growing and can become excessively large, degrading overall system efficiency.
Mitigation: based on the characteristics of your own business data, estimate the maximum arrival-time gap between left-table and right-table records that can still join, and set the state TTL according to that gap.
Config reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/config/
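As a sketch (assuming the StreamTableEnvironment named tableEnvironment from the snippets above; this fragment is not standalone), the TTL can be set either through the dedicated API or the table.exec.state.ttl option:

```java
// Assumes java.time.Duration is imported and tableEnvironment exists.
// Keep idle state for at most 1 hour, matching the estimated maximum
// arrival-time gap between joinable left and right rows.
tableEnvironment.getConfig().setIdleStateRetention(Duration.ofHours(1));
// Equivalent configuration key:
tableEnvironment.getConfig().getConfiguration()
        .setString("table.exec.state.ttl", "1 h");
```

Setting the TTL too low drops joinable state and produces missed matches, so the estimate should err on the generous side.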
Statement:
SELECT * FROM LeftTable LEFT JOIN RightTable ON LeftTable.id = RightTable.id;
Result:
A lookup join is typically used to enrich a table with data queried from an external system. It requires one table to have a processing-time attribute, while the other table is backed by a lookup source connector; the join uses the processing-time temporal join syntax shown above, with the right table served by the lookup connector.
lookup join
To improve performance, lookup connectors can cache the dimension-table rows they have queried (this mechanism is disabled by default) and it can be enabled through connector options. For example, the jdbc connector's lookup mode provides options such as lookup.cache.max-rows, lookup.cache.ttl, and lookup.max-retries.
Statement:
Create the lookup dimension table (a jdbc connector table):
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://123.56.100.37:3306/flinktest',
'table-name' = 'customers',
'username' = 'root',
'password' = '123456'
)
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id
Result:
Example: joining an ad-impression stream with an ad-view event stream.
I have not run into this kind of join in practice yet, so only the constraints are noted here; consult the official docs when you actually need it.
Key point: a left-table row always joins the version of the right-table data that was valid at that row's time.
Note: Flink infers from the versioned table's records which version was current at the corresponding point in time; if the inference succeeds, the rows join. The following uses an Event Time Temporal Join as the example.
tableEnvironment.createTemporaryView("orders",s2, Schema.newBuilder()
.column("order_id", DataTypes.STRING())
.column("price", DataTypes.DECIMAL(32,2))
.column("currency",DataTypes.STRING())
.column("order_time",DataTypes.TIMESTAMP(3))
.watermark("order_time","order_time - INTERVAL '0' SECOND")
.build());
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '0' SECOND,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '123.56.100.37',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'flinktest',
    'table-name' = 'currency_rates',
    'server-time-zone' = 'Asia/Shanghai'
);
SELECT order_id,
price,
orders.currency,
conversion_rate,
order_time,
update_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency
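The "version as of the left row's time" rule can be illustrated without Flink. The hypothetical rateAsOf helper below (name and types are mine) picks, from the versioned side, the latest update at or before the order time:

```java
import java.util.Map;
import java.util.TreeMap;

public class AsOfLookup {

    // Versions keyed by update_time; returns the conversion rate that was
    // current as of orderTime: the latest version at or before it.
    static Double rateAsOf(TreeMap<Long, Double> versions, long orderTime) {
        Map.Entry<Long, Double> entry = versions.floorEntry(orderTime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, Double> rates = new TreeMap<>();
        rates.put(100L, 6.3); // rate updated at t=100
        rates.put(200L, 6.5); // rate updated at t=200
        System.out.println(rateAsOf(rates, 150L)); // 6.3: the t=100 version is still current
        System.out.println(rateAsOf(rates, 50L));  // null: no version existed yet (the LEFT JOIN keeps the row)
    }
}
```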
Returns a new row for each element in the given array (expands an array column into rows).
Table table = tableEnvironment.fromValues(DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("phone_numbers", DataTypes.ARRAY(DataTypes.STRING()))),
Row.of(1, "zs", Expressions.array("137", "138", "125")),
Row.of(2, "ls", Expressions.array("159", "126", "199")));
tableEnvironment.createTemporaryView("phone_table",table);
SELECT id,name,phone_numbers,phone_number
from phone_table
CROSS JOIN UNNEST(phone_numbers) AS t(phone_number)
Test result:
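What CROSS JOIN UNNEST does to each row can be sketched in plain Java (the helper name is mine): one output row per array element, with the scalar columns repeated alongside it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArrayExpansion {

    // One output row per array element; the scalar columns (id, name) are
    // repeated next to each expanded phone number, as UNNEST does.
    static List<String[]> unnest(int id, String name, String[] phoneNumbers) {
        List<String[]> rows = new ArrayList<>();
        for (String phone : phoneNumbers) {
            rows.add(new String[]{String.valueOf(id), name, phone});
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String[] row : unnest(1, "zs", new String[]{"137", "138", "125"})) {
            System.out.println(Arrays.toString(row)); // [1, zs, 137] and so on
        }
    }
}
```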
row_number() over()
In Flink SQL, an OVER aggregation can define its aggregation range in two ways:
RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
The OVER function works much like in Hive, so it is not elaborated here; a simple SQL syntax skeleton follows, see the official docs for details.
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
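The difference between the two range definitions can be sketched in plain Java (helper names are mine): a RANGE bound selects rows by their time value, while a ROWS bound selects a fixed number of physical rows:

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class OverWindows {

    // RANGE BETWEEN 30 PRECEDING AND CURRENT ROW: sum of all rows whose
    // time attribute falls in [t - 30, t] (a time-based bound)
    static double rangeSum(TreeMap<Long, Double> rowsByTime, long t, long range) {
        return rowsByTime.subMap(t - range, true, t, true)
                .values().stream().mapToDouble(Double::doubleValue).sum();
    }

    // ROWS BETWEEN n PRECEDING AND CURRENT ROW: sum of the current row and
    // the n rows before it, regardless of timestamps (a row-count bound)
    static double rowsSum(List<Double> ordered, int currentIdx, int nPreceding) {
        double sum = 0;
        for (int i = Math.max(0, currentIdx - nPreceding); i <= currentIdx; i++) {
            sum += ordered.get(i);
        }
        return sum;
    }

    public static void main(String[] args) {
        TreeMap<Long, Double> byTime = new TreeMap<>();
        byTime.put(0L, 1.0);
        byTime.put(10L, 2.0);
        byTime.put(40L, 3.0);
        System.out.println(rangeSum(byTime, 40, 30)); // 5.0: only the rows at t=10 and t=40 qualify
        System.out.println(rowsSum(Arrays.asList(1.0, 2.0, 3.0, 4.0), 3, 2)); // 9.0: last three rows
    }
}
```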
Scalar function. Characteristics: takes one row at a time and outputs a single value (one row, one column).
public static class MyLower extends ScalarFunction{
public String eval(String input){
return input.toLowerCase();
}
}
tableEnvironment.createTemporaryFunction("myLower",MyLower.class);
select *,myLower(currency) as myLower from currency_rates
Aggregate function. Characteristics: continuously aggregates the rows of each group and finally outputs one row (with multiple columns) per group.
public static class MyAccumulator {
    public int count;
    public double sum; // double, so rates are not truncated and the average is exact
}
Implement at least the following three methods:
public static class MyAvg extends AggregateFunction<Double, MyAccumulator> {
    // Creates and inits the accumulator for this aggregate function.
    @Override
    public MyAccumulator createAccumulator() {
        MyAccumulator acc = new MyAccumulator();
        acc.count = 0;
        acc.sum = 0;
        return acc;
    }

    // Processes the input values and updates the provided accumulator instance.
    public void accumulate(MyAccumulator acc, Double rate) {
        acc.sum += rate;
        acc.count += 1;
    }

    // Retracts the input values from the accumulator instance.
    public void retract(MyAccumulator acc, Double rate) {
        acc.sum -= rate;
        acc.count -= 1;
    }

    // Called every time an aggregation result should be materialized.
    @Override
    public Double getValue(MyAccumulator accumulator) {
        // cast to double so this is floating-point division, not truncating integer division
        return (double) accumulator.sum / accumulator.count;
    }
}
select currency, MyAvg(conversion_rate) as MyAvg from currency_rates group by currency
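The accumulator logic of MyAvg can be exercised without any Flink dependency; this stripped-down version (class name is mine) mirrors accumulate/retract/getValue:

```java
public class AvgAccumulator {
    double sum;
    int count;

    void accumulate(double rate) { sum += rate; count += 1; }

    void retract(double rate) { sum -= rate; count -= 1; }

    // null when the group is empty, like SQL AVG over no rows
    Double getValue() { return count == 0 ? null : sum / count; }

    public static void main(String[] args) {
        AvgAccumulator acc = new AvgAccumulator();
        acc.accumulate(2.0);
        acc.accumulate(4.0);
        acc.accumulate(9.0);
        acc.retract(9.0); // an updated row retracts its old value first
        System.out.println(acc.getValue()); // 3.0
    }
}
```

This is why retract() matters: on an update stream the old row is subtracted before the new one is accumulated, keeping the running average correct.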
Table function. Characteristics: for each input row (one or more fields) it can emit multiple rows and columns, e.g. explode(), unnest().
Note: this effectively produces a table; for expanding array columns specifically, see the Array Expansion section above.
Define and register the function:
@FunctionHint(output = @DataTypeHint("ROW<phoneNumber STRING, phoneLength INT>"))
public static class MySplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String phone : str.split(",")) {
// use collect(...) to emit a row
collect(Row.of(phone, phone.length()));
}
}
}
SELECT id,name,phone_numbers,phone_number,phone_length
FROM phone_table
LEFT JOIN LATERAL TABLE(MySplitFunction(phone_numbers)) AS T(phone_number, phone_length) ON TRUE
Result:
Table aggregate function. Characteristics: continuously aggregates the rows of each group and finally emits one or more rows (with multiple columns) per group. SQL syntax is not supported yet; it can only be used via the Table API.
The following implements a grouped Top-2 as an example; compared with a conventional grouped Top-2, this approach can carry additional fields through to the output.
Table table = tableEnvironment.fromValues(DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("gender", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.DOUBLE())),
Row.of(1, "male", 98.0),
Row.of(2, "male", 76.0),
Row.of(3, "male", 99.0),
Row.of(4, "female", 44.0),
Row.of(5, "female", 76.0),
Row.of(6, "female", 86.0)
);
tableEnvironment.createTemporaryView("stu_score",table);
public static class Top2Accum {
    // emitValue() emits from this accumulator, so to carry whole rows the fields are typed as Row
    public @DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>") Row first;
    public @DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>") Row second;
}

// The groupBy key (gender) is always carried through, and output field names are checked
// for duplicates, so the field matching the groupBy key must be renamed (here: rgender).
@FunctionHint(input = @DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>"),
        output = @DataTypeHint("ROW<id INT, rgender STRING, score DOUBLE, rank INT>"))
public static class Top2 extends TableAggregateFunction<Row, Top2Accum> {
    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = null;
        acc.second = null;
        return acc;
    }

    // A whole Row is passed in so the accumulator can keep every field
    public void accumulate(Top2Accum acc, Row row) {
        Double score = (Double) row.getField(2);
        if (acc.first == null || score > (double) acc.first.getField(2)) {
            acc.second = acc.first;
            acc.first = row;
        } else if (acc.second == null || score > (double) acc.second.getField(2)) {
            acc.second = row;
        }
    }

    public void merge(Top2Accum acc, Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accum acc, Collector<Row> out) {
        // emit the value and rank
        if (acc.first != null) {
            out.collect(Row.of(acc.first.getField(0), acc.first.getField(1), acc.first.getField(2), 1));
        }
        if (acc.second != null) {
            out.collect(Row.of(acc.second.getField(0), acc.second.getField(1), acc.second.getField(2), 2));
        }
    }
}
table.groupBy($("gender"))
.flatAggregate(call(Top2.class, row($("id"), $("gender"), $("score"))))
.select($("id"),$("rgender"), $("score"),$("rank"))
.execute().print();
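The core keep-top-2 ordering of accumulate() above can be tested standalone; in this sketch (class name is mine) each row is reduced to an {id, score} pair:

```java
public class Top2Logic {
    double[] first;  // {id, score} of the current best; null until a row is seen
    double[] second; // {id, score} of the runner-up

    // Same ordering rule as Top2.accumulate(): a new best displaces first,
    // first slides down to second; otherwise the row may displace second.
    void accumulate(double id, double score) {
        if (first == null || score > first[1]) {
            second = first;
            first = new double[]{id, score};
        } else if (second == null || score > second[1]) {
            second = new double[]{id, score};
        }
    }

    public static void main(String[] args) {
        Top2Logic acc = new Top2Logic();
        acc.accumulate(1, 98);
        acc.accumulate(2, 76);
        acc.accumulate(3, 99);
        System.out.println(acc.first[0] + " " + acc.second[0]); // 3.0 1.0
    }
}
```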