赞
踩
Apache Flink是一个框架和分布式处理引擎,用于对无限制和有限制的数据流进行有状态的计算。Flink被设计为可以在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
官网:https://flink.apache.org/
官网中文:https://flink.apache.org/zh/
Flink 开发文档:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/
本示例以 1.12 版本进行介绍,当前版本更新至 1.17 。
Apache Flink 是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个 Flink 流式执行模型(streaming execution model),能够支持流处理和批处理两种应用类型。由于流处理和批处理所提供的SLA(服务等级协议)是完全不相同, 流处理一般需要支持低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理,所以在实现的时候通常是分别给出两套实现方法,或者通过一个独立的开源框架来实现其中每一种处理方案。比较典型的有:实现批处理的开源方案有 MapReduce、Spark;实现流处理的开源方案有 Storm;Spark的Streaming 其实本质上也是微批处理。
Flink 在实现流处理和批处理时,与传统的一些方案完全不同,它从另一个视角看待流处理和批处理,将二者统一起来:Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。
Flink 官方提供了Java、Scala、Python 语言接口用以开发 Flink 应用程序,但是 Flink 的源码是使用 Java 语言进行开发的,且 Flink 被阿里收购后(2019 年1 月 8 日),未来的主要编程语言可能主要会是 Java,且 GitHub 上关于 Flink 的项目,大多数是使用 Java 语言编写的。
Flink 支持本地运行、能在独立集群或者在被 YARN 管理的集群上运行, 也能部署在云上,该层主要涉及 Flink 的部署模式,目前 Flink 支持多种部署模式:本地、集群(Standalone、YARN)、云(GCE/EC2)、Kubenetes。Flink 能够通过该层能够支持不同平台的部署,用户可以根据需要选择使用对应的部署模式。
Runtime 层提供了支持 Flink 计算的全部核心实现,为上层 API 层提供基础服务,该层主要负责对上层不同接口提供基础服务,也是 Flink 分布式计算框架的核心实现层,支持分布式 Stream 作业的执行、JobGraph 到 ExecutionGraph 的映射转换、任务调度等。将 DataSteam 和 DataSet 转成统一的可执行的 Task Operator,达到在流式引擎下同时处理批量计算和流式计算的目的。
Flink 首先支持了 Scala 和 Java 的 API、Python 。DataStream、DataSet、Table、SQL API,作为分布式数据处理框架,Flink 同时提供了支撑计算和批计算的接口,两者都提供给用户丰富的数据处理高级 API ,例如 Map、FlatMap 操作等,也提供比较低级的 Process Function API,用户可以直接操作状态和时间等底层数据。
Flink 还包括用于复杂事件处理的CEP,机器学习库 FlinkML,图处理库 Gelly 等。Table 是一种接口化的 SQL 支持,也就是 API 支持(DSL),而不是文本化的 SQL 解析和执行。
Flink 之所以能这么流行,离不开它最重要的四个基石:Checkpoint
、State
、Time
、Window
。
这是 Flink 最重要的一个特性。
Flink基于 Chandy-Lamport 算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。
Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而 Flink 则把这个算法发扬光大了。
Spark 最近在实现 Continue streaming,Continue streaming 的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了 Chandy-Lamport 这个算法,说明Chandy-Lamport 算法在业界得到了一定的肯定。(https://zhuanlan.zhihu.com/p/53482103)
提供了一致性的语义之后,Flink 为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的 State API,包括里面的有 ValueState、ListState 、MapState ,近期添加了 BroadcastState ,使用 State API 能够自动享受到这种一致性的语义。
除此之外,Flink 还实现了 Watermark 的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。
另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink 提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。
Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。
https://flink.apache.org/zh/use-cases/
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。
系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。
从某种程度上来说,所有的实时的数据处理或者是流式数据处理都应该是属于Data Driven ,流计算本质上是 Data Driven 计算。应用较多的如风控系统,当风控系统需要处理各种各样复杂的规则时,Data Driven 就会把处理的规则和逻辑写入到Datastream 的API 或者是 ProcessFunction 的 API 中,然后将逻辑抽象到整个 Flink 引擎,当外面的数据流或者是事件进入就会触发相应的规则,这就是 Data Driven 的原理。在触发某些规则后,Data Driven 会进行处理或者是进行预警,这些预警会发到下游产生业务通知,这是Data Driven 的应用场景,Data Driven 在应用上更多应用于复杂事件的处理。
典型实例:
数据分析任务需要从原始数据中提取有价值的信息和指标。如下图所示,Flink 同时支持流式及批量分析应用。
Data Analytics Applications
:包含 Batch analytics (批处理分析)和 Streaming analytics (流处理分析)
Batch analytics
:可以理解为周期性查询:Batch Analytics 就是传统意义上使用类似于 Map Reduce、Hive、Spark Batch 等,对作业进行分析、处理、生成离线报表。比如Flink应用凌晨从 Recorded Events 中读取昨天的数据,然后做周期查询运算,最后将数据写入 Database 或者 HDFS ,或者直接将数据生成报表供公司上层领导决策使用。
Streaming analytics
:可以理解为连续性查询:比如实时展示双十一天猫销售 GMV(Gross Merchandise Volume 成交总额),用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至 Database 或者 K-VStore ,最后做大屏实时展示。
典型实例:
什么是数据管道?
提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此数据管道支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。
例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。下图描述了周期性 ETL 作业和持续数据管道的差异。
Periodic ETL
:比如每天凌晨周期性的启动一个 Flink ETL Job,读取传统数据库中的数据,然后做ETL,最后写入数据库和文件系统。
Data Pipeline
:比如启动一个 Flink 实时应用,数据源(比如数据库、Kafka )中的数据不断的通过 Flink Data Pipeline 流入或者追加到数据仓库(数据库或者文件系统),或者 Kafka 消息队列。Data Pipeline 的核心场景类似于数据搬运并在搬运的过程中进行部分数据清洗或者处理,而整个业务架构图的左边是 Periodic ETL,它提供了流式 ETL 或者实时 ETL,能够订阅消息队列的消息并进行处理,清洗完成后实时写入到下游的 Database 或 File system 中。
典型实例:
同时支持高吞吐、低延迟、高性能
Flink 是目前开源社区中唯一一套集高吞吐、低延迟、高性能三者于一身的分布式流式数据处理框架。Spark 只能兼顾高吞吐和高性能特性,无法做到低延迟保障,因为Spark是用批处理来做流处理; Storm 只能支持低延时和高性能特性,无法满足高吞吐的要求。下图显示了 Apache Flink 与 Apache Storm 在完成流数据清洗的分布式任务的性能对比。
支持事件时间(Event Time)概念
在流式计算领域中,窗口计算的地位举足轻重,但目前大多数框架窗口计算采用的都是系统时间(Process Time),也就是事件传输到计算框架处理时,系统主机的当前时间。Flink 能够支持基于事件时间(Event Time)语义进行窗口计算,这种基于事件驱动的机制使得事件即使乱序到达甚至延迟到达,流系统也能够计算出精确的结果,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。
支持有状态计算
Flink1.4开始支持有状态计算,所谓状态就是在流式计算过程中将算子的中间结果保存在内存或者文件系统中,等下一个事件进入算子后可以从之前的状态中获取中间结果,计算当前的结果,从而无须每次都基于全部的原始数据来统计结果,极大的提升了系统性能,状态化意味着应用可以维护随着时间推移已经产生的数据聚合。
支持高度灵活的窗口(Window)操作
Flink 将窗口划分为基于 Time 、Count 、Session、以及 Data-Driven 等类型的窗口操作,窗口可以用灵活的触发条件定制化来达到对复杂的流传输模式的支持,用户可以定义不同的窗口触发机制来满足不同的需求。
基于轻量级分布式快照(Snapshot/Checkpoints)的容错机制
Flink 能够分布运行在上千个节点上,通过基于分布式快照技术的 Checkpoints ,将执行过程中的状态信息进行持久化存储,一旦任务出现异常停止,Flink 能够从 Checkpoints 中进行任务的自动恢复,以确保数据处理过程中的一致性,Flink 的容错能力是轻量级的,允许系统保持高并发,同时在相同时间内提供强一致性保证。
基于 JVM 实现的独立的内存管理
Flink 实现了自身管理内存的机制,通过使用散列,索引,缓存和排序有效地进行内存管理,通过序列化/反序列化机制将所有的数据对象转换成二进制在内存中存储,降低数据存储大小的同时,更加有效的利用空间。使其独立于 Java 的默认垃圾收集器,尽可能减少 JVM GC 对系统的影响。
基于 JVM 实现的独立的内存管理
Flink 实现了自身管理内存的机制,通过使用散列,索引,缓存和排序有效地进行内存管理,通过序列化/反序列化机制将所有的数据对象转换成二进制在内存中存储,降低数据存储大小的同时,更加有效的利用空间。使其独立于 Java 的默认垃圾收集器,尽可能减少 JVM GC 对系统的影响。
SavePoints 保存点
对于 7 * 24 小时运行的流式应用,数据源源不断的流入,在一段时间内应用的终止有可能导致数据的丢失或者计算结果的不准确。比如集群版本的升级,停机运维操作等。值得一提的是,Flink 通过 SavePoints 技术将任务执行的快照保存在存储介质上,当任务重启的时候,可以从事先保存的 SavePoints 恢复原有的计算状态,使得任务继续按照停机之前的状态运行。Flink 保存点提供了一个状态化的版本机制,使得能以无丢失状态和最短停机时间的方式更新应用或者回退历史数据。
灵活的部署方式,支持大规模集群
Flink 被设计成能用上千个点在大规模集群上运行。除了支持独立集群部署外,Flink 还支持 YARN 和 Mesos 方式部署。
Flink 的程序内在是并行和分布式的
数据流可以被分区成 stream partitions,operators 被划分为 operator subtasks;这些 subtasks 在不同的机器或容器中分不同的线程独立运行;operator subtasks 的数量就是 operator 的并行计算数,不同的 operator 阶段可能有不同的并行数;如下图所示,source operator 的并行数为 2,但最后的 sink operator 为1;
丰富的库
Flink 拥有丰富的库来进行机器学习,图形处理,关系数据处理等。
用户实现的 Flink 程序是由 Stream 和 Transformation 这两个基本构建块组成,其中 Stream 是一个中间结果数据,而 Transformation 是一个操作,它对一个或多个输入 Stream 进行计算处理,输出一个或多个结果 Stream 。当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow 。一个 Streaming Dataflow 是由一组 Stream 和 Transformation Operator 组成,它类似于一个 DAG 图,在启动的时候从一个或多个 Source Operator 开始,结束于一个或多个 Sink Operator。
下面是一个由 Flink 程序映射为 Streaming Dataflow 的示意图,如下所示:
上图中,FlinkKafkaConsumer 是一个 Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink 是一个 Sink Operator。
在 Flink 中,程序天生是并行和分布式的:一个 Stream 可以被分成多个 Stream 分区(Stream Partitions),一个 Operator 可以被分成多个 Operator Subtask ,每一个 Operator Subtask 是在不同的线程中独立执行的。一个 Operator 的并行度,等于 Operator Subtask 的个数,一个 Stream 的并行度总是等于生成它的 Operator 的并行度。有关 Parallel Dataflow 的实例,如下图所示:
上图 Streaming Dataflow 的并行视图中,展现了在两个 Operator 之间的 Stream 的两种模式:
流处理中的聚合操作(counts,sums等等)不同于批处理,因为数据流是无限,无法在其上应用聚合,所以通过限定窗口(window)的范围,来进行流的聚合操作。例如:5分钟的数据计数,或者计算100个元素的总和等等。
窗口可以由时间驱动 (every 30 seconds) 或者数据驱动(every 100 elements)。如:滚动窗口tumbling windows(无叠加),滑动窗口sliding windows(有叠加),以及会话窗口session windows(被无事件活动的间隔隔开)。
三种不同的时间概念:
在流处理中,有些操作仅仅在某一时间针对单一事件(如事件转换map),有些操作需要记住多个事件的信息并进行处理(window operators),后者的这些操作称为有状态的操作。有状态的操作一般被维护在内置的 key/value 存储中。这些状态信息会跟数据流一起分区并且分布存储,并且可以通过有状态的数据操作来访问。因此这些 key/value 的状态信息仅在带 key 的数据流(通过 keyBy() 函数处理过)中才能访问到。数据流按照 key 排列能保证所有的状态更新都是本地操作,保证一致性且无事务问题。同时这种排列方式使 Flink 能够透明的再分发状态信息和调整数据流分区。
Flink 通过流回放和设置检查点的方式实现容错。一个checkpoint关联了输入流中的某个记录和相应状态和操作。数据流可以从checkpoint中进行恢复,并保证一致性(exactly-once 的处理语义)。 Checkpoint的间隔关系到执行是的容错性和恢复时间。
Flink 把批处理作为特殊的流处理程序来执行,许多概念也都可以应用的批处理中,除了一些小的不同:
Flink 部署方式:
在 Flink 分布式执行环境中,会将多个运算子任务 Operator Subtask 串起来组成一个 Operator Chain ,实际上就是一个运算链。每个运算会在 TaskManager 上一个独立的线程中执行。将算子串连到任务中是一种很好的优化:它能减少线程间的数据交接和缓存,并且提高整体的吞吐,降低处理的时延。这种串联的操作,可以通过 API 来进行配置。如下图的数据流就有 5 个子任务,通过 5 个并行的线程来执行,所示:
Source Operator 对应 2 个 Subtask,所以并行度为 2 ,而 Sink Operator 的 Subtask 只有1个,故而并行度为 1 。这个 Flink 作业的任务有 2 + 2 + 1 = 5 个,则会有 5 个不同的线程执行。
什么情况下算子可以组合为算子链?
上下游的并行度一致
上游算子将所有数据前向传播到下游算子上,数据不进行任何交换,那么这两个算子可以被链接到一起。比如,先进行 filter(),再进行map() 这两个算子可以被链接到一起。(Flink 源码 org.apache.flink.streaming.api.graph.StreamingJob-
raphGenerator中的 isChainable() 方法定义了何种情况可以进行链接)
下游节点的入并行度为 1 (也就是说下游算子节点没有来自其他算子节点的输入)
上下游节点都在同一个 slot group 中
dataStream.filter(e->e.getId()!=0).slotSharingGroup("groupName");
下游节点的 chain 策略为 ALWAYS(可以与上下游算子链接,map、flatmap、filter 等默认是 ALWAYS )
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD )
两个节点间数据分区方式是 forward
用户没有禁用 chain
Flink的运行时,由两种类型的进程组成:
注:如果一个 Slot 中启动多个线程,那么这几个线程类似 CPU 调度一样共用同一个 Slot。JobManagers 与 TaskManagers
之间的任务管理,Checkpoints 的触发,任务状态,心跳等等消息处理都是通过 ActorSystem。
上边这幅图呢,表示了 source/map 算子链 与 keyby/window/apply 算子链的两个 subtask 都进入了不同的 Task Slot ,Sink 单独进入了一个 Task Slot,由于 TaskManger 会根据 Task Slot 数量 对每个 Task Slot 平分系统资源,但是呢,我们发现,上述情况,有一个 Task Slot 为空闲状态并未使用,因为白白浪费了系统资源…
为什么会出现这种情况呢?因为并行度设置不合理导致的…由于上方 2+2+1 = 5 并行度,总共才会有 2 个 subtask,就算每个subtask都进入了不同的Task Slot,仍会有 Task Slot 为空闲状态 (因为上图 Task Slot 有 6 个…)
解释:从图中看,这里的任务分成了三个阶段,第一个阶段是 source/map 算子链,第二个阶段是 keyby/window/apply 算子链,第三个阶段是 Sink ,前两个阶段每个阶段都设置并行度是 2 ,所以使用 2 个 线程,第三个阶段并行度是 1,所以 一共使用了 5个并行度。出现空闲的原因是设置了 6 个并行度,导致多空出来一个 Slot。
优化方案:
1、可以设置成 5 个并行度;
2、第一个阶段 source/map 并行度设置为 6、第二个阶段 keyby/window/apply 并行度设置为 6、第三个阶段sink并行度设置为 1 。这样总体还设置 6 个并行度。
这个时候呢,为了能充分利用slot资源,我们需要对我们的Flink作业并行度进行优化设置。
比如,我们设置 source/map、keyby/window/apply 的算子链并行度为6(并行度为6,表示算子(Task)的并行数为 6,即同时可有 6 个subtask 执行),Sink 并行度保持为1;
每一个 sourec/map keyBy/window/apply 算子链进入到不同 Task Slot 如下图所示:
那么,此时,我们的 Task Slot都被利用到了,就能充分利用 Slot 资源,同时保证每个 TaskManager 能平均分配到重的 subtasks,比如keyby/window/apply 操作就会均分到申请的所有 Slot 里,也保证了 Slot 的负载均衡。默认情况下,Flink 允许子任务共享 Slot ,即便它们是不同任务的子任务,只要属于同一个 job。这样的结果就是一个 Slot 会负责一个 job 的整个 pipeline。
不同任务共享同一个 Slot 的前提
这几个任务前后顺序不同,如上图中 Source 和 keyBy 是两个不同步骤顺序的任务,所以可以在同一个 Slot 执行。
一个 Slot 可以保存作业的整个管道的好处
更实现更好的资源利用
假设拆分的多个Source子任务放到同一个Slot,那么任务不能并行执行了=>因为多个相同步骤的子任务需要抢占的具体资源相同,比如抢占某个锁,这样就不能并行。 使用共享的 Slot 的将充分的利用分槽的资源,使代价较大的 subtask 能够均匀的分布在 TaskManager 上。如图中的共享 Slot 的执行模式中可以并行运行 6 个 pipeline 而上图的只可以运行 2 个 pipeline。同时 APIs 也提供了资源组的机制,可以实现不想进行资源隔离的情况。实践中,比较好的每个 TaskManager 的 Task Slot 的默认数量最好是 CPU 的核数。
Task Slot 是静态概念
Task Slot是静态的概念,是指 TaskManager 具有的并发执行能力,可以通过参数 taskmanager.numberOfTaskSlots 进行配置。而并行度 parallelism 是动态概念,即 TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。
例如一个 TaskManager 有 3 个 Slots,每个 Slot 能管理对这个 Worker 分配的资源的三分之一的内存。对资源分槽,意味着 Subtask 不会同其他 Subtasks 竞争内存,同时可以预留一定的可用内存。目前 Task Slot 没有对 CPU 进行隔离,仅是针对内存。通过动态的调整 Task Slot 的个数,用户可以定义哪些子任务可以相互隔离。只有一个 Slot 的 TaskManager 意味着每个任务组运行在一个单独 JVM 中。 在拥有多个 Slot 的 TaskManager 上, subtask 共用 JVM ,可以共用 TCP 连接和心跳消息,同时可以共用一些数据集和数据结构,从而减小任务的开销。
数据的 KV 索引信息存储在设定的状态后端的存储中。一种是内存中的 Hash map,另一种是存在 Rocksdb(KV存储)中。另外,状态后端还是实现了在时间点上对 KV 状态的快照,并作为 Checkpoint 的一部分存储起来。
通过 Data Stream API 编写的程序可以从一个保存点重新开始执行。即便你更新了你的程序和Flink集群都不会有状态数据丢失。
保存点是手动触发的,触发时会将它写入状态后端。 Savepoints 的实现也是依赖 Checkpoint 的机制。Flink 程序在执行中会周期性的在worker 节点上进行快照并生成Checkpoint。因为任务恢复的时候只需要最后一个完成的 Checkpoint 的,所以旧有的 Checkpoint 会在新的 Checkpoint 完成时被丢弃。
Savepoints 和周期性的 Checkpoint 非常的类似,只是有两个重要的不同。一个是由用户触发,而且不会随着新的 Checkpoint 生成而被丢弃。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。