
27. Flink SQL SELECT (Window Functions): Introduction and Detailed Examples (3)

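Flink series articles

I. Flink column

The Flink column systematically introduces one topic at a time, with concrete examples to illustrate it.

  • 1. Flink deployment series
    This part covers the basics of deploying and configuring Flink.

  • 2. Flink basics series
    This part covers Flink fundamentals, such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.

  • 3. Flink Table API and SQL basics series
    This part covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.

  • 4. Flink Table API and SQL advanced and applied series
    This part covers applied Table API and SQL topics that are closer to real production use and are somewhat harder to develop.

  • 5. Flink monitoring series
    This part relates to day-to-day operations and monitoring work.

II. Flink examples column

The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column is not divided into sections; the link titles show what each article covers.

Entry point for all articles in both columns: Flink series article index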



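This article briefly introduces Flink's window functions (tumbling, hopping/sliding, and cumulative windows) and walks through concrete examples that verify them.
This article assumes working Flink and Kafka clusters.
The examples were verified on Flink 1.17.
Note:
While writing this article I ran into two official examples that would not run: one that sets the offset directly with the offset keyword, and one that aggregates over a window on a table that has a primary key. Because I wanted to finish this column quickly I did not dig into either; I will investigate the causes after the column is complete. If you know the reason, corrections are welcome. Thanks.
The corresponding sections are:
1) Example 1: querying and aggregating with a cumulative window using offset
2) Example 2: querying and aggregating with a tumbling window (table with a primary key)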

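I. Windowing table-valued functions (Windowing TVFs)

Windows are at the heart of processing infinite streams. Windows split the stream into finite-sized "buckets" over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how programmers can get the most out of what it offers.

Apache Flink provides several window table-valued functions (TVFs) to divide the elements of a table into windows, including: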

  • Tumble Windows (tumbling windows)
  • Hop Windows (hopping, also called sliding, windows)
  • Cumulate Windows (cumulative windows)
  • Session Windows (not yet supported as of Flink 1.17)

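Each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows in which a single element can be assigned to multiple windows.

Windowing TVFs are Flink-defined Polymorphic Table Functions (abbreviated PTF). PTF is part of the SQL 2016 standard: a special kind of table function that can take a table as a parameter. PTFs are a powerful feature for changing the shape of a table. Because PTFs are used semantically like tables, they are invoked in the FROM clause of a SELECT statement.

Windowing TVFs replace the legacy Grouped Window Functions. Windowing TVFs are more SQL-standard compliant and more powerful: they support complex window-based computations such as Window TopN and Window Join, whereas Grouped Window Functions support only Window Aggregation.

How to apply further computations on top of windowing TVFs is covered in later sections: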

  • Window Aggregation
  • Window TopN
  • Window Join
  • Window Deduplication

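Apache Flink provides 3 built-in windowing TVFs: TUMBLE, HOP, and CUMULATE. The return value of a windowing TVF is a new relation that includes all columns of the original relation plus 3 additional columns named "window_start", "window_end", and "window_time" that indicate the assigned window. In streaming mode, the "window_time" field is a time attribute of the window. In batch mode, the "window_time" field is an attribute of type TIMESTAMP or TIMESTAMP_LTZ, depending on the type of the input time field. The "window_time" field can be used in subsequent time-based operations, such as another windowing TVF, interval joins, or over aggregations. The value of window_time always equals window_end - 1ms.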

1. TUMBLE (tumbling windows)

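The TUMBLE function assigns each element to a window of the specified size. Tumbling windows have a fixed size and do not overlap. For example, suppose you specify a tumbling window with a size of 5 minutes. In that case, Flink evaluates the current window and starts a new one every five minutes, as shown in the figure below.
[Figure: tumbling windows]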

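The TUMBLE function assigns a window to each row of a relation based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ. The return value of TUMBLE is a new relation that includes all columns of the original relation plus 3 additional columns named "window_start", "window_end", and "window_time" that indicate the assigned window. The original time attribute "timecol" becomes a regular timestamp column after the windowing TVF.

The TUMBLE function takes three required parameters and one optional parameter: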

TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])

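# data: a table parameter; it can be any relation with a time attribute column.
# timecol: a column descriptor indicating which time attribute column of data should be mapped to tumbling windows.
# size: a duration specifying the width of the tumbling windows.
# offset: an optional parameter specifying the offset by which the window start is shifted.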
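To make the roles of size and offset concrete, here is a minimal sketch of how a single timestamp could be mapped to its tumbling window. It is written in Python rather than SQL so it can be run without a cluster. The function name tumble_window and the EPOCH constant are hypothetical helpers introduced for illustration, and the alignment to the epoch is an assumption about how windows are laid out, not code taken from Flink.

```python
from datetime import datetime, timedelta

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def tumble_window(ts, size, offset=timedelta(0)):
    """Return (window_start, window_end, window_time) for one timestamp."""
    # Distance from the timestamp back to the nearest window boundary,
    # with the grid of boundaries shifted by the optional offset.
    remainder = (ts - EPOCH - offset) % size
    window_start = ts - remainder
    window_end = window_start + size
    # window_time is described in the text as window_end - 1ms.
    window_time = window_end - timedelta(milliseconds=1)
    return window_start, window_end, window_time


if __name__ == "__main__":
    bid_time = datetime(2020, 4, 15, 8, 5)
    print(tumble_window(bid_time, timedelta(minutes=10)))
    print(tumble_window(bid_time, timedelta(minutes=10), timedelta(minutes=1)))
```

With a 10 minute size and no offset, a row at 08:05 falls into the window starting at 08:00; adding a 1 minute offset shifts every boundary by one minute, so the same row falls into the window starting at 08:01.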


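1) Example 1: querying and aggregating with a tumbling window (table without a primary key)

The verification steps are as follows.

---1. Create the table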
Flink SQL> CREATE TABLE orders (
>     `id`    STRING,
>     price       DECIMAL(32,2),
>     proctime as PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'orders_topic',
>   'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'csv'
> );
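---2. Insert data and query
# Data insertion is omitted; records are simply written to the table's Kafka topic
# Final data in the table
 
---3. Two ways to query a tumbling window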
SELECT * FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL '5' MINUTES));
# or
 SELECT * FROM TABLE(
    TUMBLE(
      DATA => TABLE orders,   -- DATA must be the first argument
      TIMECOL => DESCRIPTOR(proctime),
      SIZE => INTERVAL '5' MINUTES));

Flink SQL> SELECT * FROM TABLE(
>    TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL '5' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 10:38:26.566 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.566 |
| +I |                              2 |                              15.00 | 2023-09-19 10:38:26.566 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.566 |
| +I |                              3 |                              20.00 | 2023-09-19 10:38:26.566 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.566 |
| +I |                              4 |                              30.00 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                              5 |                              60.00 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                              6 |                             800.98 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                              7 |                             100.90 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                              8 |                              11.00 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                              9 |                              18.00 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                             10 |                             123.00 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                             11 |                              35.78 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |
| +I |                             12 |                              45.68 | 2023-09-19 10:38:26.567 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:26.567 |


Flink SQL> SELECT * FROM TABLE(
>    TUMBLE(
>      DATA => TABLE orders,
>      TIMECOL => DESCRIPTOR(proctime),
>      SIZE => INTERVAL '5' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 10:38:58.165 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.165 |
| +I |                              2 |                              15.00 | 2023-09-19 10:38:58.165 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.165 |
| +I |                              3 |                              20.00 | 2023-09-19 10:38:58.165 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.165 |
| +I |                              4 |                              30.00 | 2023-09-19 10:38:58.165 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.165 |
| +I |                              5 |                              60.00 | 2023-09-19 10:38:58.165 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.165 |
| +I |                              6 |                             800.98 | 2023-09-19 10:38:58.166 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.166 |
| +I |                              7 |                             100.90 | 2023-09-19 10:38:58.166 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.166 |
| +I |                              8 |                              11.00 | 2023-09-19 10:38:58.166 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.166 |
| +I |                              9 |                              18.00 | 2023-09-19 10:38:58.166 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.166 |
| +I |                             10 |                             123.00 | 2023-09-19 10:38:58.166 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.166 |
| +I |                             11 |                              35.78 | 2023-09-19 10:38:58.166 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.166 |
| +I |                             12 |                              45.68 | 2023-09-19 10:38:58.167 | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 | 2023-09-19 10:38:58.167 |

---4. Tumbling window aggregation
# While data is being written to the orders table, run the window aggregation; the results are as follows:
Flink SQL> SELECT window_start, window_end, sum(price)
>   FROM TABLE(
>     TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL '5' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                   EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-19 10:35:00.000 | 2023-09-19 10:40:00.000 |                                  1270.34 |


Flink SQL> SELECT window_start, window_end, sum(price)
>   FROM TABLE(
>     TUMBLE(TABLE orders, DESCRIPTOR(proctime),INTERVAL '5' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                   EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-19 10:40:00.000 | 2023-09-19 10:45:00.000 |                                  1428.02 |
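As a sanity check on the two totals above, the sums can be reproduced by hand. The sketch below is in Python rather than SQL and only adds up the prices. It assumes that the rows written between the two runs are ids 13 to 15 (22.00, 56.78, 78.90), as listed for the same orders table in the HOP example later in this article; the helper name total is hypothetical. Under that assumption, the second total is consistent with the query re-reading all 15 rows from the earliest offset and stamping each with the current processing time, so they all land in the 10:40 window.

```python
from decimal import Decimal

# Prices of ids 1-12, copied from the SELECT * output above.
FIRST_BATCH = [
    "10.00", "15.00", "20.00", "30.00", "60.00", "800.98",
    "100.90", "11.00", "18.00", "123.00", "35.78", "45.68",
]

# Assumption: the rows added between the two runs are ids 13-15,
# taken from the orders listing in the HOP example of this article.
SECOND_BATCH = ["22.00", "56.78", "78.90"]


def total(prices):
    """Sum price strings exactly, as a DECIMAL column would."""
    return sum((Decimal(p) for p in prices), Decimal("0"))


if __name__ == "__main__":
    print(total(FIRST_BATCH))
    print(total(FIRST_BATCH + SECOND_BATCH))
```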



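2) Example 2: querying and aggregating with a tumbling window (table with a primary key)

The verification steps are as follows. With a primary key set on the table, the window aggregation could not be run. Judging from the error message at the end, the likely cause is not the primary key itself but the 'debezium-json' format: it makes the source emit update and delete changes, which the window aggregate operator does not accept in this version.

-----1. Create the table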
Flink SQL> CREATE TABLE orders2 (
>    `id`    STRING,
>    price       DECIMAL(32,2),
>    proctime as PROCTIME(),
>    PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'orders2_topic',
>   'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'value.format' = 'debezium-json'
> );

Flink SQL> desc orders2;
+----------+-----------------------------+-------+---------+---------------+-----------+
|     name |                        type |  null |     key |        extras | watermark |
+----------+-----------------------------+-------+---------+---------------+-----------+
|       id |                      STRING | FALSE | PRI(id) |               |           |
|    price |              DECIMAL(32, 2) |  TRUE |         |               |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |         | AS PROCTIME() |           |
+----------+-----------------------------+-------+---------+---------------+-----------+
-----2. Insert data and query
Flink SQL> select * from orders2;
+----+--------------------------------+------------------------------------+-------------------------+
| op |                             id |                              price |                proctime |
+----+--------------------------------+------------------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 13:23:00.764 |
| +I |                              2 |                              12.00 | 2023-09-19 13:23:34.945 |
| +I |                              3 |                               4.00 | 2023-09-19 13:23:43.993 |
| +I |                              4 |                              20.00 | 2023-09-19 13:23:51.384 |
-----3. Two ways to query a tumbling window
Flink SQL> SELECT * FROM TABLE( TUMBLE(TABLE orders2, DESCRIPTOR(proctime),INTERVAL '5' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 13:25:14.066 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:14.066 |
| +I |                              2 |                              12.00 | 2023-09-19 13:25:14.066 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:14.066 |
| +I |                              3 |                               4.00 | 2023-09-19 13:25:14.066 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:14.066 |
| +I |                              4 |                              20.00 | 2023-09-19 13:25:14.067 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:14.067 |


Flink SQL> SELECT * FROM TABLE(
>    TUMBLE(
>      DATA => TABLE orders2,
>      TIMECOL => DESCRIPTOR(proctime),
>      SIZE => INTERVAL '5' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 13:25:39.548 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:39.548 |
| +I |                              2 |                              12.00 | 2023-09-19 13:25:39.548 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:39.548 |
| +I |                              3 |                               4.00 | 2023-09-19 13:25:39.548 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:39.548 |
| +I |                              4 |                              20.00 | 2023-09-19 13:25:39.548 | 2023-09-19 13:25:00.000 | 2023-09-19 13:30:00.000 | 2023-09-19 13:25:39.548 |

-----4. Tumbling window aggregation
Flink SQL> SELECT window_start, window_end, sum(price)
>   FROM TABLE(
>     TUMBLE(TABLE orders2, DESCRIPTOR(proctime),INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, orders2]], fields=[id, price])


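3) Official example: querying and aggregating with a tumbling window (not verified)

Below is the official example. It was not verified for this article; for a working run, see the examples above.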

-- tables must have time attribute, e.g. `bidtime` in this table
Flink SQL> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
|        name |                   type | null | key | extras |                       watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
|     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|       price |         DECIMAL(10, 2) | true |     |        |                                 |
|        item |                 STRING | true |     |        |                                 |
+-------------+------------------------+------+-----+--------+---------------------------------+

Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 | C    |
| 2020-04-15 08:07 |  2.00 | A    |
| 2020-04-15 08:09 |  5.00 | D    |
| 2020-04-15 08:11 |  3.00 | B    |
| 2020-04-15 08:13 |  1.00 | E    |
| 2020-04-15 08:17 |  6.00 | F    |
+------------------+-------+------+

Flink SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
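Since the official example was not run here, the following sketch re-computes its aggregation outside Flink as a cross-check. It is plain Python, not Flink code: the six Bid rows are copied from the listing above, the helper name tumble_sum is hypothetical, and aligning windows to the epoch is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

# Assumption: window boundaries are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)

# (bidtime, price, item) rows copied from the Bid table above.
BIDS = [
    (datetime(2020, 4, 15, 8, 5), Decimal("4.00"), "C"),
    (datetime(2020, 4, 15, 8, 7), Decimal("2.00"), "A"),
    (datetime(2020, 4, 15, 8, 9), Decimal("5.00"), "D"),
    (datetime(2020, 4, 15, 8, 11), Decimal("3.00"), "B"),
    (datetime(2020, 4, 15, 8, 13), Decimal("1.00"), "E"),
    (datetime(2020, 4, 15, 8, 17), Decimal("6.00"), "F"),
]


def tumble_sum(rows, size):
    """Group rows by (window_start, window_end) and sum the price."""
    totals = defaultdict(Decimal)  # missing keys start at Decimal("0")
    for ts, price, _item in rows:
        window_start = ts - (ts - EPOCH) % size
        totals[(window_start, window_start + size)] += price
    return dict(sorted(totals.items()))


if __name__ == "__main__":
    for (start, end), price in tumble_sum(BIDS, timedelta(minutes=10)).items():
        print(start, end, price)
```

The two groups it produces correspond to the two rows of the official result: the 08:00 to 08:10 window and the 08:10 to 08:20 window.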

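2. HOP (hopping/sliding windows)

The HOP function assigns elements to windows of fixed length. As with the TUMBLE windowing function, the window size is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can overlap if the slide is smaller than the window size; in that case, elements are assigned to multiple windows. Hopping windows are also known as "sliding windows".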

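For example, you could have windows of size 10 minutes that slide by 5 minutes. With this, you get a window every 5 minutes that contains the events that arrived during the last 10 minutes, as shown in the figure below.
[Figure: hopping windows]

The HOP function assigns windows that cover rows within the interval of size, shifting by slide each time, based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ. The return value of HOP is a new relation that includes all columns of the original relation plus 3 additional columns named "window_start", "window_end", and "window_time" that indicate the assigned window. The original time attribute "timecol" becomes a regular timestamp column after the windowing TVF.

HOP takes four required parameters and one optional parameter: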

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

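# data: a table parameter; it can be any relation with a time attribute column.
# timecol: a column descriptor indicating which time attribute column of data should be mapped to hopping windows.
# slide: a duration specifying the interval between the starts of consecutive hopping windows.
# size: a duration specifying the width of the hopping windows.
# offset: an optional parameter specifying the offset by which the window start is shifted.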
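The interplay of slide and size is easier to see with a small computation. The sketch below, in Python rather than SQL, lists every hopping window that contains a given timestamp. The function name hop_windows and the epoch alignment are assumptions made for illustration; this is not Flink's implementation.

```python
from datetime import datetime, timedelta

# Assumption: window starts are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def hop_windows(ts, slide, size, offset=timedelta(0)):
    """Return all (window_start, window_end) containing ts, latest first."""
    # Start of the most recent window that has begun at or before ts.
    start = ts - (ts - EPOCH - offset) % slide
    windows = []
    # Walk backwards one slide at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


if __name__ == "__main__":
    row_time = datetime(2023, 9, 19, 14, 38, 39, 892000)
    for w in hop_windows(row_time, timedelta(minutes=5), timedelta(minutes=10)):
        print(w)
```

With a 5 minute slide and a 10 minute size, each timestamp belongs to size / slide = 2 windows, so every input row appears twice in the HOP output. When slide equals size, the function returns a single window, which is the tumbling case.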


1) Example 1: querying and aggregating with a hopping window

----Table structure
CREATE TABLE orders (
    `id`    STRING,
    price       DECIMAL(32,2),
    proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
Flink SQL> desc orders;
+----------+-----------------------------+-------+-----+---------------+-----------+
|     name |                        type |  null | key |        extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
|       id |                      STRING |  TRUE |     |               |           |
|    price |              DECIMAL(32, 2) |  TRUE |     |               |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
+----------+-----------------------------+-------+-----+---------------+-----------+

---Hopping window, usage 1 of 2
SELECT * FROM TABLE( HOP(TABLE orders, DESCRIPTOR(proctime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
---Hopping window, usage 2 of 2; DATA must be the first argument
SELECT * FROM TABLE(
    HOP(
      DATA => TABLE orders,
      TIMECOL => DESCRIPTOR(proctime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));

---Hopping window aggregation
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(proctime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
--------Verification data follows--------
Flink SQL> select * from orders;
+----+--------------------------------+------------------------------------+-------------------------+
| op |                             id |                              price |                proctime |
+----+--------------------------------+------------------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 14:37:33.300 |
| +I |                              2 |                              15.00 | 2023-09-19 14:37:33.300 |
| +I |                              3 |                              20.00 | 2023-09-19 14:37:33.300 |
| +I |                              4 |                              30.00 | 2023-09-19 14:37:33.300 |
| +I |                              5 |                              60.00 | 2023-09-19 14:37:33.300 |
| +I |                              6 |                             800.98 | 2023-09-19 14:37:33.300 |
| +I |                              7 |                             100.90 | 2023-09-19 14:37:33.300 |
| +I |                              8 |                              11.00 | 2023-09-19 14:37:33.300 |
| +I |                              9 |                              18.00 | 2023-09-19 14:37:33.300 |
| +I |                             10 |                             123.00 | 2023-09-19 14:37:33.300 |
| +I |                             11 |                              35.78 | 2023-09-19 14:37:33.300 |
| +I |                             12 |                              45.68 | 2023-09-19 14:37:33.301 |
| +I |                             13 |                              22.00 | 2023-09-19 14:37:33.301 |
| +I |                             14 |                              56.78 | 2023-09-19 14:37:33.301 |
| +I |                             15 |                              78.90 | 2023-09-19 14:37:33.301 |

Flink SQL> SELECT * FROM TABLE( HOP(TABLE orders, DESCRIPTOR(proctime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              1 |                              10.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              2 |                              15.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              2 |                              15.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              3 |                              20.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              3 |                              20.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              4 |                              30.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              4 |                              30.00 | 2023-09-19 14:38:39.892 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.892 |
| +I |                              5 |                              60.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              5 |                              60.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              6 |                             800.98 | 2023-09-19 14:38:39.893 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              6 |                             800.98 | 2023-09-19 14:38:39.893 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              7 |                             100.90 | 2023-09-19 14:38:39.893 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              7 |                             100.90 | 2023-09-19 14:38:39.893 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              8 |                              11.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              8 |                              11.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              9 |                              18.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.893 |
| +I |                              9 |                              18.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.893 |
| +I |                             10 |                             123.00 | 2023-09-19 14:38:39.893 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.893 |
| +I |                             10 |                             123.00 | 2023-09-19 14:38:39.894 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             11 |                              35.78 | 2023-09-19 14:38:39.894 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             11 |                              35.78 | 2023-09-19 14:38:39.894 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             12 |                              45.68 | 2023-09-19 14:38:39.894 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             12 |                              45.68 | 2023-09-19 14:38:39.894 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             13 |                              22.00 | 2023-09-19 14:38:39.894 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             13 |                              22.00 | 2023-09-19 14:38:39.894 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             14 |                              56.78 | 2023-09-19 14:38:39.894 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             14 |                              56.78 | 2023-09-19 14:38:39.894 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             15 |                              78.90 | 2023-09-19 14:38:39.894 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:38:39.894 |
| +I |                             15 |                              78.90 | 2023-09-19 14:38:39.894 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:38:39.894 |

Flink SQL> SELECT * FROM TABLE(
>     HOP(
>       DATA => TABLE orders,
>       TIMECOL => DESCRIPTOR(proctime),
>       SLIDE => INTERVAL '5' MINUTES,
>       SIZE => INTERVAL '10' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              1 |                              10.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              2 |                              15.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              2 |                              15.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              3 |                              20.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              3 |                              20.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              4 |                              30.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              4 |                              30.00 | 2023-09-19 14:39:35.143 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.143 |
| +I |                              5 |                              60.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              5 |                              60.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              6 |                             800.98 | 2023-09-19 14:39:35.144 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              6 |                             800.98 | 2023-09-19 14:39:35.144 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              7 |                             100.90 | 2023-09-19 14:39:35.144 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              7 |                             100.90 | 2023-09-19 14:39:35.144 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              8 |                              11.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              8 |                              11.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              9 |                              18.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.144 |
| +I |                              9 |                              18.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.144 |
| +I |                             10 |                             123.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.144 |
| +I |                             10 |                             123.00 | 2023-09-19 14:39:35.144 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.144 |
| +I |                             11 |                              35.78 | 2023-09-19 14:39:35.145 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             11 |                              35.78 | 2023-09-19 14:39:35.145 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             12 |                              45.68 | 2023-09-19 14:39:35.145 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             12 |                              45.68 | 2023-09-19 14:39:35.145 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             13 |                              22.00 | 2023-09-19 14:39:35.145 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             13 |                              22.00 | 2023-09-19 14:39:35.145 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             14 |                              56.78 | 2023-09-19 14:39:35.145 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             14 |                              56.78 | 2023-09-19 14:39:35.145 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             15 |                              78.90 | 2023-09-19 14:39:35.145 | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 | 2023-09-19 14:39:35.145 |
| +I |                             15 |                              78.90 | 2023-09-19 14:39:35.145 | 2023-09-19 14:30:00.000 | 2023-09-19 14:40:00.000 | 2023-09-19 14:39:35.145 |

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     HOP(TABLE orders, DESCRIPTOR(proctime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                   EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-19 14:35:00.000 | 2023-09-19 14:45:00.000 |                                  1428.02 |


2) Official example: querying and aggregating with a hopping window (not verified)

> SELECT * FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
    HOP(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |           window_time   |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the hopping windowed table
> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
| 2020-04-15 08:15 | 2020-04-15 08:25 |  6.00 |
+------------------+------------------+-------+
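
To make the assignment rule behind the tables above concrete, here is a minimal Python sketch of how a hopping window with a 5-minute slide and a 10-minute size could be assigned and aggregated. It is an illustration, not Flink's implementation: the helper name `hop_windows` is hypothetical, timestamps are treated as naive local times, and window starts are assumed to be aligned to multiples of the slide counted from the Unix epoch.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def hop_windows(ts, slide, size):
    """Return every [start, end) hopping window that contains ts (hypothetical helper)."""
    # Latest window start at or before ts; starts are assumed to be multiples of slide.
    start = ts - (ts - EPOCH) % slide
    windows = []
    while start > ts - size:              # the window [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

# Rows of the Bid table from the official example: (bidtime, price).
bids = [(datetime(2020, 4, 15, 8, m), p)
        for m, p in [(5, 4), (7, 2), (9, 5), (11, 3), (13, 1), (17, 6)]]

slide, size = timedelta(minutes=5), timedelta(minutes=10)
totals = defaultdict(int)
for bidtime, price in bids:
    for window in hop_windows(bidtime, slide, size):
        totals[window] += price           # plays the role of GROUP BY window_start, window_end

for (start, end), total in sorted(totals.items()):
    print(start.strftime("%H:%M"), end.strftime("%H:%M"), total)
```

Each row lands in exactly size / slide = 2 windows, and the four totals printed are 11, 15, 10 and 6, the same sums as in the aggregation table above.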


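3. CUMULATE (cumulating windows)

Cumulating windows are useful in some scenarios, such as firing a tumbling window early at fixed intervals. For example, a daily dashboard plots the cumulative UV (unique visitors) from 00:00 up to every minute, so the UV at 10:00 is the total UV from 00:00 to 10:00. Cumulating windows implement this easily and efficiently.

The CUMULATE function assigns elements to windows that first cover the rows within an initial interval of one step, and then expand by one more step at a time (keeping the window start fixed) until the maximum window size is reached. You can think of CUMULATE as first applying a TUMBLE window with the maximum window size and then splitting each tumbling window into several windows that share the same window start and whose window ends differ by one step. Cumulating windows therefore overlap and do not have a fixed size.

For example, with a step of 1 hour and a maximum size of 1 day, you get the following windows for every day: [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00).

The CUMULATE function assigns windows based on a time attribute column. In streaming mode, the time attribute field must be an event-time or processing-time attribute. In batch mode, the time attribute field of the window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ. The return value of CUMULATE is a new relation that includes all columns of the original relation plus three additional columns named "window_start", "window_end" and "window_time", which indicate the assigned window. The original time attribute "timecol" becomes a regular timestamp column after the windowing TVF.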

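CUMULATE takes four required parameters and one optional parameter:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

# data: a table parameter that can be any relation with a time attribute column.
# timecol: a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
# step: a duration specifying how much the window size grows between the ends of successive cumulating windows.
# size: a duration specifying the maximum width of the cumulating windows. size must be an integral multiple of step.
# offset: an optional parameter specifying the offset by which the window start is shifted.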
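
The roles of step and size can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not Flink's implementation: the helper name `cumulate_windows` is hypothetical, timestamps are naive local times, the offset is left at 0, and the enclosing tumbling window is assumed to be aligned to multiples of size counted from the Unix epoch.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def cumulate_windows(ts, step, size):
    """Return every cumulating [start, end) window that contains ts (hypothetical helper)."""
    if size % step != timedelta(0):
        raise ValueError("size must be an integral multiple of step")
    # Start of the enclosing tumbling window of width `size`.
    window_start = ts - (ts - EPOCH) % size
    # First window end that lies strictly after ts.
    end = window_start + ((ts - window_start) // step + 1) * step
    windows = []
    while end <= window_start + size:     # keep growing by one step up to the maximum size
        windows.append((window_start, end))
        end += step
    return windows

step, size = timedelta(minutes=2), timedelta(minutes=10)
for start, end in cumulate_windows(datetime(2023, 9, 19, 14, 56, 48), step, size):
    print(start, end)
```

For a row stamped 14:56:48 with a 2-minute step and a 10-minute size, the sketch yields the windows [14:50, 14:58) and [14:50, 15:00): the start stays fixed while the end advances by one step.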

1) Example 1: querying and aggregating with a cumulating window

----- Table schema
CREATE TABLE orders (
    `id`    STRING,
    price       DECIMAL(32,2),
    proctime as PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_topic',
  'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
Flink SQL> desc orders;
+----------+-----------------------------+-------+-----+---------------+-----------+
|     name |                        type |  null | key |        extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
|       id |                      STRING |  TRUE |     |               |           |
|    price |              DECIMAL(32, 2) |  TRUE |     |               |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
+----------+-----------------------------+-------+-----+---------------+-----------+
----- Cumulating window query, form 1 (positional parameters)
SELECT * FROM TABLE( 
      CUMULATE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
----- Cumulating window query, form 2 (named parameters)
SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE orders,
      TIMECOL => DESCRIPTOR(proctime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
----- Aggregation over the cumulating window
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;

----- Verification process below ----------------
Flink SQL> select * from orders;
+----+--------------------------------+------------------------------------+-------------------------+
| op |                             id |                              price |                proctime |
+----+--------------------------------+------------------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 14:37:33.300 |
| +I |                              2 |                              15.00 | 2023-09-19 14:37:33.300 |
| +I |                              3 |                              20.00 | 2023-09-19 14:37:33.300 |
| +I |                              4 |                              30.00 | 2023-09-19 14:37:33.300 |
| +I |                              5 |                              60.00 | 2023-09-19 14:37:33.300 |
| +I |                              6 |                             800.98 | 2023-09-19 14:37:33.300 |
| +I |                              7 |                             100.90 | 2023-09-19 14:37:33.300 |
| +I |                              8 |                              11.00 | 2023-09-19 14:37:33.300 |
| +I |                              9 |                              18.00 | 2023-09-19 14:37:33.300 |
| +I |                             10 |                             123.00 | 2023-09-19 14:37:33.300 |
| +I |                             11 |                              35.78 | 2023-09-19 14:37:33.300 |
| +I |                             12 |                              45.68 | 2023-09-19 14:37:33.301 |
| +I |                             13 |                              22.00 | 2023-09-19 14:37:33.301 |
| +I |                             14 |                              56.78 | 2023-09-19 14:37:33.301 |
| +I |                             15 |                              78.90 | 2023-09-19 14:37:33.301 |

Flink SQL> desc orders;
+----------+-----------------------------+-------+-----+---------------+-----------+
|     name |                        type |  null | key |        extras | watermark |
+----------+-----------------------------+-------+-----+---------------+-----------+
|       id |                      STRING |  TRUE |     |               |           |
|    price |              DECIMAL(32, 2) |  TRUE |     |               |           |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE |     | AS PROCTIME() |           |
+----------+-----------------------------+-------+-----+---------------+-----------+

Flink SQL> SELECT * FROM TABLE( CUMULATE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              1 |                              10.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              2 |                              15.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              2 |                              15.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              3 |                              20.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              3 |                              20.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              4 |                              30.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              4 |                              30.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              5 |                              60.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              5 |                              60.00 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              6 |                             800.98 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              6 |                             800.98 | 2023-09-19 14:56:48.460 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.460 |
| +I |                              7 |                             100.90 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                              7 |                             100.90 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                              8 |                              11.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                              8 |                              11.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                              9 |                              18.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                              9 |                              18.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             10 |                             123.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             10 |                             123.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             11 |                              35.78 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             11 |                              35.78 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             12 |                              45.68 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             12 |                              45.68 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             13 |                              22.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             13 |                              22.00 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             14 |                              56.78 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             14 |                              56.78 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             15 |                              78.90 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:56:48.461 |
| +I |                             15 |                              78.90 | 2023-09-19 14:56:48.461 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:56:48.461 |

Flink SQL> SELECT * FROM TABLE(
>     CUMULATE(
>       DATA => TABLE orders,
>       TIMECOL => DESCRIPTOR(proctime),
>       STEP => INTERVAL '2' MINUTES,
>       SIZE => INTERVAL '10' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 14:57:13.264 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              1 |                              10.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              2 |                              15.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              2 |                              15.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              3 |                              20.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              3 |                              20.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              4 |                              30.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              4 |                              30.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              5 |                              60.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              5 |                              60.00 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              6 |                             800.98 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              6 |                             800.98 | 2023-09-19 14:57:13.265 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.265 |
| +I |                              7 |                             100.90 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.266 |
| +I |                              7 |                             100.90 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.266 |
| +I |                              8 |                              11.00 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.266 |
| +I |                              8 |                              11.00 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.266 |
| +I |                              9 |                              18.00 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.266 |
| +I |                              9 |                              18.00 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             10 |                             123.00 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             10 |                             123.00 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             11 |                              35.78 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             11 |                              35.78 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             12 |                              45.68 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             12 |                              45.68 | 2023-09-19 14:57:13.266 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.266 |
| +I |                             13 |                              22.00 | 2023-09-19 14:57:13.267 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.267 |
| +I |                             13 |                              22.00 | 2023-09-19 14:57:13.267 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.267 |
| +I |                             14 |                              56.78 | 2023-09-19 14:57:13.267 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.267 |
| +I |                             14 |                              56.78 | 2023-09-19 14:57:13.267 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.267 |
| +I |                             15 |                              78.90 | 2023-09-19 14:57:13.267 | 2023-09-19 14:50:00.000 | 2023-09-19 14:58:00.000 | 2023-09-19 14:57:13.267 |
| +I |                             15 |                              78.90 | 2023-09-19 14:57:13.267 | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 | 2023-09-19 14:57:13.267 |

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     CUMULATE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                   EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-19 14:50:00.000 | 2023-09-19 15:00:00.000 |                                  1428.02 |




2) Official example: querying and aggregating with a cumulating window (not verified)

SELECT * FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
SELECT * FROM TABLE(
    CUMULATE(
      DATA => TABLE Bid,
      TIMECOL => DESCRIPTOR(bidtime),
      STEP => INTERVAL '2' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the cumulating windowed table
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 |  4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 |  6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 |  3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
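
The sums in the table above can be checked by hand: each cumulating window [start, end) simply adds up the rows whose bidtime falls inside it. The Python sketch below reproduces that arithmetic for the Bid rows. It is an illustration only; the helper names `at` and `cumulate_sums` are hypothetical, and windows that contain no row are skipped on the assumption that no result is emitted for them.

```python
from datetime import datetime, timedelta

def at(hhmm):
    """Build a timestamp on 2020-04-15 from an 'HH:MM' string (hypothetical helper)."""
    hour, minute = map(int, hhmm.split(":"))
    return datetime(2020, 4, 15, hour, minute)

# Rows of the Bid table from the official example: (bidtime, price).
bids = [(at("08:05"), 4), (at("08:07"), 2), (at("08:09"), 5),
        (at("08:11"), 3), (at("08:13"), 1), (at("08:17"), 6)]

step, size = timedelta(minutes=2), timedelta(minutes=10)

def cumulate_sums(rows, first_start, last_start):
    """Sum prices per cumulating window for tumbling starts in [first_start, last_start]."""
    result = {}
    start = first_start
    while start <= last_start:
        end = start + step
        while end <= start + size:
            prices = [p for t, p in rows if start <= t < end]
            if prices:                    # assumption: empty windows produce no output row
                result[(start, end)] = sum(prices)
            end += step
        start += size
    return result

sums = cumulate_sums(bids, at("08:00"), at("08:10"))
for (start, end), total in sums.items():
    print(start.strftime("%H:%M"), end.strftime("%H:%M"), total)
```

The windows ending at 08:14 and 08:16 both total 4 because no bid arrives between 08:14 and 08:16, which is why consecutive rows of the result can repeat a value.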

4、Window Offset

以上三个窗口函数是windows的函数,下面介绍window offset。

Offset偏移量是一个可选参数,可用于更改窗口分配。它可以是正持续时间和负持续时间。窗口偏移的默认值为 0。如果设置不同的偏移值,则同一记录可能会分配给不同的窗口。
以下示例“对于大小为 10 分钟的翻转窗口,时间戳为 2023-09-30 00:00:04 的记录将分配给哪个窗口?”进行说明,具体如下:

  • If the offset is -16 MINUTE, the record is assigned to window [2023-09-29 23:54:00, 2023-09-30 00:04:00).
  • If the offset is -6 MINUTE, the record is assigned to window [2023-09-29 23:54:00, 2023-09-30 00:04:00).
  • If the offset is -4 MINUTE, the record is assigned to window [2023-09-29 23:56:00, 2023-09-30 00:06:00).
  • If the offset is 0, the record is assigned to window [2023-09-30 00:00:00, 2023-09-30 00:10:00).
  • If the offset is 4 MINUTE, the record is assigned to window [2023-09-29 23:54:00, 2023-09-30 00:04:00).
  • If the offset is 6 MINUTE, the record is assigned to window [2023-09-29 23:56:00, 2023-09-30 00:06:00).
  • If the offset is 16 MINUTE, the record is assigned to window [2023-09-29 23:56:00, 2023-09-30 00:06:00).

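As the list shows, different offset values can produce the same window assignment. Here, -16 MINUTE, -6 MINUTE and 4 MINUTE all have the same effect on a tumbling window of size 10 minutes, because they differ from one another by a multiple of the window size.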
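
The assignments listed above can be reproduced with a few lines of arithmetic. This is a minimal plain-Python sketch rather than Flink's own code: `tumble_window` is a hypothetical helper, and it assumes windows are aligned to the Unix epoch and then shifted by the offset.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: windows are aligned to the Unix epoch


def tumble_window(ts, size, offset=timedelta(0)):
    """Return (window_start, window_end) of the tumbling window that contains ts."""
    # timedelta % timedelta is a floor modulo, so the remainder is never negative.
    start = ts - (ts - EPOCH - offset) % size
    return start, start + size


record_time = datetime(2023, 9, 30, 0, 0, 4)
size = timedelta(minutes=10)

for minutes in (-16, -6, -4, 0, 4, 6, 16):
    start, end = tumble_window(record_time, size, timedelta(minutes=minutes))
    print(f"offset {minutes:>3} MINUTE -> [{start}, {end})")
```

Offsets that are congruent modulo the window size (for example -16, -6 and 4 with a size of 10 minutes) give the same start and end, which is why several bullets in the list repeat the same window.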

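The window offset only changes window assignment; it has no effect on watermarks.
As of version 1.17, Flink does not support evaluating a window table-valued function on its own; a window table-valued function should be used together with an aggregate operation.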

1)、Example 1: querying and aggregating with an offset tumbling window

This example only illustrates the syntax and the data produced by the table-valued function.

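-- For the structure and data of the orders table, see the earlier example in this article

-- Tumbling window with an offset, form 1 (positional arguments)
SELECT * FROM TABLE(
   TUMBLE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));

-- Tumbling window with an offset, form 2 (named arguments); when verified, this form failed because the OFFSET keyword was not accepted
-- Error: org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'OFFSET' at line 6, column 6.
-- OFFSET is a reserved keyword in Flink SQL, which is a likely cause; this was not investigated further here
SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE orders,
     TIMECOL => DESCRIPTOR(proctime),
     SIZE => INTERVAL '10' MINUTES,
     OFFSET => INTERVAL '1' MINUTES));

-- Aggregation over a tumbling window with an offset
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;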

------------------------ verification run below ----------------------------
Flink SQL> SELECT * FROM TABLE(
>    TUMBLE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op |                             id |                              price |                proctime |            window_start |              window_end |             window_time |
+----+--------------------------------+------------------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |                              1 |                              10.00 | 2023-09-19 15:36:32.623 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.623 |
| +I |                              2 |                              15.00 | 2023-09-19 15:36:32.623 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.623 |
| +I |                              3 |                              20.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                              4 |                              30.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                              5 |                              60.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                              6 |                             800.98 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                              7 |                             100.90 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                              8 |                              11.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                              9 |                              18.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                             10 |                             123.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                             11 |                              35.78 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                             12 |                              45.68 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                             13 |                              22.00 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                             14 |                              56.78 | 2023-09-19 15:36:32.624 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.624 |
| +I |                             15 |                              78.90 | 2023-09-19 15:36:32.625 | 2023-09-19 15:31:00.000 | 2023-09-19 15:41:00.000 | 2023-09-19 15:36:32.625 |

Flink SQL> SELECT * FROM TABLE(
>    TUMBLE(
>      DATA => TABLE orders,
>      TIMECOL => DESCRIPTOR(proctime),
>      SIZE => INTERVAL '10' MINUTES,
>      OFFSET => INTERVAL '1' MINUTES));
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'OFFSET' at line 6, column 6.

Flink SQL> SELECT window_start, window_end, SUM(price)
>   FROM TABLE(
>     TUMBLE(TABLE orders, DESCRIPTOR(proctime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
>   GROUP BY window_start, window_end;
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                   EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2023-09-19 15:41:00.000 | 2023-09-19 15:51:00.000 |                                  1428.02 |


2)、Official example: querying and aggregating with an offset tumbling window (not verified)

-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
--  window table-valued function should be used with aggregate operation,
--  this example is just used for explaining the syntax and the data produced by table-valued function.
Flink SQL> SELECT * FROM TABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
Flink SQL> SELECT * FROM TABLE(
   TUMBLE(
     DATA => TABLE Bid,
     TIMECOL => DESCRIPTOR(bidtime),
     SIZE => INTERVAL '10' MINUTES,
     OFFSET => INTERVAL '1' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+

-- apply aggregation on the tumbling windowed table
Flink SQL> SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;
+------------------+------------------+-------+
|     window_start |       window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
| 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
+------------------+------------------+-------+
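
Since this official example was not run in this article, its expected output can at least be checked by hand. The following minimal plain-Python sketch is not Flink's implementation: `tumble_window` and the in-memory `bids` list are hypothetical names, windows are assumed to be aligned to the Unix epoch and shifted by the offset, and `window_time` is taken as `window_end` minus 1 ms.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)  # assumption: windows are aligned to the Unix epoch


def tumble_window(ts, size, offset=timedelta(0)):
    """Return (window_start, window_end) of the tumbling window that contains ts."""
    start = ts - (ts - EPOCH - offset) % size
    return start, start + size


# The six Bid rows (bidtime, price) from the example table.
bids = [
    ("08:05", "4.00"), ("08:07", "2.00"), ("08:09", "5.00"),
    ("08:11", "3.00"), ("08:13", "1.00"), ("08:17", "6.00"),
]

size, offset = timedelta(minutes=10), timedelta(minutes=1)
sums = defaultdict(Decimal)  # (window_start, window_end) -> SUM(price)
for hhmm, price in bids:
    ts = datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")
    sums[tumble_window(ts, size, offset)] += Decimal(price)

for (start, end), total in sorted(sums.items()):
    window_time = end - timedelta(milliseconds=1)  # window_end - 1 ms
    print(start, end, window_time.isoformat(sep=" ", timespec="milliseconds"), total)
```

The sketch produces the two windows [08:01, 08:11) and [08:11, 08:21) with sums 11.00 and 10.00, in agreement with the tables above.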


This article briefly introduced Flink's window functions together with concrete examples.
