当前位置:   article > 正文

flink —— checkpoint机制_flink checkpoint

flink checkpoint

引言

checkpoint

刚接触这个机制的是在spark框架中,spark中Lineage(血统)是spark能快速恢复容错的基本,有cache和persist(都是RDD内存缓存),区别只是在于一个包装,cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

而存在内存中,保证局部rdd恢复,但如果整个job挂了,内存中的缓存也就不见了,所以出现了checkpoint机制。

checkpoint与cache和persist方案一样,都是将RDD缓存,只是这个是缓存到文件系统上(一般是hdfs),分spark和sparkstreaming,一个缓存RDD依赖关系,一个缓存DstreamGraph(DAG)。

1. Flink Checkpoint,Flink Savepoint

先简单说一下俩个比较容易混淆的机制,简单区别来看得话:

Checkpoint(检查点):task级别容错机制

Savepoint(保存点):job级别容错机制

对比:

  1. Checkpoint自动容错,Savepoint程序全局状态镜像。
  2. Checkpoint应用场景是程序自动容错,快速恢复。Savepoint应用于程序修改后,暂停后从状态恢复,一般用于程序升级。
  3. Checkpoint是系统行为,自动触发。Savepoint的触发由用户控制。
  4. Checkpoint默认程序停掉后删除。Savepoint默认会一直保存。

2. Flink Checkpoint

2.1 Flink Checkpoint 原理

Flink Checkpoint 机制保证 Flink 任务运行突然失败时,能够从最近 Checkpoint 进行状态恢复启动,进行错误容忍。它是一种自动容错机制,而不是具体的状态存储镜像(Savepoint)。Flink Checkpoint 受 Chandy-Lamport 分布式快照启发,其内部使用分布式数据流轻量级异步快照

对于Chandy-Lamport算法公式不必深究,此算法功能可根据异步快照推测出:分布式节点以状态规则同时快照。

正常快照单机很简单,但多节点的程序快照比较困难,因为很难做到同时。一般可以使用lock锁来阻止通信期间其他节点状态不一,但这样肯定很影响效率。Chandy-Lamport算法就是规避锁机制达到分布式快照。具体可自行研究。

2.1.1 barrier

而flink实现检查点一致就是通过barrier实现的,

Barrier是由流数据源(stream source)注入数据流中,并作为数据流的一部分与数据记录一起往下游流动。Barriers将流里的记录分隔为一段一段的记录集,每一个记录集都对应一个快照。每个Barrier会携带一个快照的ID,这个快照对应Barrier前面的记录集。如下图所示。

当一个算子从所有输入流都接收到一个快照(n)的barrier时,它首先会生成该算子的状态快照,然后往该算子的所有下游广播一个barrier。这个算子是sink算子时,它会告知检查点的 coordinator(一般是flink的jobManager),当所有sink算子都告知接收到了一个快照的barrier时,该快照生成结束。

以上是单线程数据快照,而多线程就多了一个概念,barrier对齐。

当一个算子接收到多于一个输入流时,就需要进行这些流的barrier对齐。当一个算子接收到第一个输入流的快照barrier n时,它不能继续处理该流的其他数据,而是需要等待接收到最后一个流的barrier n,才可以生成算子的状态快照和发送挂起的输出记录,然后发送快照barrier n。否则,检查点快照n跟检查点n+1就会混淆。


检查点对齐保证了状态的准确性,但由于对齐操作是阻塞式的,会使检查点生成时长不可控,降低吞吐量,当作业出现反压时,会加剧反压,甚至导致作业不稳定等问题。

因上面风险,Flink 1.11开始,检查点也可以时非对齐的,类似Chandy-Lamport,与处理瓶颈的解决方式差不多,就是将barrier对齐,从阻塞同步改成异步,在最后sink处合并,弊端就是每条流多了状态快照增大后端存储IO,所以视程序目的配置。


2.1.2 Checkpoint

首先我们来看一下一个简单的Checkpoint的大致流程:

  1. 暂停处理新流入数据,将新数据缓存起来。
  2. 将算子子任务的本地状态数据拷贝到一个远程的持久化存储上。
  3. 继续处理新流入的数据,包括刚才缓存起来的数据。

 

Flink Checkpoint 功能与spark一样,也是一种容错恢复机制。Checkpoint是针对状态做的容错,保存的状态在程序取消时,默认会进行清除。Checkpoint 状态保留策略有两种:

  1. DELETE_ON_CANCELLATION 表示当程序取消时,删除 Checkpoint 存储文件。
  2. RETAIN_ON_CANCELLATION 表示当程序取消时,保存之前的 Checkpoint 存储文件

通过flink执行器可直接设置Checkpoint:

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. // start a checkpoint every 1000 ms
  3. env.enableCheckpointing(1000)
  4. // advanced options:
  5. // set mode to exactly-once (this is the default)
  6. env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  7. // make sure 500 ms of progress happen between checkpoints
  8. env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  9. // checkpoints have to complete within one minute, or are discarded
  10. env.getCheckpointConfig.setCheckpointTimeout(60000)
  11. // prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined.
  12. env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
  13. // allow only one checkpoint to be in progress at the same time
  14. env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  15. // enables the experimental unaligned checkpoints
  16. env.getCheckpointConfig.enableUnalignedCheckpoints()

2.2 Flink Checkpoint存储位置

flink提供的三种可用的状态后端,用于设置检查点存储位置,根据不同场景调整使用

  1. MemoryStateBackend:将状态维护在java堆内存,keystate和operator用哈希表存储数据,默认配置状态大小不超过5M,适合本地调试。
  2. FsStateBackend:配置文件系统,一般用hdfs。与spark不同的是spark只是存到hdfs,有调优经验的人一般会先cache一下在checkpoint,而flink直接将两步结合了,正在进行的程序,会先存在taskmanager的内存中,然后快照写入文件系统(默认异步)中来达到速度与安全。
  3. RocksDBStateBackend:配置文件系统,与上面差不多,只是将taskmanager内存换成了嵌入式数据库RocksDB,然后在快照到文件系统(默认异步),与上面一样都会把快照元数据写入JobManager的内存中。

2.3 Flink Checkpoint语义

支持俩种语义

1、Exactly Once(默认)

2、At least Once 

语义是针对 Flink 任务状态而言的,而不是说 Flink 程序对其处理的次数。例如你使用默认的Exactly Once,但因为Checkpoint有持续时间,在你正在做Checkpoint的时候,程序挂了,那么证明这次Checkpoint未完成,就会从上次Checkpoint的状态恢复,在做Checkpoint的流还会进入程序处理。

两种语义主要区别在barrier对齐上,Exactly Once在barrier时会阻塞此线程流,等其他线程barrier对齐后放行,At least Once不会阻塞。

2.4 Flink Checkpoint配置建议

1、Checkpoint启动的时间间隔,Checkpoint过程的时间,如果启动间隔小于Checkpoint过程时间,就相当于程序一直Checkpoint,保证了安全,消耗了资源,所以看业务场景配置。

2、Flink状态存储,远程恢复肯定比本地慢,多一层网络io,所以如果状态很大的话,可以配置本地state.backend.local-recovery。

3、Checkpoint保存数默认1,可以调高保证状态文件一直可用。

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/876368
推荐阅读
相关标签
  

闽ICP备14008679号