当前位置:   article > 正文

Flink底层原理解析:案例解析(第37天)_fink使用例子

fink使用例子

系列文章目录


一、flink架构
二、Flink底层原理解析
三、Flink应用场景解析
四、fink入门案例解析


前言

Apache Flink 是一个开源的流处理框架,用于处理无界和有界数据流。其底层原理复杂而精细,涉及到数据流模型、任务调度与执行、内存管理、容错机制等多个方面。本文是对 Flink 底层原理的详细分析,并尝试通过举例来说明这些原理。


提示:以下是本篇文章正文内容,下面案例可供参考

一、flink架构

在这里插入图片描述
在这里插入图片描述

Flink是一个用于有状态并行数据流处理的分布式计算引擎,其运行时架构主要包括四个核心组件:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager)以及分发器(Dispatcher)。以下是这些组件的详细功能介绍:

1. 作业管理器(JobManager)

  • 功能:作业管理器是单个应用程序的主线程,每个应用程序都有一个单独的JobManager进行控制。它负责接收并执行应用程序,这些应用程序通常包含作业图(JobGraph)、逻辑数据流图(logical dataflow graph)以及一个打包了所有类、库和其他资源的JAR包。
  • 作用:JobManager会将JobGraph转换成物理层面的数据流图,即执行图(Execution Graph),这个图包含了所有可以并发执行的任务。JobManager还会向ResourceManager请求执行任务所需的资源(即TaskManager中的插槽),一旦获取到足够的资源,就会将执行图分发到TaskManager上执行。同时,JobManager还负责所有需要中央协调的操作,如检查点(checkpoint)的协调。

2. 资源管理器(ResourceManager)

  • 功能:资源管理器负责管理TaskManager的插槽(slot),slot是Flink中定义的处理资源的最小单元。Flink为不同的环境和资源管理工具提供了不同的资源管理器,如YARN、Mesos、Kubernetes以及standalone部署。
  • 作用:当JobManager申请slot资源时,ResourceManager会将有空闲的TaskManager分配给JobManager。如果ResourceManager没有足够的slot来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。

3. 任务管理器(TaskManager)

  • 功能:任务管理器是Flink的工作进程,负责执行JobManager分配的任务。Flink集群中通常会运行多个TaskManager进程,每个TaskManager都包含一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。
  • 作用:TaskManager启动后会向ResourceManager注册它的插槽,并在收到ResourceManager的指令后,向JobManager提供一个或多个插槽资源。JobManager随后会将任务分配到这些插槽中执行。在运行过程中,同一个应用程序中不同的TaskManager进程可以进行数据交换。

4. 分发器(Dispatcher)

  • 功能:分发器可以跨作业运行,并为应用程序提供了REST接口。它的主要作用是在应用被提交执行时,分发并将应用移交给一个JobManager。
  • 作用:Dispatcher还会启动一个Web UI,用于方便地展示和监控作业的执行。然而,Dispatcher在架构中可能并不是必须的,这取决于应用提交运行的方式。
    综上所述,Flink的四大组件在运行时协同工作,共同管理流应用程序的执行。每个组件都承担着特定的功能和作用,确保了Flink能够高效地处理数据流。

二、Flink底层原理解析

Apache Flink 是一个开源的流处理框架,用于处理无界和有界数据流。其底层原理复杂而精细,涉及到数据流模型、任务调度与执行、内存管理、容错机制等多个方面。以下是对 Flink 底层原理的详细分析,并尝试通过举例来说明这些原理。

1. 数据流模型

核心概念:

  • 事件时间(Event Time):基于事件本身的时间戳进行处理,适用于有时间顺序的数据流。这意味着即使数据因为网络延迟等原因到达系统的时间不一致,Flink 也会根据事件的时间戳来重新排序并处理数据。
  • 处理时间(Processing Time):基于数据处理开始或结束的时间进行处理,适用于无明确时间顺序的数据流。这种处理方式较为简单,但可能无法准确反映数据的实际顺序。
  • 窗口(Window):将连续事件划分为时间片或数据片进行聚合分析。窗口是 Flink 中处理数据流的关键机制之一,它允许开发者定义时间窗口(如滚动窗口、滑动窗口等)来对数据进行聚合操作。

1.1 例1

