赞
踩
窗口计算离不开水印,即 watermark,关于 watermark 可以看我上一篇文章 Flink 时间属性及 WATERMARK 水印 。
滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)
、[0:05, 0:10)
、[0:10, 0:15)
等窗口。
- CREATE TEMPORARY TABLE src_kafka (
- `create_time` TIMESTAMP(3) NOT NULL METADATA from 'timestamp',
- `offset` bigINT NOT NULL METADATA from 'offset' ,
- `partition` INT NOT NULL METADATA from 'partition' ,
- cs1 STRING,
- ip STRING,
- b STRING,
- t STRING,
- n STRING,
- tm BIGINT,
- event_time as TO_TIMESTAMP(tm),
- watermark for event_time as event_time - INTERVAL '2' SECOND, -- 定义水位线
- PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'ai_log',
- 'properties.bootstrap.servers' = 'alikafka',
- 'properties.group.id' = 'pyspark-consumer',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'json'
- );
-
- SELECT
- TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start,
- TUMBLE_END(event_time, INTERVAL '1' MINUTE) as window_end,
- b as platform,
- count(distinct cs1) as uv
- from src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE),b
- ;
上述案例是要按照 每分钟的滚动窗口来统计埋点事件为 ADCompanyShow 的登录用户数量,以平台维度区分,从上图中可知 web 端每分钟登录用户数具体数值。
滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。
滑动窗口有两个参数:slide和size。slide为滑动步长,size为窗口大小。
通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。
- CREATE TEMPORARY TABLE src_kafka (
- `create_time` TIMESTAMP(3) NOT NULL METADATA from 'timestamp',
- `offset` bigINT NOT NULL METADATA from 'offset' ,
- `partition` INT NOT NULL METADATA from 'partition' ,
- cs1 STRING,
- ip STRING,
- b STRING,
- t STRING,
- n STRING,
- tm BIGINT,
- event_time as TO_TIMESTAMP(tm),
- watermark for event_time as event_time - INTERVAL '2' SECOND,
- PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'ai_log',
- 'properties.bootstrap.servers' = 'alikafka',
- 'properties.group.id' = 'pyspark-consumer',
- 'scan.startup.mode' = 'latest-offset',
- 'format' = 'json'
- );
-
-
- SELECT
- HOP_START (event_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start,
- HOP_END (event_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end,
- COUNT (cs1) as uv
- FROM src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b ='web'
- GROUP BY HOP (event_time, INTERVAL '30' SECOND, INTERVAL '1' MINUTE);
上述案例中,每隔30秒统计1次近1分钟内 埋点事件为 ADCompanyShow 的登录用户数,请注意:每个窗口的开始时间、结束时间 会根据 slide 进行调整,因此窗口会重叠。
滑动窗口通过窗口大小和滑动步长定义的重叠窗口,用于连续的实时分析。适用于需要频繁更新的实时分析场景,如实时指标监控、滚动平均值计算等。通过合理使用滑动窗口,可以实现对数据流的连续分析,满足实时性和数据完整性的需求。
- SELECT
- HOP_START (event_time, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) as window_start,
- HOP_END (event_time, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) as window_end,
- COUNT (cs1) as uv
- FROM src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b ='web'
- GROUP BY HOP (event_time, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE);
由上图可知,当 滑动步长 == 窗口大小时,滑动窗口就等于滚动窗口。
- SELECT
- HOP_START (event_time, INTERVAL '2' MINUTE, INTERVAL '1' MINUTE) as window_start,
- HOP_END (event_time, INTERVAL '2' MINUTE, INTERVAL '1' MINUTE) as window_end,
- COUNT (cs1) as uv
- FROM src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b ='web'
- GROUP BY HOP (event_time, INTERVAL '2' MINUTE, INTERVAL '1' MINUTE);
由上图可知,当 滑动步长 > 窗口大小时,则为跳跃窗口,窗口之间不重叠且有间隙。
会话窗口(SESSION)通过 SESSION 活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,该窗口就会关闭。
会话窗口经常用于分析用户在网站或应用上的行为。例如:
- SELECT
- SESSION_START(event_time, INTERVAL '2' minute) as window_start,
- SESSION_END(event_time, INTERVAL '2' minute) as window_end,
- cs1 as uid,
- COUNT(1) as viewcnt
- FROM src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b = 'web'
- GROUP BY SESSION(event_time, INTERVAL '2' minute), cs1
- ;
会话窗口的开始时间和截止时间是根据用户活动的时间戳和预定义的会话间隔来计算的。会话窗口的计算方式如下:
假设我们有一系列用户活动的时间戳,并定义会话间隔为 20 分钟。下面是计算会话窗口开始时间和截止时间的步骤:
初始化:
更新会话窗口:
输出会话窗口:
假设我们有以下用户活动时间戳,定义会话间隔为20分钟:
- 2024-08-05 10:00:00
- 2024-08-05 10:05:00
- 2024-08-05 10:25:00
- 2024-08-05 10:35:00
- 2024-08-05 11:00:00
计算步骤如下:
- 第一个事件:
- 时间戳:2024-08-05 10:00:00
- 会话开始时间:2024-08-05 10:00:00
- 会话截止时间:2024-08-05 10:20:00
-
- 第二个事件:
- 时间戳:2024-08-05 10:05:00
- 在当前会话范围内(10:20之前)
- 更新会话截止时间:2024-08-05 10:25:00
-
- 第三个事件:
- 时间戳:2024-08-05 10:25:00
- 在当前会话范围内(10:25之前)
- 更新会话截止时间:2024-08-05 10:45:00
-
- 第四个事件:
- 时间戳:2024-08-05 10:35:00
- 在当前会话范围内(10:45之前)
- 更新会话截止时间:2024-08-05 10:55:00
-
- 第五个事件:
- 时间戳:2024-08-05 11:00:00
- 不在当前会话范围内(10:55之后)
- 当前会话结束,输出会话:
- 会话开始时间:2024-08-05 10:00:00
- 会话截止时间:2024-08-05 10:55:00
- 新会话开始时间:2024-08-05 11:00:00
- 新会话截止时间:2024-08-05 11:20:00
在Flink SQL中,累计窗口(Cumulative Window)是一种特殊的窗口类型,用于按固定的时间间隔逐步累积数据。与滚动窗口(Tumbling Window)和滑动窗口(Sliding Window)不同,累计窗口会在每个窗口结束时输出累积的结果。
- with src_kafka_1 as (
- select *
- from src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b = 'web'
- )
-
- SELECT
- window_start,
- window_end,
- count(distinct cs1) as cumulate_dau
- FROM TABLE(
- CUMULATE(
- TABLE src_kafka_1,
- DESCRIPTOR(event_time),
- INTERVAL '1' MINUTES,
- INTERVAL '1' DAY))
- GROUP BY window_start, window_end;
我想计算当天累计活跃用户数,按照1分钟的 step 更新数据,窗口的开始时间是基于时间属性列的时间戳对窗口步长进行取整计算的结果,通常从 0
开始,如上图开始时间:2024-08-05 00:00:00;窗口的结束时间是基于时间属性列的时间戳对最大长度进行取整计算的结果,第一个窗口的结束时间:2024-08-05 16:40:00 ,是因为我在这个时间点提交的任务,之后的结束时间戳都是在之前基础上加上一个 step 。
OVER窗口(OVER Window)是传统数据库的标准开窗,不同于Group By Window,OVER窗口中每1个元素都对应1个窗口。OVER窗口可以按照实际元素的行或实际的元素值(时间戳值)确定窗口,因此流数据元素可能分布在多个窗口中。
在应用OVER窗口的流式数据中,每个元素都对应1个OVER窗口。每个元素都触发1次数据计算,每个触发计算的元素所确定的行,都是该元素所在窗口的最后1行
Flink SQL中对OVER窗口的定义遵循标准SQL的定义语法,传统OVER窗口没有对其进行更细粒度的窗口类型命名划分。按照计算行的定义方式,OVER Window可以分为以下两类:
- select event_time,cs1 as uid,count(1) over(partition by cs1 order by event_time rows between 2 preceding and current row) as cnt
- from src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b = 'web'
- ;
- select
- event_time
- ,cs1 as uid
- ,count(1) over(partition by cs1 order by event_time RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW) as cnt
- from src_kafka
- where n = 'ADCompanyShow'
- and cs1 is not null
- and b = 'web'
- ;
尽管离线计算和实时计算中关于 over窗口函数 的代码相同,但两者在执行过程中存在以下主要区别:
数据处理方式:
OVER
窗口函数会在数据流中实时地计算窗口结果。每当新数据到达时,窗口计算会实时更新。OVER
窗口函数会在整个数据集上一次性计算窗口结果。所有数据都读取完毕后,窗口计算才会开始。计算延迟:
状态管理:
以上就是本次关于流式计算中不同窗口计算类型,有任何问题欢迎各位在讨论区交流。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。