当前位置:   article > 正文

Flink本地checkpoint测试_flinksql 怎么在代码中指定checkpoint启动

flinksql 怎么在代码中指定checkpoint启动

一、概述

在本地IDEA测试中,使用本地文件存储系统,作为checkpoint的存储系统,将备份数据存储到本地文件中,作业停止后,从本地备份数据启动Flink程序。

主要分为两步:

1)备份数据

2)从备份数据启动

二、备份数据

备份数据的配置,和使用HDFS文件体统类似,只不过路径填写成本地文件系统的路径,注意格式需要是 file:///******/******/,和HDFS文件系统的配置略有不同。文件具体存储的位置,在idea安装路径的根路径下。比如本人IDEA安装在D盘下,checkpoint地址配置为 file:///Users/flink/checkpoints/TestCheckPoint,那么生成的备份点数据在 D:\Users\flink\checkpoints\TestCheckPoint 目录下。

部分代码如下:

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // 禁用全局任务链
  3. env.disableOperatorChaining();
  4. String brokers = "********.com:9092,********.com:9092,********.com:9092";
  5. String sourceTopic = "0000-checkpoint-test-source";
  6. String resultTopic = "0000-checkpoint-test-result";
  7. String groupId = "demo";
  8. String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";
  9. StateBackend backend = new EmbeddedRocksDBStateBackend(true);
  10. env.setStateBackend(backend);
  11. CheckpointConfig conf = env.getCheckpointConfig();
  12. // 任务流取消和故障应保留检查点
  13. conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  14. conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  15. conf.setCheckpointInterval(10000);//milliseconds
  16. conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
  17. conf.setMinPauseBetweenCheckpoints(10 * 1000);//相邻两次checkpoint之间的时间间隔
  18. conf.setCheckpointStorage(checkPointPath);

生成的备份数据如下

 

 

三、从备份数据启动

主要区别,在从备份点数据恢复运行程序。

如果是在yarn集群运行,在启动指令中加入 -s hdfs://ns1/flink/***/chk-*** \ 即可;而在本地运行,需要将备份点路径设置到运行环境中,可以通过启动指令设置,也可以在代码中直接设置。为了展示方便,这里直接在代码中设置。

比如,从备份点  D:\Users\flink\checkpoints\TestCheckPoint\3fc3902734d9316b3d8947508e95eabd\chk-9 恢复运行,需要将备份点设置到环境变量中,部分代码如下:

  1. Configuration configuration = new Configuration();
  2. configuration.setString("execution.savepoint.path", "file:///Users/flink/checkpoints/TestCheckPoint/3fc3902734d9316b3d8947508e95eabd/chk-9");
  3. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. // 禁用全局任务链
  5. env.disableOperatorChaining();
  6. String brokers = "kafka-log1.test.xl.com:9092,kafka-log2.test.xl.com:9092,kafka-log3.test.xl.com:9092";
  7. String sourceTopic = "0000-checkpoint-test-source";
  8. String resultTopic = "0000-checkpoint-test-result";
  9. String groupId = "demo";
  10. String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";
  11. StateBackend backend = new EmbeddedRocksDBStateBackend(true);
  12. env.setStateBackend(backend);
  13. CheckpointConfig conf = env.getCheckpointConfig();
  14. // 任务流取消和故障应保留检查点
  15. conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  16. conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  17. conf.setCheckpointInterval(10000);//milliseconds
  18. conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
  19. conf.setMinPauseBetweenCheckpoints(10 * 1000);//相邻两次checkpoint之间的时间间隔
  20. conf.setCheckpointStorage(checkPointPath);

启动程序后,将会从备份点读取状态数据,继续进行计算。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号