假设我们有一个实时交易系统,需要统计每分钟的交易数量。在这个场景下,我们可以使用 Flink 的事件时间窗口来处理数据流。每个交易事件都会携带一个时间戳(即事件发生的时间),Flink 会根据这个时间戳将交易事件分配到对应的时间窗口中,并进行聚合计算。这样,即使交易事件因为网络延迟等原因没有立即到达系统,Flink 也能保证最终统计结果的准确性。

2. 任务调度与执行

核心概念:

  • 任务调度器:Flink 使用基于时间的调度器来调度和执行任务。调度器会根据任务的依赖关系和资源可用性来动态地分配任务到不同的 TaskManager 上执行。
  • 并行执行:Flink 支持多任务并行执行,以提高处理速度和吞吐量。在 Flink 中,一个作业(Job)会被拆分成多个任务(Task),每个任务可以在不同的 TaskManager 上并行执行。

2.1 例2

继续以实时交易系统为例。假设我们的系统需要处理大量的交易数据,并且希望尽快得到统计结果。在 Flink 中,我们可以将交易数据处理作业拆分成多个任务,并分配给多个 TaskManager 并行执行。每个 TaskManager 都会处理一部分交易数据,并生成相应的统计结果。最后,这些统计结果会被汇总起来,形成最终的统计报告。

3. 内存管理

核心概念:

  • 分层内存管理系统:Flink 采用了分层内存管理系统来确保各个层次的内存使用合理。这包括堆内存(Heap Memory)和堆外内存(Off-heap Memory)等不同的内存区域。
  • 垃圾回收:Flink 会进行定期的垃圾回收操作,以释放不再使用的内存资源。这有助于防止内存泄漏问题,并提高系统的稳定性和性能。

3.1 例3

在实时交易系统中,由于交易数据是持续不断地产生的,因此 Flink 需要高效地管理内存资源以避免内存溢出等问题。Flink 的分层内存管理系统允许开发者根据数据的特性和处理需求来合理地分配内存资源。例如,对于需要频繁访问的数据(如热点数据),可以将其存储在堆内存中以便快速访问;而对于不需要频繁访问的数据(如历史数据),则可以将其存储在堆外内存中以节省堆内存资源。

4. 容错机制

核心概念:

  • 检查点(Checkpoint):Flink 通过周期性地保存作业的状态到持久化存储中来实现容错。当系统发生故障时,Flink 可以从最近的检查点恢复作业的状态并继续执行。
  • 日志复制:Flink 还采用了基于日志复制的方法来确保任务在处理期间不会丢失数据。这有助于提高系统的可靠性和容错性。

4.1 例4

在实时交易系统中,如果某个 TaskManager 发生故障导致任务失败,那么 Flink 会利用检查点机制来恢复该任务的状态并继续执行。具体来说,Flink 会从最近的检查点中读取任务的状态信息,并将这些信息重新加载到新的 TaskManager 上。然后,新的 TaskManager 会从检查点之后的位置开始继续处理数据流。这样,即使发生了故障,Flink 也能保证数据的完整性和一致性。

总结
Apache Flink 的底层原理涉及多个方面,包括数据流模型、任务调度与执行、内存管理、容错机制等。这些原理共同构成了 Flink 强大的实时流处理能力。通过举例分析,我们可以看到 Flink 是如何在实际应用中处理数据流、调度任务、管理内存和保障容错的。这些特性使得 Flink 成为处理大规模实时数据流的理想选择。

三、Flink应用场景解析

Apache Flink 作为一个开源流处理框架,在实时数据处理领域有广泛的应用。以下是一些实际例子来说明 Flink 的应用场景和优势:

1. 实时数据分析

1.1 例子:网络流量监控

  • 场景描述:在大型互联网公司中,网络流量是评估服务性能和用户行为的重要指标。使用 Flink 可以实时地监控和分析网络流量数据,如每秒的请求数、响应时间等。
  • 实现方式:通过 Flink 的 DataStream API,可以实时地从数据源(如 Kafka)读取流量数据,并进行聚合、过滤等处理,然后将结果输出到实时分析平台或数据库中。
    *优势:Flink 的高吞吐量和低延迟特性使得它能够快速响应数据变化,为决策者提供实时、准确的数据支持。

2. 社交媒体分析

