当前位置:   article > 正文

Flink技术实战分享和原理浅析_flink 实战派

flink 实战派
 

1 Flink是什么

在当前的数据量激增的时代,各种业务场景都有大量的数据产生,对于这些不断产生的数据应该如何高效的处理,成为当前的所面临的问题。而相对传统的数据处理模式,流式数据处理有着更高的处理效率和成本控制能力。Flink在德语中的意思快速和敏捷,同时又以一只棕红色的松鼠作为项目的Logo。

Apache Flink是一个支持高吞吐、低延迟、高性能的分布式处理框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

1.1 大数据架构的演变

1)同时支持高吞吐、低延迟、高性能

- Apache Spark Streaming也只能兼顾高吞吐和高性能特性,无法做到低延迟保障

- Apache Storm 只能支持低延时和高性能特性,无法满足高吞吐的要求

2)支持事件时间(Event Time)概念

3)支持有状态计算

4)支持高度灵活的窗口(Window)操作 

5)基于轻量级分布式快照(Snapshot)实现的容错

6)基于 JVM 实现的独立的内存管理

7)Savepoints (保存点)

1.2 有界数据流和无界数据流

数据的产生原本就是流式的。无论是来自Web服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕有界流(bounded)或无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。

批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。

流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。

2 应用场景

在实际生产过程中,大量数据在不断的产生,例如物流实操数据、商城订单数据、移动终端产生的数据、网络流量监控数据以及服务器运行监控和日志数据,这些数据最大的共同点是实时的从不同的数据源产生,然后传输到下周的分析系统。针对这些我们都可以使用Flink来做到以下一些功能的支持,如:智能推荐、实时监测欺诈、复杂事件处理、实时数仓和ETL、流数据分析和监控报表等。

3 代码介绍

4 Flink的基本架构

4.1 基本组件栈

4.2 运行时架构图

Flink运行时由两种类型的进程组成:一个JobManager和一个或者多个TaskManager。

StreamGraph :

JobGraph :

SlotGraph :

Flink系统作业的提交和调度都是利用AKKA的Actor通信,因此也是由此作为切入点,首先理清整个系统的启动以及作业提交的流程和数据流。

JobManager负责整个Flink集群任务的调度以及资源的管理,具有许多与协调Flink应用程序的分布式执行有关的职责:它决定何时调度下一个task(或一组 task)、对完成的task或执行失败做出反应、协调checkpoint、并且协调从失败中恢复等等。JobManager根据JobGraph生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

1)ResourceManager

-ResourceManager负责Flink集群中的资源提供、回收、分配-它管理task-slots,这是Flink集群中资源调度的单位(请参考TaskManagers)。

2)Dispatcher

-Dispatcher提供了一个REST接口,用来提交Flink应用程序执行,并为每个提交的作业启动一个新的JobMaster。

3)JobMaster

-JobMaster负责管理单个JobGraph的执行。Flink集群中可以同时运行多个作业,每个作业都有自己的JobMaster。

TaskManager(也称为worker)执行作业流的task,并且缓存和交换数据流。必须始终至少有一个TaskManager。在TaskManager中资源调度的最小单位是task-slot。TaskManager中task slot的数量表示并发处理task的数量。

5 Flink DataStream API

- 高级语言-----SQL

- 声明式API-----Table API

- 核心API------DataStream/DataSet API

- 低级构建模块(流、状态、事件[时间])-----Runtime: Stateful Stream Processing

5.1 算子

用户通过算子能将一个或多个DataStream转换成新的DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。

例如:Map、FlatMap、Filter、Window、Union、Connect

此处不做过多介绍,有兴趣的同学可以参看官方文档,描述很详细。

5.2 并行

Flink程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

算子子任务数就是其对应算子的并行度。在同一程序中,不同算子也可能具有不同的并行度。

5.3 时间概念和Watermark

对于流式数据处理,最大的特点是数据上具有时间的属性,Flink根据时间生产的位置不同,将时间区分为三种,分别为:

- 事件生成时间(Event Time):每个独立事件在生产它的设备上发生的时间,这个时间通常事件进入Flink之前就已经嵌入到时间中

