赞
踩
Flink已经将事件时间作为默认的时间语义了。
使用数据的时间作为逻辑时间,让计算脱离系统时间。
水位线是Flink中用于处理乱序数据或迟到数据的一个机制。
在流式计算环境中,由于网络和反压等原因,数据可能会以乱序的方式到达。为了解决这个问题,Flink引入了水位线的概念。水位线不仅可以用来处理乱序数据,还可以与窗口一起使用,以控制窗口的关闭时间。通过水位线,Flink能够在一定程度上保证数据的顺序性和完整性,从而提高流处理的准确性和可靠性。
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
假如存在一个时间长度为10秒的滚动窗口,正常情况下会在时间戳是10s的数据到来后窗口触发计算,但是考虑到网络延迟问题,有可能存在10秒的数据比8秒的数据先行到达,因此我们将当前水位线在数据时间戳的基础上增加延迟,来保证数据的不丢失。
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。
1)按照驱动类型分
窗口已经关闭,延迟很久的数据到来时,通常不会导致窗口的多次计算。窗口计算是在窗口的结束时间到达时进行的,一旦窗口关闭,其相关的计算和数据处理就已完成。
Flink 提供了对于迟到数据的处理策略,如丢弃、更新窗口计算结果等。如果配置了允许处理迟到数据的策略,那么这些延迟的数据可能会被用来更新已经计算的结果,但这并不是重新计算整个窗口,而是对已有结果的增量更新。
水位线用于标识事件时间流的进度,它告知系统事件时间的进展情况,进而确定哪些数据可以被用于触发计算。设置水位线延迟时间是为了解决事件时间处理过程中的乱序数据到达的问题。通过设置一个时间戳或周期性生成水位线,Flink 可以推断出在该水位线之前的事件都已经到达。水位线延迟时间是为了容忍一定程度的乱序,以确保窗口的正确触发。一般来说,设置较大的延迟时间可以提高系统的容错性,但可能会增加计算的延迟。
在事件时间窗口中,某些数据可能会因为延迟到达而被归类为迟到数据。Flink 提供了对于迟到数据的处理策略,例如丢弃、更新窗口计算结果等。允许窗口处理迟到数据的主要目的是在窗口到达结束时间时(此时窗口未关闭),仍然可以考虑到迟到数据的计算结果。这样可以提高计算的准确性和完整性。
区别:
1.水位线延迟时间是用来控制事件时间处理的乱序程度,它影响到事件被认定为迟到数据的时间点;
2.窗口延迟时间则是决定在窗口到达结束时间后是否继续接收并计算迟到数据,直到延迟时间结束。
联系:
1.水位线延迟时间的设置会影响到迟到数据的多少,这就影响到了窗口延迟时间
如果水位线延迟时间较短,即系统会更快地将水位线向前推进,那么在窗口关闭之前到达的迟到数据可能会被视为超时数据而被丢弃。这样可以确保结果的实时性,但可能会导致一部分迟到数据无法参与计算,从而影响计算的完整性。
相反,如果水位线延迟时间较长,即系统允许更多的乱序和迟到数据,那么更多的数据有机会被纳入窗口计算。这样可以提高计算的准确性和完整性,但也会增加计算的延迟
在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个字段,通过 WATERMARK语句来定义事件时间属性。
WATERMARK 语句主要用来定义水位线(watermark)的生成表达式,这个表达式会将带有事件时间戳的字段标记为事件时间属性,并在它基础上给出水位线的延迟时间。
具体定义方式如下:
CREATE TABLE EventTable(
user STRING,
url STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
...
);
把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟。这里的“5 秒”是以“时间间隔”的形式定义的,格式是 INTERVAL <数值> <时间单位>:INTERVAL '5’ SECOND,这里的数值必须用单引号引起来,而单位用 SECOND 和 SECONDS 是等效的。
Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ。这里TIMESTAMP_LTZ 是指带有本地时区信息的时间戳(TIMESTAMP WITH LOCAL TIME ZONE);
如数据中的时间戳是“年-月-日-时-分-秒”形式,那就是不带时区信息的,可以将事件时间属性定义为 TIMESTAMP 类型。
而如果原始的时间戳就是一个长整型的毫秒数,这时就需要另外定义一个字段来表示事件时间属性,类型定义为 TIMESTAMP_LTZ 会更方便:
CREATE TABLE events (
user STRING,
url STRING,
ts BIGINT,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
...
);
这里我们另外定义了一个字段 ts_ltz,是把长整型的 ts 转换为 TIMESTAMP_LTZ 得到的;进而使用 WATERMARK 语句将它设为事件时间属性,并设置 5 秒的水位线延迟。
flink从1.13开始,提供了时间窗口聚合计算的TVF语法。
滚动窗口(Tumbling Windows)
滑动窗口(Hop Windows,跳跃窗口)
累积窗口(Cumulate Windows)
在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:窗口起始点(window_start)、窗口结束点(window_end)、窗口时间(window_time)。窗口时间指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中包含数据的最大时间戳 。
滚动窗口有固定的大小,是一种对数据进行均匀切片
的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。
TUMBLE (TABLE EventTable,descriptor(时间属性字段),INTERVAL '10' SECOND[ 窗口长度 ] )
在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
TUMBLE 函数根据时间属性字段为关系的每一行分配一个窗口。在流式处理模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。TUMBLE 的返回值是一个新关系,包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的另外 3 列,以指示分配的窗口。原始时间属性“timecol”将是window TVF 之后的常规timestamp 列。
TUMBLE 函数三个必需参数,一个可选参数:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size [ offset ])
# data:是一个表参数,可以是与时间属性列的任何关系。
# timecol:是一个列描述符,指示数据的哪些时间属性列应映射到tumbling windows。
# size:是指定tumbling windows宽度的持续时间。
# offset:是一个可选参数,用于指定窗口开始移动的偏移量。
-----------------------------------
- Flink SQL> SELECT * FROM Bid;
- +------------------+-------+------+
- | bidtime | price | item |
- +------------------+-------+------+
- | 2020-04-15 08:05 | 4.00 | C |
- | 2020-04-15 08:07 | 2.00 | A |
- | 2020-04-15 08:09 | 5.00 | D |
- | 2020-04-15 08:11 | 3.00 | B |
- | 2020-04-15 08:13 | 1.00 | E |
- | 2020-04-15 08:17 | 6.00 | F |
- +------------------+-------+------+
-
- Flink SQL> SELECT * FROM TABLE(
- TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
- -- or with the named params
- -- note: the DATA param must be the first
- Flink SQL> SELECT * FROM TABLE(
- TUMBLE(
- DATA => TABLE Bid,
- TIMECOL => DESCRIPTOR(bidtime),
- SIZE => INTERVAL '10' MINUTES));
- +------------------+-------+------+------------------+------------------+-------------------------+
- | bidtime | price | item | window_start | window_end | window_time |
- +------------------+-------+------+------------------+------------------+-------------------------+
- | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- +------------------+-------+------+------------------+------------------+-------------------------+
-
- -- apply aggregation on the tumbling windowed table
- Flink SQL> SELECT window_start, window_end, SUM(price)
- FROM TABLE(
- TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
- GROUP BY window_start, window_end;
- +------------------+------------------+-------+
- | window_start | window_end | price |
- +------------------+------------------+-------+
- | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
- +------------------+------------------+-------+
滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大小(size)和滑动步长(slide)两个参数
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)
HOP (TABLE t_action,descriptor(时间属性字段),INTERVAL '5' SECONDS[ 滑动步长 ] , INTERVAL '10' SECOND[ 窗口长度 ] )
HOP 函数分配在大小间隔内覆盖行的窗口,并根据时间属性字段移动每slide 。在流式处理模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。HOP 的返回值是一个新关系,它包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的另外 3 列,以指示分配的窗口。原始时间属性“timecol”将是 windowing TVF 后的常规时间戳列。
HOP 采用四个必需参数,一个可选参数:
复制
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
# data:是一个表参数,可以是与时间属性列的任何关系。
# timecol:是一个列描述符,指示数据的哪些时间属性列应映射到hopping windows。
# slide:是指定顺序hopping windows开始之间的持续时间
# size :是指定hopping windows宽度的持续时间。
# offset :是一个可选参数,用于指定窗口开始移动的偏移量。
- > SELECT * FROM TABLE(
- HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
- -- or with the named params
- -- note: the DATA param must be the first
- > SELECT * FROM TABLE(
- HOP(
- DATA => TABLE Bid,
- TIMECOL => DESCRIPTOR(bidtime),
- SLIDE => INTERVAL '5' MINUTES,
- SIZE => INTERVAL '10' MINUTES));
- +------------------+-------+------+------------------+------------------+-------------------------+
- | bidtime | price | item | window_start | window_end | window_time |
- +------------------+-------+------+------------------+------------------+-------------------------+
- | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
- | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
- | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
- +------------------+-------+------+------------------+------------------+-------------------------+
-
- -- apply aggregation on the hopping windowed table
- > SELECT window_start, window_end, SUM(price)
- FROM TABLE(
- HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
- GROUP BY window_start, window_end;
- +------------------+------------------+-------+
- | window_start | window_end | price |
- +------------------+------------------+-------+
- | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
- | 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
- | 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
- +------------------+------------------+-------+
累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度就是统计周期,最终目的就是统计这段时间内的数据
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度
—————
CUMULATE (TABLE t_action,descriptor(时间属性字段),INTERVAL '5' SECONDS[ 更新最大步长 ] , INTERVAL '10' SECOND[ 窗口最大长度 ] )
例如,您可以有一个 1 小时步长和 1 天最大大小的累积窗口,您将获得每天的窗口:[00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) 。
累积(CUMULATE)函数根据时间属性列分配窗口。在流式处理模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。CUMULATE 的返回值是一个新关系,包括原始关系的所有列以及名为“window_start”、“window_end”、“window_time”的另外 3 列,以指示分配的窗口。原始时间属性“timecol”将是窗口 TVF 之后的常规时间戳列。
CUMULATE 采用四个必需参数,一个可选参数:
复制
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
# data:是一个表参数,可以是与时间属性列的任何关系。
# timecol:是一个列描述符,指示数据的哪些时间属性列应映射到累积窗口。
# step :是指定顺序累积窗口结束之间增加的窗口大小的持续时间。
# size :是指定累积窗口的最大宽度的持续时间。大小必须是步长的整数倍。
# offset :是一个可选参数,用于指定窗口开始移动的偏移量。
-----------------------------------
- SELECT * FROM TABLE(
- CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
- -- or with the named params
- -- note: the DATA param must be the first
- SELECT * FROM TABLE(
- CUMULATE(
- DATA => TABLE Bid,
- TIMECOL => DESCRIPTOR(bidtime),
- STEP => INTERVAL '2' MINUTES,
- SIZE => INTERVAL '10' MINUTES));
- +------------------+-------+------+------------------+------------------+-------------------------+
- | bidtime | price | item | window_start | window_end | window_time |
- +------------------+-------+------+------------------+------------------+-------------------------+
- | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
- | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
- | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
- | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
- | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
- | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
- | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
- +------------------+-------+------+------------------+------------------+-------------------------+
-
- -- apply aggregation on the cumulating windowed table
- SELECT window_start, window_end, SUM(price)
- FROM TABLE(
- CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
- GROUP BY window_start, window_end;
- +------------------+------------------+-------+
- | window_start | window_end | price |
- +------------------+------------------+-------+
- | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
- | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
- | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
- | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
- +------------------+------------------+-------+
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。