赞
踩
从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,
Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表
进行扩展后返回。表函数(table function)可以看作是返回一个表的函数。
目前 Flink 提供了以下几个窗口 TVF:
在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:窗口起始点(window_start)、窗口结束点(window_end)、窗口时间(window_time)。起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。
滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
应用场景:统计每小时的pv,uv。
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)
和滑动步长(slide)两个参数。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。
应用场景:每5分钟统计一下最近一小时的pv,uv。
在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。这种特殊的窗口就叫作“累积窗口”(Cumulate Window),它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE()函数来定义,具体如下:
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
每天的截⽌当前分钟的累计 money(sum(money),去重 id 数(count(distinct id))。每天代表渐进式窗⼝⼤⼩为 1 天,分钟代表渐进式窗⼝移动步⻓为分钟级别。
- --数据源表
- CREATE TABLE source_table (
- user_id BIGINT,
- money BIGINT,
- row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
- WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
- ) WITH (
- ...
- );
-
- -- 数据处理逻辑
- SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end
- ,window_start
- ,sum(money) as sum_money
- ,count(distinct id) as count_distinct_id
- FROM TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL '60' SECOND,INTERVAL '1' DAY))
- GROUP BY window_start
- ,UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000
可以看到 Windowing TVF 滚动窗⼝的写法就是把 cumulate window 的声明写在了数据源的 Table ⼦句中,所以可理解为将表进行扩展后返回。
应⽤场景:实际的案例场景中,经常会有多个维度进⾏组合(cube)计算指标的场景。如果把每个维度组合 的代码写⼀遍,然后 union all 起来,这样写起来⾮常麻烦,⽽且会导致⼀个数据源读取多遍。
- -- ⽤户访问明细表
- CREATE TABLE source_table (
- age STRING,
- sex STRING,
- user_id BIGINT,
- row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
- WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
- ) WITH (
- ...
- );
-
- --处理逻辑
- SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end
- ,if(age is null, 'ALL', age) as age
- ,if(sex is null, 'ALL', sex) as sex
- ,count(distinct user_id) as bucket_uv
- FROM TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL '5' SECOND,INTERVAL '1' DAY))
- GROUP BY window_start
- ,window_end
- ,GROUPING SETS (
- ()
- ,(age)
- ,(sex)
- ,(age, sex)
- )
- ;
⽬前 Grouping Sets 只在 Window TVF 中⽀持,不⽀持 Group Window Aggregation。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。