- 事件接入时间(Ingestion Time):数据进入Flink系统时间

- 事件处理时间(Processing Time):数据在操作算子计算过程中获取到的所在主机时间

5.4 Windows窗口计算

通过按照固定的时间或长度将数据流切分成不同的窗口,然后对数据进行聚合运算,从而得到一定时间范围内的运算结果。例如统计某网站最近5分钟的点击率。

5.4.1 keyed和 Non-keyed窗口

Flink根据上游数据流是否为KeyedStream类型(数据流按照Key分区),对应windows处理也会有所不同。如果上游数据流是KeyedStream类型,则调用DataStream API中的window()方法执行窗口处理逻辑,数据会根据key在不同的Task实例中进行分别计算,最后得出针对每个key计算的结果。如果是Non-keyed类型,则调用windowAll()方法来指定,所有的数据都会在窗口算子中路由同一个Task中计算,得到全局计算结果。

5.4.1 窗口的划分

1)滚动窗口:滚动窗口是根据固定时间或大小进行切分,且窗口和窗口之间的元素互补重叠。

2)滑动窗口:滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

3)会话窗口:会话窗口根据Session gap切分不同的窗口,当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的。我们可以设置定长的Session gap,也可以使用SessionWindowTimeGapExtractor动态地确定Session gap的长度。

5.5 作业链和资源组

5.5.1 作业链

在Flink的作业中,用户可以指定相应的链条将相关性非常强的算子操作绑定在一起,让上下游的Task在同一个Pipeline中执行,避免数据在网络或者线程间传输产生的开销。

5.5.2 资源组

在flink集群中,一个TaskManager就是一个JVM进程,并且会用独立的线程来执行task,为了控制一个TaskManager能接受多少个task,Flink提出了Task Slot的概念。

我们可以简单的把Task Slot理解为TaskManager的计算资源子集。假如一个TaskManager拥有5个slot,那么该TaskManager的计算资源会被平均分为5份,不同的task在不同的slot中执行,避免资源竞争。但是需要注意的是,slot仅仅用来做内存的隔离,对CPU不起作用。

通过调整task slot的数量,用户可以定义task之间是如何相互隔离的。每个TaskManager有一个slot,也就意味着每个task运行在独立的JVM中。每个TaskManager有多个slot的话,也就是说多个task运行在同一个JVM中。 而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗

6 Flink状态管理和容错

6.1 有状态计算

 任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件或中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

6.2 Checkpoints和Savepoints

6.2.1 Checkpoint

快照是Flink作业状态全局一致镜像的通用术语。快照包括指向每个数据源的指针(例如,到文件或Kafka分区的偏移量)以及每个作业的有状态运算符的状态副本,该状态副本是处理了sources偏移位置之前所有的事件后而生成的状态。

 一种由Flink自动执行的快照,其目的是能够从故障中恢复。Checkpoints可以是增量的(只有状态存储使用Rockdb时支持),并为快速恢复进行了优化。

Flink分布式快照的核心元素:

- Barrier(数据栅栏):可以把Barrier简单地理解成一个标记,该标记是严格有序的,并且随着数据流往下流动。每个Barrier都带有自己的ID,Barrier极其轻量,并不会干扰正常的数据处理。

- 异步:每次在把快照存储到我们的状态后端时,如果是同步进行就会阻塞正常任务,从而引入延迟。因此Flink在做快照存储时,采用异步方式。

- 增量:由于checkpoint是一个全局状态,用户保存的状态可能非常大,多数达G或者T级别,checkpoint 的创建会非常慢,而且执行时占用的资源也比较多,因此Flink提出了增量快照的概念。也就是说,每次进行的全量 checkpoint,是基于上次进行更新的。

当 checkpoint coordinator(job manager的一部分)指示task manager开始checkpoint时,它会让所有sources记录它们的偏移量,并将编号的checkpoint barriers插入到它们的流中。这些barriers流经job graph,标注每个checkpoint前后的流部分。

6.2.2 Savepoints

