赞
踩
flink 1.14.4
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 checkpoint,设置触发间隔(两次执行开始时间间隔)
env.enableCheckpointing(3000);
// 模式支持EXACTLY_ONCE()/AT_LEAST_ONCE()
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 存储位置,FileSystemCheckpointStorage(文件存储)
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///c:/cdc/checkpoint/"));
// savepoint存储位置
// env.setDefaultSavepointDirectory("file:///c:/cdc/checkpoint/");
// 超时时间,checkpoint没在时间内完成则丢弃
env.getCheckpointConfig().setCheckpointTimeout(10000L);
// 同时并发数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 最小间隔时间(前一次结束时间,与下一次开始时间间隔)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// 外部checkpoint(例如文件存储)清除策略
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
值 | 描述 |
---|---|
EXACTLY_ONCE | 每条数据只会被处理一次 |
AT_LEAST_ONCE | 至少执行一次 |
外部checkpoint清除策略
值 | 描述 |
---|---|
DELETE_ON_CANCELLATION | 取消job时删除,只有job执行失败的场景会保留checkpoint |
RETAIN_ON_CANCELLATION | 取消job时保留 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。