赞
踩
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 设置statebackend,默认保存在内存中;可以设置保存到本地或者HDFS
env.setStateBackend(new FsStateBackend("localPath | hdfsPATH"))
// 开启checkpoint, 并设置其周期间隔为10秒; 默认不开启checkpoint
env.enableCheckpointing(10000L)
}
设置ck的执行语义; 可供选择的语义有:最多一次,至少一次,精确一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
设置checkpoint的超时时间为60s,是指做一次checkpoint的时间;如果超时则认为本次checkpoint失败,这个checkpoint就丢了,继续一下一次checkpoint即可
env.getCheckpointConfig.setCheckpointTimeout(60000)
设置程序中同时允许几个checkpoint任务同时进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
设置两次checkpoint的间隔,确保检查点之间有1s的时间间隔
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
设置允许当前的checkpoint最多失败几次
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
设置任务取消时是否保留检查点
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
设置checkpoint出现问题,是否让程序报错还是继续任务进行下一次checkpoint
如果是false,checkpoint出现问题我们允许程序继续执行,如果下次checkpoint成功则没有问题;如果程序下次checkpoint也没有成功,此时程序挂掉需要从checkpoint中恢复数据时,可能导致程序计算错误,或者是重复计算数据。
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
重启策略的配置
按照固定的延迟去重启,第一个参数是尝试重启的次数,第二次是两次重启的时间间隔
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
失败率重启,在某个时间间隔内,失败的次数最大为多少次。
env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。