赞
踩
使用 Flink DataStream 时,我们都知道可以进行开窗聚合处理;今天记录一下 Flink SQL 窗口聚合的学习。
滚动窗口(Tumble)
滑动窗口(HOP)
Session窗口(Session)
渐进式窗口(CUMULATE)
创建源表
-- Source table backed by the datagen connector (5 random rows per second).
-- NOTE: each column comment must sit on its own line; a `--` comment runs to end of line,
-- so collapsing this DDL onto one line comments out everything after the first `--`.
CREATE TABLE source_table (
    -- dimension: city id ('fields.city_id.length' = '1' -> 1-character random string)
    `city_id` STRING,
    -- user id, random in [1, 50]
    `user_id` BIGINT,
    -- amount, random in [1, 50]
    `price` BIGINT,
    -- time attribute: derived from CURRENT_TIMESTAMP (generation-time clock) and cast to
    -- TIMESTAMP(3) so it can carry a watermark and act as event time in this demo
    `rowtime` AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    -- watermark: allow up to 3 seconds of out-of-orderness
    WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.city_id.length' = '1',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '50',
    'fields.price.min' = '1',
    'fields.price.max' = '50'
);
创建目标表
-- Print sink for the tumbling-window examples.
-- INSERT INTO without a column list maps by POSITION, so producers must emit columns
-- in exactly this order.
CREATE TABLE sink_table (
    -- dimension: city id
    `city_id` STRING,
    -- pv (page visits): row count in the window
    `pv` BIGINT,
    -- total amount in the window
    `sum_price` BIGINT,
    -- maximum amount in the window
    `max_price` BIGINT,
    -- minimum amount in the window
    `min_price` BIGINT,
    -- uv: distinct user count in the window
    `uv` BIGINT,
    -- window start, epoch milliseconds
    `window_start` BIGINT,
    -- window end, epoch milliseconds
    `window_end` BIGINT
) WITH (
    'connector' = 'print'
);
Flink 1.13 版本之前使用如下的分组窗口(Group Window Aggregation)写法(1.13 之后仍可使用,但已不推荐):
-- Legacy group-window syntax: 5-second tumbling window per city.
-- Select-list order must match sink_table positionally:
-- city_id, pv, sum_price, max_price, min_price, uv, window_start, window_end.
INSERT INTO sink_table
SELECT
    city_id,
    COUNT(user_id) AS pv,
    -- total amount: SUM, not COUNT (COUNT would just repeat the row count)
    SUM(price) AS sum_price,
    MAX(price) AS max_price,
    MIN(price) AS min_price,
    COUNT(DISTINCT user_id) AS uv,
    -- window bounds: TIMESTAMP -> STRING -> epoch seconds (UNIX_TIMESTAMP), * 1000 for ms
    UNIX_TIMESTAMP(CAST(TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_end
FROM source_table
GROUP BY
    city_id,
    TUMBLE(rowtime, INTERVAL '5' SECOND);
-- Window TVF syntax (recommended from Flink 1.13): the same 5-second tumbling aggregation.
-- Select-list order must match sink_table positionally:
-- city_id, pv, sum_price, max_price, min_price, uv, window_start, window_end.
-- (uv was previously selected third, which shifted every value into the wrong column.)
INSERT INTO sink_table
SELECT
    city_id,
    COUNT(user_id) AS pv,
    -- total amount: SUM, not COUNT
    SUM(price) AS sum_price,
    MAX(price) AS max_price,
    MIN(price) AS min_price,
    COUNT(DISTINCT user_id) AS uv,
    -- window bounds converted to epoch milliseconds
    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 AS window_end
FROM TABLE(
    TUMBLE(TABLE source_table, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)
)
-- window TVF aggregation requires window_start and window_end in GROUP BY
GROUP BY
    window_start,
    window_end,
    city_id;
第一个参数 TABLE source_table 声明数据源表;
第二个参数 DESCRIPTOR(rowtime) 声明数据源的时间属性字段;
第三个参数 INTERVAL '5' SECOND 声明滚动窗口大小为 5s。
两种写法都可以,但在 1.13 及之后的版本中,建议尽量使用 Window TVF 方案。
创建目标表
-- Print sink for the sliding (HOP) window examples.
CREATE TABLE sink_table1 (
    -- dimension: city id
    `city_id` STRING,
    -- uv: distinct user count in the window
    `uv` BIGINT,
    -- window start, epoch milliseconds
    `window_start` BIGINT,
    -- window end, epoch milliseconds
    `window_end` BIGINT
) WITH (
    'connector' = 'print'
);
-- Legacy group-window HOP: 10-second windows that advance every 5 seconds, per city.
-- Argument order is HOP(time_attr, slide, size), so INTERVAL '5' is the slide and
-- INTERVAL '10' is the window size; windows overlap and each row falls into two of them.
INSERT INTO sink_table1
SELECT
    city_id,
    COUNT(DISTINCT user_id) AS uv,
    UNIX_TIMESTAMP(
        CAST(HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS STRING)
    ) * 1000 AS window_start,
    UNIX_TIMESTAMP(
        CAST(HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS STRING)
    ) * 1000 AS window_end
FROM source_table
GROUP BY
    city_id,
    HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND);
-- Window TVF HOP: 10-second windows sliding every 5 seconds, per city.
-- The comment that used to sit inline on this one-line statement commented out
-- FROM/GROUP BY; comments now live on their own lines.
INSERT INTO sink_table1
SELECT
    city_id,
    COUNT(DISTINCT user_id) AS uv,
    -- window bounds converted to epoch milliseconds
    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 AS window_end
-- HOP(TABLE, DESCRIPTOR(time_attr), slide, size): slide 5s, size 10s -> overlapping windows
FROM TABLE(
    HOP(TABLE source_table, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)
)
GROUP BY
    city_id,
    window_start,
    window_end;
创建目标表
-- Print sink for the session window example.
CREATE TABLE sink_table2 (
    -- dimension: city id
    `city_id` STRING,
    -- pv: number of records in each session window
    `pv` BIGINT,
    -- window start, epoch milliseconds
    `window_start` BIGINT,
    -- window end, epoch milliseconds
    `window_end` BIGINT
) WITH (
    'connector' = 'print'
);
执行 SQL:按 5 秒的会话间隔(Session Gap)统计会话窗口(分组窗口聚合,Group Window Aggregation)
-- Legacy group-window SESSION: per city, a session window closes once no new row
-- arrives within the 5-second gap.
INSERT INTO sink_table2
SELECT
    city_id,
    -- pv: rows with a non-null user_id in the session
    COUNT(user_id) AS pv,
    -- window bounds converted to epoch milliseconds
    UNIX_TIMESTAMP(CAST(SESSION_START(rowtime, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(SESSION_END(rowtime, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_end
FROM source_table
GROUP BY
    city_id,
    SESSION(rowtime, INTERVAL '5' SECOND);
使用 Window TVF 的方式测试会话窗口会报错(Flink 1.13 的 Window TVF 尚不支持 SESSION)。
创建源表
-- Source table for the cumulate example, backed by datagen (5 random rows per second).
-- Column comments are on their own lines: on a single line, the first `--` would
-- comment out the rest of the statement.
CREATE TABLE source_table1 (
    -- user id ('fields.user_id.length' = '1' -> 1-character random string)
    `user_id` STRING,
    -- amount spent, random in [1, 50]
    `money` BIGINT,
    -- time attribute derived from CURRENT_TIMESTAMP, cast to TIMESTAMP(3)
    `rowtime` AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    -- watermark: allow up to 2 seconds of out-of-orderness
    WATERMARK FOR rowtime AS rowtime - INTERVAL '2' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.user_id.length' = '1',
    'fields.money.min' = '1',
    'fields.money.max' = '50'
);
创建目标表
-- Print sink for the cumulate window example.
CREATE TABLE sink_table3 (
    -- distinct user count; BIGINT to match COUNT(DISTINCT ...) (STRING is not a compatible sink type)
    `usernum` BIGINT,
    -- total amount in the window
    `summoney` BIGINT,
    -- window start, epoch milliseconds
    `window_start` BIGINT,
    -- window end, epoch milliseconds
    `window_end` BIGINT
) WITH (
    'connector' = 'print'
);
执行 SQL 插入。注意:渐进式窗口在 1.13(streaming)中只能使用 Window TVF 写法。
-- Window TVF CUMULATE: a 20-second window that emits a cumulative result every 5 seconds,
-- then starts over in the next 20-second window.
INSERT INTO sink_table3
SELECT
    COUNT(DISTINCT user_id) AS usernum,
    -- total amount: SUM, not COUNT
    SUM(money) AS summoney,
    -- window bounds converted to epoch milliseconds
    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 AS window_end
-- CUMULATE(TABLE, DESCRIPTOR(time_attr), step, max_size); string literals need ASCII quotes
FROM TABLE(
    CUMULATE(TABLE source_table1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '20' SECOND)
)
GROUP BY
    window_start,
    window_end;
第一个参数 TABLE source_table1 声明数据源表;
第二个参数 DESCRIPTOR(rowtime) 声明数据源的时间属性字段;
第三个参数 INTERVAL '5' SECOND 声明渐进式窗口触发的渐进步长为 5s。
第四个参数 INTERVAL '20' SECOND 声明整个渐进式窗口的大小为 20s,到了 20s 新开一个窗口重新累计。
本次对于flinksql里面的四个窗口分别进行了演示
1.滚动窗口:将每个元素分配到指定大小的窗口中。滚动窗口具有固定大小,且互不重叠。
2.滑动窗口:滑动窗口也是将元素分配到固定长度的窗口中。与滚动窗口一样,它也有窗口大小的概念;不同之处在于,滑动窗口还有另一个参数控制窗口计算的频率(即滑动步长)。因此,如果滑动步长小于窗口大小,滑动窗口之间就会重叠,此时一条数据会被分配到多个窗口当中。
3.会话窗口:Session 时间窗口和滚动、滑动窗口不一样,它没有固定的持续时间;如果在定义的间隔期(Session Gap)内没有新的数据出现,Session 窗口就会关闭。
4.渐进式窗口:渐进式窗口其实就是在固定窗口间隔内提前触发的滚动窗口,也就是 Tumble Window + early-fire 的一个事件时间版本。
这里需要注意:尽量使用 Window TVF 写法。但在 Flink 1.13 中,会话窗口似乎只能使用分组窗口聚合(Group Window Aggregation),其他三种窗口都可以使用 Window TVF。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。