当前位置:   article > 正文

FlinkSql中的窗口

FlinkSql中的窗口
        窗口可以将无界流切割成大小有限的“桶”(bucket)来做计算,通过截取有限数据集来处理无限的流数据。

窗口表值函数(Window TVF)

        从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,
Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表
进行扩展后返回
。表函数(table function)可以看作是返回一个表的函数。

        目前 Flink 提供了以下几个窗口 TVF:

  • 滚动窗口(Tumbling Windows);
  • 滑动窗口(Hop Windows,跳跃窗口);
  • 累积窗口(Cumulate Windows);
  • 会话窗口(Session Windows,目前尚未完全支持)。

        在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:窗口起始点(window_start)、窗口结束点(window_end)、窗口时间(window_time)。起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳

滚动窗口(TUMBLE)

滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。

应用场景:统计每小时的pv,uv

滑动窗口(HOP)

        滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)
和滑动步长(slide)
两个参数。

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。

应用场景:每5分钟统计一下最近一小时的pv,uv。

累积窗口(CUMULATE)

        在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。这种特殊的窗口就叫作“累积窗口”(Cumulate Window),它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1 小时的累积窗口。注意第三个参数为步长 step ,第四个参数则是最大窗口长度。
使用场景:每小时统计一次当天的pv,uv。(如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值)。

实际案例

每天的截⽌当前分钟的累计 money(sum(money),去重 id 数(count(distinct id))。每天代表渐进式窗⼝⼤⼩为 1 天,分钟代表渐进式窗⼝移动步⻓为分钟级别。

  1. --数据源表
  2. CREATE TABLE source_table (
  3. user_id BIGINT,
  4. money BIGINT,
  5. row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
  6. WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
  7. ) WITH (
  8. ...
  9. );
  10. -- 数据处理逻辑
  11. SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end
  12. ,window_start
  13. ,sum(money) as sum_money
  14. ,count(distinct id) as count_distinct_id
  15. FROM TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL '60' SECOND,INTERVAL '1' DAY))
  16. GROUP BY window_start
  17. ,UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000

可以看到 Windowing TVF 滚动窗⼝的写法就是把 cumulate window 的声明写在了数据源的 Table ⼦句中,所以可理解为将表进行扩展后返回。

Window TVF ⽀持 Grouping Sets、Rollup、Cube

应⽤场景:实际的案例场景中,经常会有多个维度进⾏组合(cube)计算指标的场景。如果把每个维度组合 的代码写⼀遍,然后 union all 起来,这样写起来⾮常麻烦,⽽且会导致⼀个数据源读取多遍。

  1. -- ⽤户访问明细表
  2. CREATE TABLE source_table (
  3. age STRING,
  4. sex STRING,
  5. user_id BIGINT,
  6. row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
  7. WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
  8. ) WITH (
  9. ...
  10. );
  11. --处理逻辑
  12. SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end
  13. ,if(age is null, 'ALL', age) as age
  14. ,if(sex is null, 'ALL', sex) as sex
  15. ,count(distinct user_id) as bucket_uv
  16. FROM TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL '5' SECOND,INTERVAL '1' DAY))
  17. GROUP BY window_start
  18. ,window_end
  19. ,GROUPING SETS (
  20. ()
  21. ,(age)
  22. ,(sex)
  23. ,(age, sex)
  24. )
  25. ;

⽬前 Grouping Sets 只在 Window TVF 中⽀持,不⽀持 Group Window Aggregation。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/639043
推荐阅读
相关标签
  

闽ICP备14008679号