2.1 例子:实时用户行为分析

  • 场景描述:社交媒体平台需要实时分析用户的行为数据,如点赞、评论、分享等,以了解用户偏好和趋势,从而优化内容推荐和广告投放策略。
  • 实现方式:利用 Flink 的事件时间窗口和状态管理功能,可以实时地处理用户行为数据流,计算用户的活跃度、兴趣偏好等指标,并实时更新用户画像。
  • 优势:Flink 的高可靠性和容错性保证了数据处理的一致性和连续性,即使在系统发生故障时也能快速恢复,保证数据的实时性和准确性。

3. 交易监控

3.1 例子:金融交易实时监控

  • 场景描述:在金融领域,交易监控是保障交易安全、预防欺诈的重要手段。通过 Flink 可以实时监控交易数据流,识别异常交易行为。
  • 实现方式:使用 Flink 的复杂事件处理(CEP)功能,可以定义复杂的交易模式并实时地匹配交易数据流,一旦发现异常交易行为则立即触发警报。
  • 优势:Flink 的高并发处理能力和低延迟特性使得它能够处理大量的交易数据,并实时地识别出异常交易行为,从而保障交易安全。

4. 日志处理

4.1 例子:大规模日志实时处理

  • 场景描述:在大型分布式系统中,日志文件是排查问题、优化性能的重要依据。使用 Flink 可以实时地处理和分析大规模日志数据。
  • 实现方式:通过 Flink 的 DataStream API,可以实时地从日志收集系统(如 Flume、Logstash)读取日志数据,并进行过滤、聚合等处理,然后将结果输出到日志分析平台或数据库中。
  • 优势:Flink 的高吞吐量和可扩展性使得它能够处理海量的日志数据,并实时地提供分析结果,帮助运维人员快速定位问题并优化系统性能。

5. 物联网(IoT)

5.1 例子:设备数据实时收集和处理

  • 场景描述:在物联网场景中,大量设备产生的数据需要被实时收集和处理,以支持智能决策和远程控制。
  • 实现方式:使用 Flink 可以实时地从设备数据源(如 MQTT 消息队列)读取数据,并进行数据清洗、聚合等处理,然后将处理结果发送到云端或本地系统进行进一步分析。
  • 优势:Flink 的实时性和可靠性使得它能够快速响应设备数据变化,并保证数据处理的一致性和连续性,为物联网应用提供强大的数据支持。
    这些例子展示了 Flink 在不同领域的实际应用和优势,体现了其在实时数据处理领域的强大能力。

四、fink入门案例解析

1. 滚动窗口(tumble window)

在这里插入图片描述
在这里插入图片描述

滚动窗口:窗口大小固定不变,同时窗口的移动距离和窗口大小相等

  1. 特点:
  • 窗口大小固定不变
  • 窗口的移动距离和窗口大小相等
  • 相邻的两个窗口间,既没有重叠也没有空缺,也就是数据仅且只会被处理一次
  1. 语法
    格式: tumble(时间字段名称, 滚动窗口大小)
    示例: tumble(pt, interval ‘10’ second),创建了一个窗口大小是10秒的滚动窗口

1.1 处理时间演示

**如下操作全部都在node1上面执行:**
#1.建表
CREATE TEMPORARY TABLE source_table_tumble0 ( 
 user_id BIGINT, 
 price BIGINT,
 `timestamp` STRING,
 pt AS PROCTIME()
) WITH (
  'connector' = 'socket',
  'hostname' = '192.168.88.161',        
  'port' = '9999',
  'format' = 'csv'
);

#2.启动nc
nc -lk 9999

#3.SQL逻辑
select 
    user_id,
    count(user_id) as pv,
    sum(price) as sum_price
from source_table_tumble0
group by
user_id,tumble(pt, interval '10' second);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

在这里插入图片描述
在这里插入图片描述

1.2 事件时间演示

#1.创建source表
CREATE TEMPORARY TABLE source_table_tumble1 ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '192.168.88.161',        
  'port' = '9999',
  'format' = 'csv'
);

#2.启动nc
nc -lk 9999

#3.执行查询语句
select 
user_id,
count(user_id) as pv,
    sum(price) as sum_price,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '5' second) AS STRING)) * 1000  as window_start,
UNIX_TIMESTAMP(CAST(tumble_end(row_time, interval '5' second) AS STRING)) * 1000  as window_end
from source_table_tumble1
group by
    user_id,
    tumble(row_time, interval '5' second);
    
解释: window_start、window_end用来帮助查看窗口的开始和结束时间的,字段数据的表达式是固定写法,单位是毫秒。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

