赞
踩
checkpoint
刚接触这个机制的是在spark框架中,spark中Lineage(血统)是spark能快速恢复容错的基本,有cache和persist(都是RDD内存缓存),区别只是在于一个包装,cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。
而存在内存中,保证局部rdd恢复,但如果整个job挂了,内存中的缓存也就不见了,所以出现了checkpoint机制。
checkpoint与cache和persist方案一样,都是将RDD缓存,只是这个是缓存到文件系统上(一般是hdfs),分spark和sparkstreaming,一个缓存RDD依赖关系,一个缓存DstreamGraph(DAG)。
先简单说一下俩个比较容易混淆的机制,简单区别来看得话:
Checkpoint(检查点):task级别容错机制
Savepoint(保存点):job级别容错机制
对比:
Flink Checkpoint 机制保证 Flink 任务运行突然失败时,能够从最近 Checkpoint 进行状态恢复启动,进行错误容忍。它是一种自动容错机制,而不是具体的状态存储镜像(Savepoint)。Flink Checkpoint 受 Chandy-Lamport 分布式快照启发,其内部使用分布式数据流轻量级异步快照。
对于Chandy-Lamport算法公式不必深究,此算法功能可根据异步快照推测出:分布式节点以状态规则同时快照。
正常快照单机很简单,但多节点的程序快照比较困难,因为很难做到同时。一般可以使用lock锁来阻止通信期间其他节点状态不一,但这样肯定很影响效率。Chandy-Lamport算法就是规避锁机制达到分布式快照。具体可自行研究。
而flink实现检查点一致就是通过barrier实现的,
Barrier是由流数据源(stream source)注入数据流中,并作为数据流的一部分与数据记录一起往下游流动。Barriers将流里的记录分隔为一段一段的记录集,每一个记录集都对应一个快照。每个Barrier会携带一个快照的ID,这个快照对应Barrier前面的记录集。如下图所示。
当一个算子从所有输入流都接收到一个快照(n)的barrier时,它首先会生成该算子的状态快照,然后往该算子的所有下游广播一个barrier。这个算子是sink算子时,它会告知检查点的 coordinator(一般是flink的jobManager),当所有sink算子都告知接收到了一个快照的barrier时,该快照生成结束。
以上是单线程数据快照,而多线程就多了一个概念,barrier对齐。
当一个算子接收到多于一个输入流时,就需要进行这些流的barrier对齐。当一个算子接收到第一个输入流的快照barrier n时,它不能继续处理该流的其他数据,而是需要等待接收到最后一个流的barrier n,才可以生成算子的状态快照和发送挂起的输出记录,然后发送快照barrier n。否则,检查点快照n跟检查点n+1就会混淆。
检查点对齐保证了状态的准确性,但由于对齐操作是阻塞式的,会使检查点生成时长不可控,降低吞吐量,当作业出现反压时,会加剧反压,甚至导致作业不稳定等问题。
因上面风险,Flink 1.11开始,检查点也可以时非对齐的,类似Chandy-Lamport,与处理瓶颈的解决方式差不多,就是将barrier对齐,从阻塞同步改成异步,在最后sink处合并,弊端就是每条流多了状态快照增大后端存储IO,所以视程序目的配置。
首先我们来看一下一个简单的Checkpoint的大致流程:
Flink Checkpoint 功能与spark一样,也是一种容错恢复机制。Checkpoint是针对状态做的容错,保存的状态在程序取消时,默认会进行清除。Checkpoint 状态保留策略有两种:
通过flink执行器可直接设置Checkpoint:
- val env = StreamExecutionEnvironment.getExecutionEnvironment()
-
- // start a checkpoint every 1000 ms
- env.enableCheckpointing(1000)
-
- // advanced options:
-
- // set mode to exactly-once (this is the default)
- env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-
- // make sure 500 ms of progress happen between checkpoints
- env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
-
- // checkpoints have to complete within one minute, or are discarded
- env.getCheckpointConfig.setCheckpointTimeout(60000)
-
- // prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
- env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
-
- // allow only one checkpoint to be in progress at the same time
- env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
-
- // enables the experimental unaligned checkpoints
- env.getCheckpointConfig.enableUnalignedCheckpoints()
flink提供的三种可用的状态后端,用于设置检查点存储位置,根据不同场景调整使用
- MemoryStateBackend:将状态维护在java堆内存,keystate和operator用哈希表存储数据,默认配置状态大小不超过5M,适合本地调试。
-
- FsStateBackend:配置文件系统,一般用hdfs。与spark不同的是spark只是存到hdfs,有调优经验的人一般会先cache一下在checkpoint,而flink直接将两步结合了,正在进行的程序,会先存在taskmanager的内存中,然后快照写入文件系统(默认异步)中来达到速度与安全。
-
- RocksDBStateBackend:配置文件系统,与上面差不多,只是将taskmanager内存换成了嵌入式数据库RocksDB,然后在快照到文件系统(默认异步),与上面一样都会把快照元数据写入JobManager的内存中。
-
支持俩种语义
1、Exactly Once(默认)
2、At least Once
语义是针对 Flink 任务状态而言的,而不是说 Flink 程序对其处理的次数。例如你使用默认的Exactly Once,但因为Checkpoint有持续时间,在你正在做Checkpoint的时候,程序挂了,那么证明这次Checkpoint未完成,就会从上次Checkpoint的状态恢复,在做Checkpoint的流还会进入程序处理。
两种语义主要区别在barrier对齐上,Exactly Once在barrier时会阻塞此线程流,等其他线程barrier对齐后放行,At least Once不会阻塞。
1、Checkpoint启动的时间间隔,Checkpoint过程的时间,如果启动间隔小于Checkpoint过程时间,就相当于程序一直Checkpoint,保证了安全,消耗了资源,所以看业务场景配置。
2、Flink状态存储,远程恢复肯定比本地慢,多一层网络io,所以如果状态很大的话,可以配置本地state.backend.local-recovery。
3、Checkpoint保存数默认1,可以调高保证状态文件一直可用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。