用户出于某种操作目的(例如有状态的重新部署/升级/缩放操作)手动(或API调用)触发的快照,手动触发的全量Checkpoint。Savepoints始终是完整的、全量的,并已针对操作灵活性进行了优化。

6.2.3 恢复

当流处理应用程序发生错误的时候,结果可能会产生丢失或者重复。Flink根据你为应用程序和集群的配置,可以产生以下结果:

- Flink不会从快照中进行恢复(at most once),执行一次或零次

- 没有任何丢失,但是你可能会得到重复冗余的结果(at least once),执行最少一次

- 没有丢失或冗余重复(exactly once),执行精确一次

Exactly once模式下的checkpoint流程,对齐模式:

Aligned Checkpoint状态变化

图a: 输入Channel 1存在3个元素,其中2在Barrier前面;Channel 2存在4个元素,其中 2、9、7在Barrier前面。

图b: 算子分别读取Channel一个元素,输出2。随后接收到Channel 1的Barrier,停止处理Channel 1后续的数据,只处理Channel 2的数据。

图c: 算子再消费2个自Channel 2的元素,接收到Barrier,开始本地快照并输出Barrier。

注:若使用at least once,则为非对齐模式,即图b时,接收到Channel 1的Barrier后,继续处理Channel 1后续的数据,并不阻塞,所以非对齐模式对节点数据处理有更好的执行效率,对checkpoint完成并不一定快。

如何实现精确一致性?

Flink 通过强大的异步快照机制和两阶段提交,实现了“端到端的精确一次语义”。

 “端到端(End-to-End)的精确一次”,指的是Flink应用从Source端开始到Sink端结束,数据必须经过的起始点和结束点。Flink自身是无法保证外部系统“精确一次”语义的,所以Flink若要实现所谓“端到端”的精确一次”的要求,那么外部系统必须支持“精确一次”语义;然后借助Flink提供的分布式快照和两阶段提交才能实现。

整个过程可以总结为下面四个阶段:

- 一旦Flink开始做checkpoint操作,那么就会进入pre-commit阶段,同时Flink JobManager会将检查点Barrier注入数据流中 ;

- 当所有的barrier在算子中成功进行一遍传递,并完成快照后,则pre-commit阶段完成;

- 等所有的算子完成“预提交”,就会发起一个“提交”动作,但是任何一个“预提交”失败都会导致Flink回滚到最近的checkpoint;

- pre-commit完成,必须要确保commit也要成功,Sink Operators和Kafka Sink会共同来保证。

Unaligned Checkpoint是Flink1.11新增的功能。在Flink之前的版本,checkpoint的对齐操作会使先收到barrier的input-channel后续到来的数据缓存起来,一直等到所有的input channel都接收到chechkpoint barrier并且checkpoint操作完毕后,才放开数据进入operator。这样虽然保证了exactly-once,但是显著的增加了延迟,降低了性能。如果再遇到数据反压,情况会更加糟糕。

Exactly once模式下的checkpoint流程,非对齐模式:

Unaligned-Checkpoint 状态变化

Barrier越过数据

图a: 输入 Channel 1存在3个元素,其中2在Barrier前面;Channel 2存在4个元素,其中 2、9、7 在Barrier前面。输出Channel已存在结果数据1。

图b: 算子优先处理输入Channel 1的Barrier,开始本地快照记录自己的状态,并将Barrier插到输出Channel末端。

图c: 算子继续正常处理两个Channel的输入,输出2、9。同时算子会将Barrier越过的数据(即输入Channel 1的2和输出Channel的1)写入Checkpoint,并将输入Channel 2后续早于Barrier的数据(即 2、9、7)持续写入Checkpoint。

6.3 状态管理

在使用Flink过程中,支持三种类型的StateBackend数据状态:

通常情况下我们使用RocksDBStateBackend存储数据状态。

7 监控与调优

7.1 Backpressure背压(反压)原理

7.1.1 什么是背压(反压)

反压是在实时数据处理中,数据管道某个节点生产数据的速率比下游节点消费数据的速率要快的一种现象。 在工作流中数据记录是从上游向下游流动的(例如:从Source到Sink)。反压沿着相反的方向传播,沿着数据流向+上游传播。

