当前位置:   article > 正文

Flink-Watermark机制详解:(第39天)_flinksql watermark 测试

flinksql watermark 测试

系列文章目录

一、Watermark机制
二、Watermark机制原理
三、Watermark机制的应用场景
四、Watermark机制的核心算法
五、Watermark机制的优化策略
六、SQL - 演示Watermark
七、侧输出流,常见面试题


前言

Watermark机制,在Apache Flink这样的流处理框架中,扮演着至关重要的角色,特别是在处理事件时间(Event Time)窗口时,它能够有效解决数据乱序和延迟到达的问题。以下是对Watermark机制的详细解析,涵盖其定义、原理、应用场景、核心算法以及优化策略等方面


提示:以下是本篇文章正文内容,下面案例可供参考

一、Watermark机制

Watermark是Apache Flink中提出的一种时间戳机制,用于衡量事件时间(Event Time)的进展,并解决流处理中数据乱序和延迟到达的问题。在Flink中,Watermark被看作是一种特殊的数据元素,其主要内容是一个时间戳,表示当前事件时间的“水位线”,即所有时间戳小于等于该Watermark值的事件都应该已经到达了系统。
Watermark的作用:用来处理【在一段时间内】延迟或者乱序到来的数据。它就是用来处理延迟的数据。可以叫做水印、水位线。

二、Watermark机制原理

  1. 事件时间、处理时间与Watermark
    事件时间(Event Time):数据产生的时间,是流处理系统中最准确的时间标准,但由于网络延迟等原因,可能难以实时获取。
    处理时间(Processing Time):数据到达流处理系统并开始处理的时间,是一个可靠但可能与事件时间存在差异的时间标准。
    Watermark:一种特殊的时间戳,表示数据流中的一种进度,即所有时间戳小于等于该Watermark值的事件都应该已经到达了系统。
    Watermark机制的核心在于,通过不断生成并推进Watermark,来驱动事件时间窗口的关闭和计算。当Watermark的时间戳超过了窗口的结束时间,窗口就会被认为是完整的,并触发窗口计算。

  2. Watermark的生成策略
    Watermark的生成策略是Flink中用于生成Watermark的算法,它决定了Watermark的生成时机和值。常见的Watermark生成策略包括:

固定延迟策略:根据当前处理的最大事件时间减去一个固定的延迟时间(如5秒、10秒等)来生成Watermark。这种策略简单直观,但可能无法很好地适应不同场景下的数据乱序情况。
基于时间戳分布的策略:通过分析事件时间戳的分布情况,动态调整Watermark的生成。例如,可以计算最近一段时间内事件时间戳的滑动平均值或中位数,并据此生成Watermark。这种策略更加灵活,但需要更多的计算资源。

三、Watermark机制的应用场景

Watermark机制在Flink流处理中有着广泛的应用场景,主要包括:

  1. 窗口计算
    在Flink中,窗口是处理流数据的一种基本模式。通过定义时间窗口(如滚动窗口、滑动窗口等),可以对窗口内的数据进行聚合、过滤等操作。然而,由于数据乱序和延迟到达的问题,窗口的关闭和计算时机往往难以确定。Watermark机制通过不断生成并推进Watermark,为窗口的关闭和计算提供了可靠的依据。

  2. 精确计数与状态清理
    基于Watermark的事件时间处理能够更准确地计算窗口结果,并在窗口结束后及时清理状态,避免状态无限增长。这对于处理大规模数据流、提高系统性能和稳定性具有重要意义。

  3. 乱序容忍
    Watermark机制允许一定程度的数据乱序,只要乱序的数据在其对应窗口关闭之前到达即可。这种机制提高了系统的容错性和灵活性,使得Flink能够处理更加复杂和多样化的数据流。

四、Watermark机制的核心算法

Watermark机制的核心算法主要包括Watermark的生成和推进两个方面。

  1. Watermark的生成
    Watermark的生成通常依赖于事件时间戳的提取和Watermark生成策略的应用。在Flink中,可以通过实现TimestampAssigner接口来从事件中提取时间戳,并通过实现WatermarkGenerator接口来生成Watermark。Watermark生成策略决定了Watermark的生成时机和值,通常根据当前处理的最大事件时间和预设的延迟时间来计算。

  2. Watermark的推进
    Watermark的推进是指随着新事件的到来,不断生成新的Watermark并更新当前的水位线。在Flink中,Watermark的推进是自动进行的,当新事件的时间戳大于当前Watermark的时间戳时,就会触发Watermark的生成和推进。同时,Flink还提供了周期性生成Watermark的机制,即每隔一定时间(如200毫秒)就检查并生成新的Watermark,以确保Watermark的及时性和准确性。

