当前位置:   article > 正文

flink checkpoint配置整理_flinkcdc checkpoint

flinkcdc checkpoint

版本

flink 1.14.4

代码

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 启用 checkpoint,设置触发间隔(两次执行开始时间间隔)
env.enableCheckpointing(3000);
// 模式支持EXACTLY_ONCE()/AT_LEAST_ONCE()
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 存储位置,FileSystemCheckpointStorage(文件存储)
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///c:/cdc/checkpoint/"));
// savepoint存储位置
// env.setDefaultSavepointDirectory("file:///c:/cdc/checkpoint/");
// 超时时间,checkpoint没在时间内完成则丢弃
env.getCheckpointConfig().setCheckpointTimeout(10000L);
// 同时并发数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); 
// 最小间隔时间(前一次结束时间,与下一次开始时间间隔)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); 
// 外部checkpoint(例如文件存储)清除策略
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

CheckpointingMode

checkpoint模式

描述
EXACTLY_ONCE每条数据只会被处理一次
AT_LEAST_ONCE至少执行一次

ExternalizedCheckpointCleanup

外部checkpoint清除策略

描述
DELETE_ON_CANCELLATION取消job时删除,只有job执行失败的场景会保留checkpoint
RETAIN_ON_CANCELLATION取消job时保留
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/510209
推荐阅读
相关标签
  

闽ICP备14008679号