在这里插入图片描述

1.3 窗口的时间计算

一、窗口的开始时间
	窗口的开始时间,与第一条数据的时间相关
	计算公式 = 第一条数据的时间 - (第一条数据的时间 % 窗口大小)
	
二、窗口的结束时间
	窗口的结束时间,与窗口的开始时间和窗口大小有关
	计算公式= 窗口的开始时间 + 窗口大小 - 1毫秒
	
三、窗口计算的触发时间点
	触发时间,也就是窗口内部的数据被进行计算的时间点。窗口什么时候结束,那么就什么时候触发窗口内数据的计算操作
	
四、以案例给大家进行演示
	第一个窗口:
		窗口的开始时间 = 1000 - (1000 % 5000) = 1000 - 1000 = 0
		窗口的结束时间 = 0 + 5000 - 1 = 4999
		窗口的时间范围 = [0, 4999] = [0, 5000)
		窗口的触发时间 = 5000
		
	第二个窗口:
		窗口的开始时间 = 5000 - (5000 % 5000) = 5000 - 0 = 5000
		窗口的结束时间 = 5000 + 5000 - 1 = 9999
		窗口的时间范围 = [5000, 9999] = [5000, 10000)
		窗口的触发时间 = 10000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2. 滑动窗口(hop)

在这里插入图片描述
在这里插入图片描述

滑动窗口的分类
场景1: 相邻的滑动窗口间有重叠的部分,有部分数据被重复计算的情况。滑动窗口的主要使用场景
场景2: 相邻的滑动窗口间既没有重叠,也没有空隙。这种就是滚动窗口
场景3: 相邻的滑动窗口间有空隙,这种情况会导致部分数据得不到计算,也就是有数据丢失情况。实际工作中不允许出现。
  • 1
  • 2
  • 3
  • 4

2.1阿里云: SQL-入门案例

--0.语法
格式: hop(事件时间字段名称, 滑动距离, 窗口大小)
示例: hop(row_time, interval '2' SECOND, interval '5' SECOND)
滑动距离: 可以理解为多久对窗口内的数据执行一次计算


--1.创建表
CREATE TEMPORARY TABLE source_table_hop1 ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = '172.24.24.49',        
  'port' = '9999',
  'format' = 'csv'
);


--2.查询的SQL
SELECT user_id,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(hop_end(row_time, interval '2' SECOND, interval '5' SECOND) AS STRING)) * 1000 as window_end, 
    sum(price) as sum_price
FROM source_table_hop1
GROUP BY user_id
    , hop(row_time, interval '2' SECOND, interval '5' SECOND);
    
注意: hostname要改成自己的阿里云ECS服务器的内网IP

--3.在你自己的阿里云ECS服务器上启动nc
nc -lk 9999
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

在这里插入图片描述

3. 会话窗口(session)

在这里插入图片描述

3.1 SQL案例实现

--0.语法



--1.创建表
CREATE TEMPORARY TABLE source_table_session ( 
 user_id STRING, 
 price BIGINT,
 `timestamp` bigint,
 row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
 watermark for row_time as row_time - interval '0' second
) WITH (
  'connector' = 'socket',
  'hostname' = 'node1',        
  'port' = '9999',
  'format' = 'csv'
);


---2.执行SQL
SELECT 
    user_id,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_start,
UNIX_TIMESTAMP(CAST(session_end(row_time, interval '5' SECOND) AS STRING)) * 1000 as window_end, 
    sum(price) as sum_price
FROM source_table_session
GROUP BY user_id
      , session(row_time, interval '5' SECOND);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

4. 聚合窗口(over)

4.1. 根据时间聚合代码实现

--1.创建表
CREATE TEMPORARY TABLE source_table_over_time (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '10',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);


--2.执行SQL
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近1小时内的数据
    RANGE BETWEEN INTERVAL '5' SECOND PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table_over_time;



--3.和Hive中的over函数写法类似,只是在over里面多了时间的条件

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

4.2. 根据行号聚合代码实现

--1.创建表
CREATE TEMPORARY TABLE source_table_over_rows (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '2',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);



--2.执行SQL
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 5 行数据
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM source_table_over_rows;



--2.根据行号聚合,和上面的根据时间聚合类似,也和Hive中的over函数类似。只是添加了行号的条件

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/907721
推荐阅读
相关标签
  

闽ICP备14008679号