赞
踩
流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过90 度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。
下图展示了无状态流处理和有状态流处理的主要区别。无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。
上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction 会保存设置的定时器信息等等。
state:flink中有状态函数和运算符在各个元素(element)/事件(event)的处理过程中存储的数据(注意:状态数据可以修改和查询,可以自己维护,根据自己的业务场景,保存历史数据或者中间结果到状态(state)中);
Flink中有两种基本类型的State:Keyed State,Operator State。
他们两种都可以以两种形式存在:原始状态(raw state)和托管状态(managed state).
托管状态:由Flink框架管理的状态,如ValueState, ListState, MapState等,我们通常使用的就是这种。
原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。
通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑他。
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Flink 为算子状态提供三种基本数据结构:
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的key-value map 数据结构,只能用于KeyedStream(keyBy 算子处理之后)。
Flink的Keyed State支持以下数据类型:
示例:
- // Keyed state测试:必须定义在RichFunction中,因为需要运行时上下文 与 MapFunction的区别
- class MyRichMapper1 extends RichMapFunction[SensorReading, String] {
- var valueState: ValueState[Double] = _
- lazy val listState: ListState[Int] = getRuntimeContext.getListState(new ListStateDescriptor[Int]("liststate", classOf[Int]))
- lazy val mapState: MapState[String, Double] = getRuntimeContext.getMapState(new MapStateDescriptor[String, Double]("mapstate", classOf[String], classOf[Double]))
- lazy val reduceState: ReducingState[SensorReading] = getRuntimeContext.getReducingState(new ReducingStateDescriptor[SensorReading]("reducestate", new MyReducer, classOf[SensorReading]))
-
- override def open(parameters: Configuration): Unit = {
- valueState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("valuestate", classOf[Double]))
- }
-
- override def map(value: SensorReading): String = {
- // 状态的读写
- val myV = valueState.value()
- valueState.update(value.temperature)
- listState.add(1)
- val list = new util.ArrayList[Int]()
- list.add(2)
- list.add(3)
- listState.addAll(list)
- //更新整个列表
- listState.update(list)
- listState.get()
-
- mapState.contains("sensor_1")
- mapState.get("sensor_1")
- mapState.put("sensor_1", 1.3)
-
- // 得到聚合完成的值
- reduceState.get()
- // 把新增的和前面的聚合起来
- reduceState.add(value)
-
- value.id
- }
- }
富函数可以用来获取上下文信息,以及在处理完数据时做一些清理工作。例如RichMapFunction。
键控状态必须定义在RichFunction中,因为需要运行时上下文。
底层API可以访问时间戳、水位线以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
在流处理中,一致性可以分为3 个级别:
曾经,at-least-once 非常流行。第一代流处理器(如Storm 和Samza)刚问世时只保证at-least-once,原因有二:
Flink 的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:
而对于sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。
Flink内部提供了Exactly once特性,是依赖于带有barrier的分布式快照+可部分重发的数据源功能实现的。而分布式快照中,就保存了operator的状态信息。
Flink的失败恢复依赖于 检查点机制 + 可部分重发的数据源。
检查点机制:checkpoint定期触发,产生快照,快照中记录了:
- 当前检查点开始时数据源(例如Kafka)中消息的offset。
- 记录了所有有状态的operator当前的状态信息(例如sum中的数值)。
可部分重发的数据源:Flink选择最近完成的检查点K,然后系统重放整个分布式的数据流,然后给予每个operator他们在检查点K快照中的状态。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink +Kafka 的数据管道系统(Kafka 进、Kafka 出)而言,各组件怎样保证exactly-once语义呢?
在分布式系统中,可以使用两阶段提交来实现事务性从而保证数据的一致性,两阶段提交分为:预提交阶段与提交阶段,通常包含两个角色:协调者与执行者,协调者用于用于管理所有执行者的操作,执行者用于执行具体的提交操作,具体的操作流程:
1. 首先协调者会送预提交(pre-commit)命令有的执行者
2. 执行者执行预提交操作然后发送一条反馈(ack)消息给协调者
3. 待协调者收到所有执行者的成功反馈,则发送一条提交信息(commit)给执行者
4. 执行者执行提交操作
如果在流程2中部分预提交失败,那么协调者就会收到一条失败的反馈,则会发送一条rollback消息给所有执行者,执行回滚操作,保证数据一致性;但是如果在流程4中,出现部分提交成功部分提交失败,那么就会造成数据的不一致,因此后面也提出了3PC或者通过其他补偿机制来保证数据最终一致性
flink中两阶段提交是为了保证端到端的Exactly Once,主要依托checkpoint机制来实现,先看一下checkpoint的整体流程,
1.JobManager会周期性的发送执行checkpoint命令(start checkpoint);
2.当source端收到执行指令后会产生一条barrier消息插入到input消息队列中,当处理到barrier时会执行本地checkpoint, 并且会将barrier发送到下一个节点,当checkpoint完成之后会发送一条ack信息给JobManager;
3. 当所有节点都完成checkpoint之后,JobManager会收到来自所有节点的ack信息,那么就表示一次完整的checkpoint的完成;
4. JobManager会给所有节点发送一条callback信息,表示通知checkpoint完成消息。接下来就可以提交事务了
对比flink整个checkpoint机制调用流程可以发现与2PC非常相似,JobManager相当于协调者,flink提供了CheckpointedFunction与CheckpointListener这样两个接口,CheckpointedFunction中有snapshotState方法,每次checkpoint触发执行方法,通常会将缓存数据放入状态中,可以理解为是一个hook,这个方法里面可以实现预提交,CheckpointListener中有notifyCheckpointComplete方法,checkpoint完成之后的通知方法,这里可以做一些额外的操作,比如真正提交kafka的事务;在2PC中提到如果对应流程2预提交失败,那么本次checkpoint就被取消不会执行,不会影响数据一致性.如果流程4失败,那么重启从上一次的checkpoints重新计算。
内部的checkpoint 机制我们已经有了了解,那source 和sink 具体又是怎样运行的呢?接下来我们逐步做一个分析。
我们知道Flink 由JobManager 协调各个TaskManager 进行checkpoint 存储,checkpoint 保存在StateBackend 中,默认StateBackend 是内存级的,也可以改为文件级的进行持久化保存。
当checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier 会在算子间传递下去。
每个算子会对当前的状态做个快照,保存到状态后端。对于source 任务而言,就会把当前的offset 作为状态保存起来。下次从checkpoint 恢复时,source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
每个内部的transform 任务遇到barrier 时,都会把状态存到checkpoint 里。sink 任务首先把数据写入外部kafka,这些数据都属于预提交的事务(还不能被消费);当遇到barrier 时,把状态保存到状态后端,并开启新的预提交事务。
当所有算子任务的快照完成,也就是这次的checkpoint 完成时,JobManager 会向所有任务发通知,确认这次checkpoint 完成。
当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。
所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink 操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
具体的两阶段提交步骤总结如下:
所以我们也可以看到,如果宕机需要通过StateBackend 进行恢复,只能恢复所有确认提交的操作。
MemoryStateBackend 默认的就是这种
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager 的JVM 堆上;执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中。
缺点:1.只能保存数据量小的状态。2.状态数据有可能会丢失
优点:开发测试很方便
FsStateBackend
将state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中,可以使用hdfs等分布式文件系统。
缺点:状态大小受TaskManager内存限制(默认支持5M)
优点:1.状态访问速度很快。2.状态信息不会丢失
用于: 生产,也可存储状态数据量大的情况
RocksDBStateBackend
RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时RocksDB需要配置一个远端的filesystem。RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
缺点:状态访问速度有所下降
优点:1.可以存储超大量的状态信息。2.状态信息不会丢失
用于:生产,可以存储超大量的状态信息
env.setStateBackend(new
FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend),
filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
- //获取flink的运行环境
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //设置statebackend
- env.setStateBackend(new MemoryStateBackend());
-
- CheckpointConfig config = env.getCheckpointConfig();
-
- // 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
- config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
- // 设置checkpoint的周期, 每隔1000 ms进行启动一个检查点
- config.setCheckpointInterval(1000);
-
- // 设置模式为exactly-once
- config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
- // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
- config.setMinPauseBetweenCheckpoints(500);
- // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
- config.setCheckpointTimeout(60000);
- // 同一时间只允许进行一个检查点
- config.setMaxConcurrentCheckpoints(1);
Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数restart-strategy 定义了哪个策略被使用。
常用的重启策略:
(1)固定间隔 (Fixed delay)
(2)失败率 (Failure rate)
(3)无重启 (No restart)
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略, 尝试重启次数默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置。
- 第一种:全局配置 flink-conf.yaml
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 3
- restart-strategy.fixed-delay.delay: 10 s
- 第二种:应用代码设置
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- 3, // 尝试重启的次数
- Time.of(10, TimeUnit.SECONDS) // 间隔
- ));
- 第一种:全局配置 flink-conf.yaml
- restart-strategy: failure-rate
- restart-strategy.failure-rate.max-failures-per-interval: 3
- restart-strategy.failure-rate.failure-rate-interval: 5 min
- restart-strategy.failure-rate.delay: 10 s
- 第二种:应用代码设置
- env.setRestartStrategy(RestartStrategies.failureRateRestart(
- 3, // 一个时间段内的最大失败次数
- Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
- Time.of(10, TimeUnit.SECONDS) // 间隔
- ));
- 第一种:全局配置 flink-conf.yaml
- restart-strategy: none
- 第二种:应用代码设置
- env.setRestartStrategy(RestartStrategies.noRestart());
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点进行恢复
- bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-
- 56/_metadata flink-job.jar
程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据。
当然恢复数据的方式还可以在自己的代码里面指定checkpoint目录,这样下一次启动的时候即使代码发生了改变就自动恢复数据了。
Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断全局,一致性快照。可以保存数据源offset,operator操作状态等信息,可以从应用在过去任意做了savepoint的时刻开始继续消费
注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过uid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID。
savepoint的使用
1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:触发一个savepoint【直接触发或者在cancel的时候触发】停止程序:
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]
作业恢复时,二者均可以使用,主要区别如下:
Savepoint | Externalized Checkpoint |
用户通过命令触发,由用户管理其创建与删除 | Checkpoint 完成时,在用户给定的外部持久化存储保存 |
标准化格式存储,允许作业升级或者配置变更 | 当作业 FAILED(或者CANCELED)时,外部存储的 Checkpoint 会保留下来 |
用户在恢复时需要提供用于恢复作业状态的 savepoint 路径 | 用户在恢复时需要提供用于恢复的作业状态的 Checkpoint 路径 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。