赞
踩
一、flink架构
二、Flink底层原理解析
三、Flink应用场景解析
四、fink入门案例解析
Apache Flink 是一个开源的流处理框架,用于处理无界和有界数据流。其底层原理复杂而精细,涉及到数据流模型、任务调度与执行、内存管理、容错机制等多个方面。本文是对 Flink 底层原理的详细分析,并尝试通过举例来说明这些原理。
提示:以下是本篇文章正文内容,下面案例可供参考
Flink是一个用于有状态并行数据流处理的分布式计算引擎,其运行时架构主要包括四个核心组件:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager)以及分发器(Dispatcher)。以下是这些组件的详细功能介绍:
Apache Flink 是一个开源的流处理框架,用于处理无界和有界数据流。其底层原理复杂而精细,涉及到数据流模型、任务调度与执行、内存管理、容错机制等多个方面。以下是对 Flink 底层原理的详细分析,并尝试通过举例来说明这些原理。
核心概念:
假设我们有一个实时交易系统,需要统计每分钟的交易数量。在这个场景下,我们可以使用 Flink 的事件时间窗口来处理数据流。每个交易事件都会携带一个时间戳(即事件发生的时间),Flink 会根据这个时间戳将交易事件分配到对应的时间窗口中,并进行聚合计算。这样,即使交易事件因为网络延迟等原因没有立即到达系统,Flink 也能保证最终统计结果的准确性。
核心概念:
继续以实时交易系统为例。假设我们的系统需要处理大量的交易数据,并且希望尽快得到统计结果。在 Flink 中,我们可以将交易数据处理作业拆分成多个任务,并分配给多个 TaskManager 并行执行。每个 TaskManager 都会处理一部分交易数据,并生成相应的统计结果。最后,这些统计结果会被汇总起来,形成最终的统计报告。
核心概念:
在实时交易系统中,由于交易数据是持续不断地产生的,因此 Flink 需要高效地管理内存资源以避免内存溢出等问题。Flink 的分层内存管理系统允许开发者根据数据的特性和处理需求来合理地分配内存资源。例如,对于需要频繁访问的数据(如热点数据),可以将其存储在堆内存中以便快速访问;而对于不需要频繁访问的数据(如历史数据),则可以将其存储在堆外内存中以节省堆内存资源。
核心概念:
在实时交易系统中,如果某个 TaskManager 发生故障导致任务失败,那么 Flink 会利用检查点机制来恢复该任务的状态并继续执行。具体来说,Flink 会从最近的检查点中读取任务的状态信息,并将这些信息重新加载到新的 TaskManager 上。然后,新的 TaskManager 会从检查点之后的位置开始继续处理数据流。这样,即使发生了故障,Flink 也能保证数据的完整性和一致性。
总结
Apache Flink 的底层原理涉及多个方面,包括数据流模型、任务调度与执行、内存管理、容错机制等。这些原理共同构成了 Flink 强大的实时流处理能力。通过举例分析,我们可以看到 Flink 是如何在实际应用中处理数据流、调度任务、管理内存和保障容错的。这些特性使得 Flink 成为处理大规模实时数据流的理想选择。
Apache Flink 作为一个开源流处理框架,在实时数据处理领域有广泛的应用。以下是一些实际例子来说明 Flink 的应用场景和优势:
滚动窗口:窗口大小固定不变,同时窗口的移动距离和窗口大小相等
**如下操作全部都在node1上面执行:**
#1.建表
CREATE TEMPORARY TABLE source_table_tumble0 (
user_id BIGINT,
price BIGINT,
`timestamp` STRING,
pt AS PROCTIME()
) WITH (
'connector' = 'socket',
'hostname' = '192.168.88.161',
'port' = '9999',
'format' = 'csv'
);
#2.启动nc
nc -lk 9999
#3.SQL逻辑
select
user_id,
count(user_id) as pv,
sum(price) as sum_price
from source_table_tumble0
group by
user_id,tumble(pt, interval '10' second);
#1.创建source表
CREATE TEMPORARY TABLE source_table_tumble1 (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = '192.168.88.161',
'port' = '9999',
'format' = 'csv'
);
#2.启动nc
nc -lk 9999
#3.执行查询语句
select
user_id,
count(user_id) as pv,
sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000 as window_end
from source_table_tumble1
group by
user_id,
tumble(row_time, interval '5' second);
解释: window_start、window_end用来帮助查看窗口的开始和结束时间的,字段数据的表达式是固定写法,单位是毫秒。
一、窗口的开始时间
窗口的开始时间,与第一条数据的时间相关
计算公式 = 第一条数据的时间 - (第一条数据的时间 % 窗口大小)
二、窗口的结束时间
窗口的结束时间,与窗口的开始时间和窗口大小有关
计算公式= 窗口的开始时间 + 窗口大小 - 1毫秒
三、窗口计算的触发时间点
触发时间,也就是窗口内部的数据被进行计算的时间点。窗口什么时候结束,那么就什么时候触发窗口内数据的计算操作
四、以案例给大家进行演示
第一个窗口:
窗口的开始时间 = 1000 - (1000 % 5000) = 1000 - 1000 = 0
窗口的结束时间 = 0 + 5000 - 1 = 4999
窗口的时间范围 = [0, 4999] = [0, 5000)
窗口的触发时间 = 5000
第二个窗口:
窗口的开始时间 = 5000 - (5000 % 5000) = 5000 - 0 = 5000
窗口的结束时间 = 5000 + 5000 - 1 = 9999
窗口的时间范围 = [5000, 9999] = [5000, 10000)
窗口的触发时间 = 10000
滑动窗口的分类
场景1: 相邻的滑动窗口间有重叠的部分,有部分数据被重复计算的情况。滑动窗口的主要使用场景
场景2: 相邻的滑动窗口间既没有重叠,也没有空隙。这种就是滚动窗口
场景3: 相邻的滑动窗口间有空隙,这种情况会导致部分数据得不到计算,也就是有数据丢失情况。实际工作中不允许出现。
--0.语法
格式: hop(事件时间字段名称, 滑动距离, 窗口大小)
示例: hop(row_time, interval '2' SECOND, interval '5' SECOND)
滑动距离: 可以理解为多久对窗口内的数据执行一次计算
--1.创建表
CREATE TEMPORARY TABLE source_table_hop1 (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = '172.24.24.49',
'port' = '9999',
'format' = 'csv'
);
--2.查询的SQL
SELECT user_id,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(hop_end(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_end,
sum(price) as sum_price
FROM source_table_hop1
GROUP BY user_id
, hop(row_time, interval '2' SECOND, interval '5' SECOND);
注意: hostname要改成自己的阿里云ECS服务器的内网IP
--3.在你自己的阿里云ECS服务器上启动nc
nc -lk 9999
--0.语法
--1.创建表
CREATE TEMPORARY TABLE source_table_session (
user_id STRING,
price BIGINT,
`timestamp` bigint,
row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
watermark for row_time as row_time - interval '0' second
) WITH (
'connector' = 'socket',
'hostname' = 'node1',
'port' = '9999',
'format' = 'csv'
);
---2.执行SQL
SELECT
user_id,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(session_end(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_end,
sum(price) as sum_price
FROM source_table_session
GROUP BY user_id
, session(row_time, interval '5' SECOND);
--1.创建表
CREATE TEMPORARY TABLE source_table_over_time (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
--2.执行SQL
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近1小时内的数据
RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table_over_time;
--3.和Hive中的over函数写法类似,只是在over里面多了时间的条件
--1.创建表
CREATE TEMPORARY TABLE source_table_over_rows (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '2',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
--2.执行SQL
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 5 行数据
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table_over_rows;
--2.根据行号聚合,和上面的根据时间聚合类似,也和Hive中的over函数类似。只是添加了行号的条件
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。