赞
踩
一、Watermark机制
二、Watermark机制原理
三、Watermark机制的应用场景
四、Watermark机制的核心算法
五、Watermark机制的优化策略
六、SQL - 演示Watermark
七、侧输出流,常见面试题
Watermark机制,在Apache Flink这样的流处理框架中,扮演着至关重要的角色,特别是在处理事件时间(Event Time)窗口时,它能够有效解决数据乱序和延迟到达的问题。以下是对Watermark机制的详细解析,涵盖其定义、原理、应用场景、核心算法以及优化策略等方面
提示:以下是本篇文章正文内容,下面案例可供参考
Watermark是Apache Flink中提出的一种时间戳机制,用于衡量事件时间(Event Time)的进展,并解决流处理中数据乱序和延迟到达的问题。在Flink中,Watermark被看作是一种特殊的数据元素,其主要内容是一个时间戳,表示当前事件时间的“水位线”,即所有时间戳小于等于该Watermark值的事件都应该已经到达了系统。
Watermark的作用:用来处理【在一段时间内】延迟或者乱序到来的数据。它就是用来处理延迟的数据。可以叫做水印、水位线。
事件时间、处理时间与Watermark
事件时间(Event Time):数据产生的时间,是流处理系统中最准确的时间标准,但由于网络延迟等原因,可能难以实时获取。
处理时间(Processing Time):数据到达流处理系统并开始处理的时间,是一个可靠但可能与事件时间存在差异的时间标准。
Watermark:一种特殊的时间戳,表示数据流中的一种进度,即所有时间戳小于等于该Watermark值的事件都应该已经到达了系统。
Watermark机制的核心在于,通过不断生成并推进Watermark,来驱动事件时间窗口的关闭和计算。当Watermark的时间戳超过了窗口的结束时间,窗口就会被认为是完整的,并触发窗口计算。
Watermark的生成策略
Watermark的生成策略是Flink中用于生成Watermark的算法,它决定了Watermark的生成时机和值。常见的Watermark生成策略包括:
固定延迟策略:根据当前处理的最大事件时间减去一个固定的延迟时间(如5秒、10秒等)来生成Watermark。这种策略简单直观,但可能无法很好地适应不同场景下的数据乱序情况。
基于时间戳分布的策略:通过分析事件时间戳的分布情况,动态调整Watermark的生成。例如,可以计算最近一段时间内事件时间戳的滑动平均值或中位数,并据此生成Watermark。这种策略更加灵活,但需要更多的计算资源。
Watermark机制在Flink流处理中有着广泛的应用场景,主要包括:
窗口计算
在Flink中,窗口是处理流数据的一种基本模式。通过定义时间窗口(如滚动窗口、滑动窗口等),可以对窗口内的数据进行聚合、过滤等操作。然而,由于数据乱序和延迟到达的问题,窗口的关闭和计算时机往往难以确定。Watermark机制通过不断生成并推进Watermark,为窗口的关闭和计算提供了可靠的依据。
精确计数与状态清理
基于Watermark的事件时间处理能够更准确地计算窗口结果,并在窗口结束后及时清理状态,避免状态无限增长。这对于处理大规模数据流、提高系统性能和稳定性具有重要意义。
乱序容忍
Watermark机制允许一定程度的数据乱序,只要乱序的数据在其对应窗口关闭之前到达即可。这种机制提高了系统的容错性和灵活性,使得Flink能够处理更加复杂和多样化的数据流。
Watermark机制的核心算法主要包括Watermark的生成和推进两个方面。
Watermark的生成
Watermark的生成通常依赖于事件时间戳的提取和Watermark生成策略的应用。在Flink中,可以通过实现TimestampAssigner接口来从事件中提取时间戳,并通过实现WatermarkGenerator接口来生成Watermark。Watermark生成策略决定了Watermark的生成时机和值,通常根据当前处理的最大事件时间和预设的延迟时间来计算。
Watermark的推进
Watermark的推进是指随着新事件的到来,不断生成新的Watermark并更新当前的水位线。在Flink中,Watermark的推进是自动进行的,当新事件的时间戳大于当前Watermark的时间戳时,就会触发Watermark的生成和推进。同时,Flink还提供了周期性生成Watermark的机制,即每隔一定时间(如200毫秒)就检查并生成新的Watermark,以确保Watermark的及时性和准确性。
为了进一步提高Watermark机制的性能和准确性,可以采取以下优化策略:
合理设置延迟时间
延迟时间的设置对Watermark机制的性能和准确性有着重要影响。延迟时间设置过短可能导致窗口频繁触发计算而浪费资源;延迟时间设置过长则可能导致窗口计算结果延迟。因此,需要根据实际业务场景和数据乱序情况合理设置延迟时间。
优化事件时间戳的提取
事件时间戳的提取是Watermark生成的基础。为了提高Watermark的准确性和及时性,需要优化事件时间戳的提取逻辑,确保能够准确、快速地从事件中提取出时间戳。
--0.环境准备
如果没有启动flink集群的,需要先启动flink集群,命令如下:
cd /export/server/flink/bin
./start-cluster.sh
flink集群启动后,进入sql-client,命令如下:
cd /export/server/flink/bin
./sql-client.sh
--1.创建表
CREATE TABLE source_table_watermark1 (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);
--2.SQL查询
select
user_id,
count(*) as pv,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000 as window_end
from source_table_watermark1
group by
user_id,
tumble(row_time, interval '5' second);
--3.启动nc
nc -lk 9999
演示截图:
--1.创建表
CREATE TEMPORARY TABLE source_table_watermark2 (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '2' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);
--2.说明
watermark for row_time as row_time - interval '2' second
interval '2' second的含义:数据允许两秒延迟到达
--3.查询SQL
select
user_id,
count(*) as pv,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000 as window_end
from source_table_watermark2
group by
user_id,
tumble(row_time, interval '5' second);
演示截图:
1- 虽然watermark允许迟到2秒,但是1001,13,13在允许延迟的情况下延迟了太长时间了,导致最终还是被丢失掉了。
2- 1001,11,11相对1001,12,12是延迟和乱序到来的,但是在允许延迟的时间范围内到达的,因此也能够得到计算
拿到业务方给到的数据之后,需要根据如下的分析来设置Watermark
1- 询问业务方你们的数据绝大多数情况下会延迟多久到来
2- 可以通过模拟Kafka的消费者,去消费一批数据,统计数据延迟多久到来
3- 初步设置完Watermark之后,需要和业务方/领导确认,设置的大小是否合适
没有绝对的标准,需要何业务反复沟通
大概思路,例如窗口大小是5秒,那么一般情况下watermark设置为窗口的1-2倍即可。
原理图:
对应的面试题:Flink如何处理延迟到来的数据?
1、Watermarks(水位线):水位线是 Flink 中用于标识事件时间进展的机制。水位线表示事件时间的进度,任务会根据水位线来触发窗口计算。通过设置适当的水位线,可以容忍一定程度的乱序和延迟。
2、窗口的处理机制:Flink 的窗口操作对处理延迟数据提供了很好的支持。窗口会根据水位线来划分时间,一旦水位线达到窗口的结束时间,窗口就会被触发。这样可以确保即使数据到达的顺序是乱序的,也能在合适的时机触发窗口操作。
3、Allowed Lateness(允许延迟):Flink 允许在窗口关闭后继续接受延迟到达的数据。这可以通过 allowedLateness 方法进行配置。允许延迟的窗口在一定的时间范围内保持开放状态,接受延迟数据,并在最终关闭后进行最终的计算。
4、侧输出(Side Output):使用侧输出,可以将延迟的数据发送到一个额外的流中,以便单独处理。这样可以灵活地处理延迟数据,而不影响主要的窗口计算逻辑。
5、定时器和处理函数:Flink 支持在 Keyed Stream 上注册定时器,可以使用定时器来处理延迟的事件。在定时器触发时,可以执行自定义的处理逻辑,例如发出警告或重新触发窗口计算。
6、Out-of-Order Execution(乱序执行):Flink 的数据流引擎允许在一定程度上乱序执行事件,这有助于处理延迟到达的数据。通过配置执行延迟来控制乱序执行的程度,可以通过 ExecutionConfig 的 setAutoWatermarkInterval 方法进行设置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。