以一个简单的Source->Sink-Job为例。如果看到Source发生了警告,意味着Sink消费数据的速率比Source生产数据的速率要慢。Sink正在向上游的Source算子产生反压。

7.1.2 Backpressure产生的原理

Flink通过在TaskManager中采样LocalBufferPool内存块上的每一Task的stackTrace实现。默认情况,TM会触发100次采样,每次间隔50ms来确定反压,然后将解决汇报给JobManager,由JM进行汇总计算,得出最终结果。 你在Web界面看到的比率表示在获得的样本中有多少表明Task正在被反压,例如:0.01表示100个样本中只有1个反压了。

- OK: 0<=比例<=0.10

- LOW:0.10<比例<=0.5

- HIGH:0.5<比例<=1

1)跨TaskManager的反压过程

2)TaskManager内部的反压过程

3)基于Credit的反压过程

7.1.3 Backpressure的优化

反压的影响主要体现在Flink中checkpoint过程上,主要影响两个方面:

- 反压出现时,相关数据流阻塞,可能导致checkpoint超时或失败。

- 在对齐checkpoint场景中,算子接收多个管道输入,输入较快的管道数据state会被缓存起来,等待输入较慢的管道数据barrier对齐,由于输入较快管道数据没被处理,一直积压可能导致OOM或者内存资源耗尽的不稳定问题。

反压的原因:

1)系统资源

首先,需要检查机器的资源使用情况,像CPU、网络、磁盘I/O等。如果一些资源负载过高,就可以进行下面的处理:

- 尝试优化代码;

- 针对特定资源对Flink进行调优;

- 增加并发或者增加机器;

2)垃圾回收

- 性能问题常常源自过长的GC时长。这种情况下可以通过打印GC日志,或者使用一些内存/GC分析工具来定位问题。

3)线程争用

- 跟上面CPU/线程瓶颈问题类似,一个子任务可能由于对共享资源的高线程争用成为瓶颈。同样的,CPU分析工具对于探查这类问题也很有用。

4)负载不均

- 如果瓶颈是数据倾斜造成的,可以尝试删除倾斜数据,或者通过改变数据分区策略将造成数据的key值拆分,或者也可以进行本地聚合/预聚合。

7.2 Checkpointing 监控与优化

Checkpointing超时问题大多情况下是由于反压问题产生,解决反压问题,即可解决大部分的Checkpoint问题。

可以针对以下几点优化:

1)检查程序中是否存在大量的状态的交互,可适当增大托管内存,因为RockDb运行实际占用托管内存,可通过taskmanager.memory.managed.fraction参数进行调整,官方默认是0.4,BDP平台上默认为0.1

2)可以增大Network内存占比,并用使用exactly-once模式,原因exactly-once模式是对齐模式,会阻塞先收到Barrier的数据流,此时Solt资源全部用于处理另外延迟的数据流,更快的完成checkpoint,但是会降低算子的计算速度。若Flink版本在1.11之后,也可使用Unaligned Checkpoint,速度更快且不会降低算子计算速度,但是存在一个弊端,在调整算子并行度后,会导致状态丢失的问题,请知悉。

3)优化Rockdb运行参数,提升Rockdb性能,也可提升Checkpoint速度

常用配置如下:

4)若服务器压力过大,可增大并行度

其优化思路与解决背压基本一致,这里不再过多赘述。

7.3 Flink内存优化

7.3.1 Flink内存模型

如上图所示,下表中列出了Flink TaskManager内存模型的所有组成部分,以及影响其大小的相关配置参数。

我们可以看到,有些内存部分的大小可以直接通过一个配置参数进行设置,有些则需要根据多个参数进行调整。

8 分拣报表数据处理架构

双层数据模型加工:

1)基础数据模型层:按照业务范围对表数据清洗、加工,最终生成一条宽表数据,通过JDQ发出,供下游直接使用。

2)业务数据加工层:按照需求对基础模型层数据进行处理,补全最终数据字段,将结果写入ES索引中。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/607512
推荐阅读
相关标签
  

闽ICP备14008679号