赞
踩
Checkpoints
开始恢复
遇到故障之后,第一步就是重启应用
第二步是从checkpoint中读取状态,将状态重置
从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
检查点分界线(Checkpoint Barrier)
其实就是当JobManager向source发送Cheeckpoints ID时,source就会在当前位置插入barrier点,当source执行到barrier点时,就会停下工作,执行快照操作,但后面的任务并没有停止的,而是继续执行没有执行完的数据,当source执行完快照任务时,就会向后面的任务广播barrier点,后面的任务当处理到barrier点时同样会执行快照操作,依旧不影响其他任务执行,然后依次往下传递barrier点,直到所有任务都执行了快照操作,最后会将这次的所有快照进行合并,就得到了一次完整得快照
//配置间隔1秒
env.enableCheckpointing(1000L)
//AT_LEAST_ONCE:最少一次
//EXACTLY_ONCE:只有一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
//配置超时时间60秒
env.getCheckpointConfig.setCheckpointTimeout(60000L)
//配置最大并行的Checkpoint数量为2
env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
//配置至少间隔500毫秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)
//配置优先使用Checkpoint恢复故障
env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
//配置最大容忍失败次数为3
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
配置示例
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //配置Checkponts间隔时间 env.enableCheckpointing(1000L) //配置严格程度 //AT_LEAST_ONCE:最少一次 //EXACTLY_ONCE:只有一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //设置超时时间,超时的部分丢弃 env.getCheckpointConfig.setCheckpointTimeout(60000L) //最大并行的Checkponts(默认不配的情况下是一个) env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) //两次Checkpoint之间至少间隔的时间 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L) //配置是否更倾向于使用Chechpoint恢复故障,默认false env.getCheckpointConfig.setPreferCheckpointForRecovery(true) //配置Checkpoint失败最大容忍次数(默认0次,如果Checkpoint失败也重启) env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3) }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。