五、Watermark机制的优化策略

为了进一步提高Watermark机制的性能和准确性,可以采取以下优化策略:

  1. 合理设置延迟时间
    延迟时间的设置对Watermark机制的性能和准确性有着重要影响。延迟时间设置过短可能导致窗口频繁触发计算而浪费资源;延迟时间设置过长则可能导致窗口计算结果延迟。因此,需要根据实际业务场景和数据乱序情况合理设置延迟时间。

  2. 优化事件时间戳的提取
    事件时间戳的提取是Watermark生成的基础。为了提高Watermark的准确性和及时性,需要优化事件时间戳的提取逻辑,确保能够准确、快速地从事件中提取出时间戳。

六、SQL - 演示Watermark

1. SQL - 演示Watermark为零的情况

--0.环境准备
如果没有启动flink集群的,需要先启动flink集群,命令如下:
cd /export/server/flink/bin
./start-cluster.sh

flink集群启动后,进入sql-client,命令如下:
cd /export/server/flink/bin
./sql-client.sh

--1.创建表
CREATE TABLE source_table_watermark1 ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1', 
  'port' = '9999',
  'format' = 'csv'
);


--2.SQL查询
select 
user_id,
count(*) as pv,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
from source_table_watermark1
group by
    user_id,
    tumble(row_time, interval '5' second);
    
--3.启动nc
nc -lk 9999
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

演示截图:
在这里插入图片描述

2. SQL - 演示Watermark不为零的情况

--1.创建表
CREATE TEMPORARY TABLE source_table_watermark2 ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '2' second
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1', 
  'port' = '9999',
  'format' = 'csv'
);


--2.说明
 watermark for row_time as row_time - interval '2' second
 interval '2' second的含义:数据允许两秒延迟到达
 

--3.查询SQL
select 
user_id,
count(*) as pv,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
from source_table_watermark2
group by
    user_id,
    tumble(row_time, interval '5' second);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

演示截图:
在这里插入图片描述

1- 虽然watermark允许迟到2秒,但是1001,13,13在允许延迟的情况下延迟了太长时间了,导致最终还是被丢失掉了。
2- 1001,11,11相对1001,12,12是延迟和乱序到来的,但是在允许延迟的时间范围内到达的,因此也能够得到计算
  • 1
  • 2
  • 工作中设置Watermark
    实际工作中更多的情况是数据是乱序或者延迟到来的,在工作中如何进行Watermark的设置?
拿到业务方给到的数据之后,需要根据如下的分析来设置Watermark
1- 询问业务方你们的数据绝大多数情况下会延迟多久到来
2- 可以通过模拟Kafka的消费者,去消费一批数据,统计数据延迟多久到来
3- 初步设置完Watermark之后,需要和业务方/领导确认,设置的大小是否合适
没有绝对的标准,需要何业务反复沟通
大概思路,例如窗口大小是5秒,那么一般情况下watermark设置为窗口的1-2倍即可。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

七、侧输出流

原理图:
在这里插入图片描述
对应的面试题:Flink如何处理延迟到来的数据?

1、Watermarks(水位线):水位线是 Flink 中用于标识事件时间进展的机制。水位线表示事件时间的进度,任务会根据水位线来触发窗口计算。通过设置适当的水位线,可以容忍一定程度的乱序和延迟。

2、窗口的处理机制:Flink 的窗口操作对处理延迟数据提供了很好的支持。窗口会根据水位线来划分时间,一旦水位线达到窗口的结束时间,窗口就会被触发。这样可以确保即使数据到达的顺序是乱序的,也能在合适的时机触发窗口操作。

3、Allowed Lateness(允许延迟):Flink 允许在窗口关闭后继续接受延迟到达的数据。这可以通过 allowedLateness 方法进行配置。允许延迟的窗口在一定的时间范围内保持开放状态,接受延迟数据,并在最终关闭后进行最终的计算。

4、侧输出(Side Output):使用侧输出,可以将延迟的数据发送到一个额外的流中,以便单独处理。这样可以灵活地处理延迟数据,而不影响主要的窗口计算逻辑。

5、定时器和处理函数:Flink 支持在 Keyed Stream 上注册定时器,可以使用定时器来处理延迟的事件。在定时器触发时,可以执行自定义的处理逻辑,例如发出警告或重新触发窗口计算。

6、Out-of-Order Execution(乱序执行):Flink 的数据流引擎允许在一定程度上乱序执行事件,这有助于处理延迟到达的数据。通过配置执行延迟来控制乱序执行的程度,可以通过 ExecutionConfig 的 setAutoWatermarkInterval 方法进行设置。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/986336
推荐阅读
相关标签
  

闽ICP备14008679号