赞
踩
在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。
与标准 SQL 中一致,Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,所以有时开窗聚合也叫作“OVER 聚合”(Over Aggregation)。基本语法如下:
SELECT
<聚合函数> OVER (
[PARTITION BY <字段 1>[, <字段 2>, ...]]
ORDER BY <时间属性字段>
<开窗范围>),
...
FROM ...
MySQL窗口函数教学,MySQL窗口函数和Hive窗口函数以及Flink窗口函数区别不大,只是语法上有些区别,例如开窗范围方面,勤奋的读者一定会看明白。
这里 OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分:
用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的;
OVER 窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论那种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中,目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性。
对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由 BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围。目前支持的上界只能是 CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为:
BETWEEN ... PRECEDING AND CURRENT ROW
开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。
范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。
例如开窗范围选择当前行之前 1 小时的数据:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。
例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
需求:统计每个用户当前这次访问以及之前三次访问的平均时间
public class OverTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //1.在创建表的DDL中直接定义时间属性 String creatDDL = "CREATE TABLE clickTable (" + "user_name STRING," + "url STRING," + "ts BIGINT," + "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," + //事件时间 FROM_UNIXTIME() 能转换为年月日时分秒这样的格式 转换秒 " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + //watermark 延迟一秒 ")WITH(" + " 'connector' = 'filesystem'," + " 'path' = 'input/clicks.txt'," + " 'format' = 'csv'" + ")"; tableEnv.executeSql(creatDDL); //需求:统计每个用户当前这次访问以及之前三次访问的平均时间 Table overWindow = tableEnv.sqlQuery("select user_name," + " avg(ts) OVER(" + " PARTITION BY user_name " + " ORDER BY et " + " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" + ") AS avg_ts " + "FROM clickTable"); tableEnv.toChangelogStream(overWindow).print("over window:"); env.execute(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。