赞
踩
CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, `ts` TIMESTAMP(3), proctime as PROCTIME(), -- 处理时间列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列 ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behavior', -- kafka topic 'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取 'properties.bootstrap.servers' = 'chb1:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv' );
官网: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/
窗口函数分类:
窗口 TVF 的返回值是一个新的关系,它包括原关系的所有列,以及另外3个列“window_start”、“window_end”、“window_time”,以指示分配的窗口。
每个窗口没有重叠,窗口长度固定
滚动窗口的函数格式:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
- data 表名
- timecol 时间属性字段
- 时间窗口的大小 INTERVAL '10' MINUTES
测试案例
表结构
Flink SQL> desc user_log;
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
| name | type | null | key | extras | watermark |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
| user_id | STRING | true | | | |
| item_id | STRING | true | | | |
| category_id | STRING | true | | | |
| behavior | STRING | true | | | |
| ts | TIMESTAMP(3) *ROWTIME* | true | | | `ts` - INTERVAL '5' SECOND |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | AS PROCTIME() | |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
6 rows in set
1、窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES));
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[1 min]).
Please use window table-valued function with aggregate together using window_start and window_end as group keys.
-- 按照 window_start、window_end 分区
SELECT window_start, window_end, count(user_id)
FROM TABLE(TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
相对于滚动窗口,滑动窗口多一个 slide
参数,用于表示每个窗口滑动大小
滑动窗口函数表达式:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data 表
- timecol 时间字段
- slide 滑动长度
- size 窗口大小
测试案例:
一个滑动窗口,窗口长度1分钟,滑动距离30秒
SELECT window_start, window_end, count(1)
FROM TABLE(
HOP(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
Cumulate window 就是累计窗口,简单来说,以下图里面时间轴上的一个区间为窗口步长(step)。
第一个 window 统计的是一个区间的数据;
第二个 window 统计的是第一区间和第二个区间的数据;
第三个 window 统计的是第一区间,第二个区间和第三个区间的数据。
累积计算在业务场景中非常常见,如累积 UV 场景。在 UV 大盘曲线中:我们每隔 10 分钟统计一次当天累积用户 UV。
累计窗口表达式:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data 表
- timecol 时间字段
- step 步长
- size 窗口大小
每30秒,统一最近一分钟的pv
SELECT window_start, window_end, count(1)
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
累加窗口:(1 MINUTE,1 DAY) 按照1分钟划分窗口,每分钟计算当前分钟的数据 merge 当前分钟的前一分钟的数据结果
按照 订单数据事件时间+水位线 进行窗口触发执行
SELECT window_start, window_end, count(1)
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' DAYS))
GROUP BY window_start, window_end;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。