赞
踩
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。当然,为了保证exactly-once/at-least-once的特性,还需要数据源支持数据回放。Flink针对不同的容错和消息处理上提供了不同的容错语义,主要分为如下部分:
Flink在实现上诉Checkpoint语义上是依赖于带有barrier的分布式快照+可部分重发的数据源功能实现的。在其周期性的分布式快照中,就保存了各个operator算子中的状态信息。Flink的失败恢复依赖于:检查点机制+可部分重发的数据源。
Flink分布式快照的核心在于stream barrier,barrier是一种特殊标记的消息,它会作为数据流的一部分和数据一起向下流动。barrier不会干扰正常的数据,数据流严格有序。一个barrier把数据流分割成两部分;一部分进入当前的快照,另一部分进入下一个快照。每一个barrier都带有快照ID,并且barrier之前的数据都进入了此快照。barrier不会干扰数据流处理,所以很轻量。多个不同快照的多个barrier会在流中同时出现。即多个快照可能同时创建。而当一个operator从它所有的input channel中都收到barrier,则会触发当前operator的快照操作,并且向其下游channel中发射barrier。当所有的sink都反馈完成快照之后,Checkpoint coordinator认为检查点创建完毕。
接下来将对Checkpoint的执行流程逐步拆解进行讲解,其主要触发为JobMaster中的Checkpoint Coordinator,其是整个Checkpoint的发起者,之后便是source算子,经过一系列的transformation算子最终到达sink至外部持久化存储,其主要执行步骤如下:
为了实现EXACTLY ONCE语义,Flink在对多input channel通道输入的算子会进行barrier对齐操作。也就是说在多通道输入的算子中,其在接收到第一个barrier后不会马上做snapshot,而是等待接受其他input channel的barrier。在等待期间,算子会把barrier已到的channel的record放入input buffer缓存起来(仅缓存,不做计算处理),当所有上游channel的barrier到齐后,当前算子operate才进行异步的快照操作,记录当前自身的state状态,并向所有下游channel发送barrier。之后便开始处理input buffer所缓存的数据以及所属下一个barrier的数据流记录。
而对于AT LEAST ONCE语义,无需进行barrier对齐操作,也就是说在多input channel输入的过程中,无需缓存barrier先到的channel中收集的数据,算子会直接对先到的数据进行处理,所以导致restore时,数据可能会被多次处理。下图是官网文档里面就Checkpoint barrier对齐操作的示意图:
需要特别注意的是,Flink的Checkpoint机制只能保证Flink引擎内的计算过程可以做到EXACTLY ONCE,端到端的EXACTLY ONCE需要source和sink的支持。
在flink中如果需要实现end to end的Exactly-Once语义,需要依赖于其提供的两阶段提交协议TwoPhaseCommitSinkFunction,其主要用来在data sink端中保证Exactly-Once语义,其会把所属当前ckp-n的所有写入数据通过一个事务提交到外部存储。在两个checkpoint之间,一个外部的事务提交绑定了当前ckp所有需要写入的数据。当前为了保证容错,其写入的数据可以被回滚。其两阶段提交协议TwoPhaseCommitSinkFunction的具体过程如下:
接下来通过一个简单的文件操作例子来说明如何使用TwoPhaseCommitSinkFunction。只需要实现四个method,并使sink呈现Exactly-Once语义。
默认情况下,Checkpoint机制是关闭的,需要调用env.enableCheckpointing(n)来开启,每隔n毫秒进行一次Checkpoint。Checkpoint是一种负载较重的任务,如果状态比较大,同时n值又比较小,那可能一次Checkpoint还没完成,下次Checkpoint已经被触发,占用太多本该用于正常数据处理的资源。增大n值意味着一个作业的Checkpoint次数更少,整个作业用于进行Checkpoint的资源更小,可以将更多的资源用于正常的流数据处理。同时,更大的n值意味着重启后,整个作业需要从更长的Offset开始重新处理数据。
此外,还有一些其他参数需要配置,这些参数统一封装在了CheckpointConfig里:
val cpConfig: CheckpointConfig = env.getCheckpointConfig
默认的Checkpoint配置是支持Exactly-Once设置的,这样能保证在重启恢复时,所有算子的状态对任一条数据只处理一次。用上文的Checkpoint原理来说,使用Exactly-Once就是进行了Checkpoint Barrier对齐,因此会有一定的延迟。如果作业延迟小,那么应该使用At-Least-Once投递,不进行对齐,但某些数据会被处理多次。
- // 使用At-Least-Once
- env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
如果一次Checkpoint超过一定时间仍未完成,直接将其终止,以免其占用太多资源:
- // 超时时间1小时
- env.getCheckpointConfig.setCheckpointTimeout(3600*1000)
如果两次Checkpoint之间的间歇时间太短,那么正常的作业可能获取的资源较少,更多的资源被用在了Checkpoint上。对这个参数进行合理配置能保证数据流的正常处理。比如,设置这个参数为60秒,那么前一次Checkpoint结束后60秒内不会启动新的Checkpoint。这种模式只在整个作业最多允许1个Checkpoint时适用。
- // 两次Checkpoint的间隔为60秒
- env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)
默认情况下一个作业只允许1个Checkpoint执行,如果某个Checkpoint正在进行,另外一个Checkpoint被启动,新的Checkpoint需要挂起等待。
- // 最多同时进行3个Checkpoint
- env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)
如果这个参数大于1,将与前面提到的最短间隔相冲突。
Checkpoint的初衷是用来进行故障恢复,如果作业是因为异常而失败,Flink会保存远程存储上的数据;如果开发者自己取消了作业,远程存储上的数据都会被删除。如果开发者希望通过Checkpoint数据进行调试,自己取消了作业,同时希望将远程数据保存下来,需要设置为:
- // 作业取消后仍然保存Checkpoint
- env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RETAIN_ON_CANCELLATION模式下,用户需要自己手动删除远程存储上的Checkpoint数据。
默认情况下,如果Checkpoint过程失败,会导致整个应用重启,我们可以关闭这个功能,这样Checkpoint失败不影响作业的运行。
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。