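When testing locally in IDEA, you can use the local file system as the checkpoint storage. Checkpoint data is written to local files, and after the job stops, the Flink program can be restarted from that checkpoint data.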
There are two main steps:
1) Take checkpoints (back up the state)
2) Start the job from a checkpoint
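Configuring the checkpoint storage works much like it does for HDFS, except that the path points to the local file system. Note that the path must use the format file:///******/******/, which differs slightly from the HDFS configuration. On Windows, a path without a drive letter is resolved against the root of the drive that holds the program's working directory. In my case, IDEA is installed on drive D. With the checkpoint path set to file:///Users/flink/checkpoints/TestCheckPoint, the checkpoint data was written to D:\Users\flink\checkpoints\TestCheckPoint.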
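You may not want the location to depend on which drive the working directory is on. One option is to put the drive letter into the URI yourself. The sketch below is a hypothetical helper (toCheckpointUri is not part of Flink) that turns an absolute local path into a file:/// string. It assumes Flink accepts file:///D:/... style URIs on Windows, and it does not percent-encode special characters such as spaces.

```java
public class CheckpointUris {

    /**
     * Hypothetical helper: converts an absolute local directory into a "file:///" URI string
     * for setCheckpointStorage(...). Windows paths keep their drive letter.
     * Note: special characters such as spaces are NOT percent-encoded.
     */
    public static String toCheckpointUri(String localPath) {
        // Normalize Windows separators to forward slashes
        String p = localPath.replace('\\', '/');
        // "D:/Users/..." becomes "/D:/Users/..." so the result is "file:///D:/Users/..."
        if (p.length() >= 2 && Character.isLetter(p.charAt(0)) && p.charAt(1) == ':') {
            p = "/" + p;
        }
        if (!p.startsWith("/")) {
            throw new IllegalArgumentException("Expected an absolute path: " + localPath);
        }
        return "file://" + p;
    }

    public static void main(String[] args) {
        System.out.println(toCheckpointUri("D:\\Users\\flink\\checkpoints\\TestCheckPoint"));
        // prints file:///D:/Users/flink/checkpoints/TestCheckPoint
        System.out.println(toCheckpointUri("/Users/flink/checkpoints/TestCheckPoint"));
        // prints file:///Users/flink/checkpoints/TestCheckPoint
    }
}
```

With this helper, setCheckpointStorage(toCheckpointUri("D:\\Users\\flink\\checkpoints\\TestCheckPoint")) would name the drive explicitly instead of relying on the working directory.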
Part of the code:
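```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Disable operator chaining globally
env.disableOperatorChaining();

String brokers = "********.com:9092,********.com:9092,********.com:9092";
String sourceTopic = "0000-checkpoint-test-source";
String resultTopic = "0000-checkpoint-test-result";
String groupId = "demo";

// Local file system path; note the file:/// prefix
String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";

// RocksDB state backend; "true" enables incremental checkpoints
StateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);

CheckpointConfig conf = env.getCheckpointConfig();
// Keep the checkpoint when the job is cancelled (it is kept on failure in any case)
conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
conf.setCheckpointInterval(10000); // milliseconds
conf.setCheckpointTimeout(10 * 60 * 1000); // milliseconds
conf.setMinPauseBetweenCheckpoints(10 * 1000); // minimum pause between the end of one checkpoint and the start of the next
conf.setCheckpointStorage(checkPointPath);

// ... Kafka source, processing and sink omitted, followed by env.execute(...)
```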
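The sketch below shows how the three timing settings above interact. It uses a simplified model, an assumption for illustration and not Flink's actual scheduler code. A new checkpoint may start only once the interval has passed since the previous trigger and the minimum pause has passed since the previous checkpoint completed. A checkpoint that runs longer than the timeout is discarded. Because the minimum pause equals the interval here, the effective gap between checkpoints is roughly 10 seconds plus the duration of the previous checkpoint.

```java
public class CheckpointTiming {

    // Values from the configuration above, in milliseconds
    public static final long INTERVAL_MS = 10_000;          // setCheckpointInterval(10000)
    public static final long TIMEOUT_MS = 10 * 60 * 1000;   // setCheckpointTimeout(10 * 60 * 1000)
    public static final long MIN_PAUSE_MS = 10 * 1000;      // setMinPauseBetweenCheckpoints(10 * 1000)

    /**
     * Simplified model (assumption): the next checkpoint may start once INTERVAL_MS has
     * passed since the previous trigger AND MIN_PAUSE_MS has passed since it completed.
     */
    public static long earliestNextTrigger(long lastTriggerMs, long lastCompletionMs) {
        return Math.max(lastTriggerMs + INTERVAL_MS, lastCompletionMs + MIN_PAUSE_MS);
    }

    /** In this model, a checkpoint still running after TIMEOUT_MS is expired and discarded. */
    public static boolean timesOut(long durationMs) {
        return durationMs > TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Triggered at t=0 s, finished after 2 s: next one at t=12 s, not t=10 s
        System.out.println(earliestNextTrigger(0, 2_000));   // 12000
        // Triggered at t=0 s, finished after 15 s: next one at t=25 s
        System.out.println(earliestNextTrigger(0, 15_000));  // 25000
        // An 11-minute checkpoint exceeds the 10-minute timeout
        System.out.println(timesOut(11 * 60 * 1000));        // true
    }
}
```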

Running the job generates the checkpoint data under that directory.
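With Flink's file-based checkpoint storage, each job gets a directory named after its job ID under the configured path. That directory contains a chk-N directory for each retained checkpoint, plus shared and taskowned directories. A completed checkpoint directory holds a _metadata file. To pick the path to restore from, the hypothetical helper below (findLatest is not a Flink API) returns the completed chk-N with the highest N inside one job directory. It compares N numerically, so chk-10 ranks above chk-9.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class LatestCheckpoint {

    // Matches retained checkpoint directories such as "chk-9"
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    /** Hypothetical helper: latest completed chk-N directory inside one job-ID directory. */
    public static Optional<Path> findLatest(Path jobDir) throws IOException {
        try (Stream<Path> children = Files.list(jobDir)) {
            return children
                    .filter(Files::isDirectory)
                    // skip "shared", "taskowned" and anything else that is not chk-N
                    .filter(p -> CHK_DIR.matcher(p.getFileName().toString()).matches())
                    // a completed checkpoint contains a _metadata file
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    // compare N numerically so that chk-10 ranks above chk-9
                    .max(Comparator.comparingLong(LatestCheckpoint::checkpointNumber));
        }
    }

    private static long checkpointNumber(Path chkDir) {
        Matcher m = CHK_DIR.matcher(chkDir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a chk-N directory: " + chkDir);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) throws IOException {
        // Fake job directory: two completed checkpoints, one still in progress
        Path jobDir = Files.createTempDirectory("flink-job-");
        for (String name : new String[] {"chk-2", "chk-10", "chk-11", "shared", "taskowned"}) {
            Files.createDirectories(jobDir.resolve(name));
        }
        Files.createFile(jobDir.resolve("chk-2").resolve("_metadata"));
        Files.createFile(jobDir.resolve("chk-10").resolve("_metadata"));

        findLatest(jobDir).ifPresent(p -> System.out.println(p.getFileName())); // chk-10
    }
}
```

Each run normally gets a new job ID, so after restoring, the new checkpoints are written to a different job directory next to the old one.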
The main difference lies in how the program is restored from a checkpoint.
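When running on a YARN cluster, it is enough to add -s hdfs://ns1/flink/***/chk-*** \ to the launch command. When running locally, the checkpoint path has to be set in the configuration of the execution environment, either through the launch command or directly in code. For simplicity, it is set directly in code here.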
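Setting the path through the launch command instead of hard-coding it might look like the following sketch. The --restore argument name and the restoreSettings helper are hypothetical. The only Flink-specific part is the execution.savepoint.path key, the same key the code in this article sets.

```java
import java.util.HashMap;
import java.util.Map;

public class RestoreArgs {

    /** Configuration key Flink reads for the checkpoint/savepoint to restore from. */
    public static final String SAVEPOINT_PATH_KEY = "execution.savepoint.path";

    /**
     * Hypothetical helper: looks for "--restore <path>" in the program arguments and returns
     * the entry to put into the Configuration. The "--restore" flag name is made up here.
     * An empty map means "start fresh, no restore".
     */
    public static Map<String, String> restoreSettings(String[] args) {
        Map<String, String> settings = new HashMap<>();
        for (int i = 0; i < args.length - 1; i++) {
            if ("--restore".equals(args[i])) {
                settings.put(SAVEPOINT_PATH_KEY, args[i + 1]);
            }
        }
        return settings;
    }

    public static void main(String[] args) {
        String[] programArgs = {"--restore",
                "file:///Users/flink/checkpoints/TestCheckPoint/3fc3902734d9316b3d8947508e95eabd/chk-9"};
        System.out.println(restoreSettings(programArgs));
        System.out.println(restoreSettings(new String[0])); // {} : no restore
    }
}
```

In the Flink program, restoreSettings(args).forEach(configuration::setString) would copy the entry into the Configuration before it is passed to getExecutionEnvironment(configuration). In IDEA, the checkpoint path would then go into the run configuration's program arguments.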
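For example, to resume from the checkpoint D:\Users\flink\checkpoints\TestCheckPoint\3fc3902734d9316b3d8947508e95eabd\chk-9, put the checkpoint path into the Configuration that the execution environment is created with. Part of the code: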
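```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Checkpoint to restore from (the setting that the CLI's -s option corresponds to)
Configuration configuration = new Configuration();
configuration.setString("execution.savepoint.path", "file:///Users/flink/checkpoints/TestCheckPoint/3fc3902734d9316b3d8947508e95eabd/chk-9");

// Pass the configuration in; with the no-argument getExecutionEnvironment() the restore path would be ignored
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// Disable operator chaining globally
env.disableOperatorChaining();

String brokers = "kafka-log1.test.xl.com:9092,kafka-log2.test.xl.com:9092,kafka-log3.test.xl.com:9092";
String sourceTopic = "0000-checkpoint-test-source";
String resultTopic = "0000-checkpoint-test-result";
String groupId = "demo";

String checkPointPath = "file:///Users/flink/checkpoints/TestCheckPoint";

// RocksDB state backend; "true" enables incremental checkpoints
StateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);

CheckpointConfig conf = env.getCheckpointConfig();
// Keep the checkpoint when the job is cancelled (it is kept on failure in any case)
conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
conf.setCheckpointInterval(10000); // milliseconds
conf.setCheckpointTimeout(10 * 60 * 1000); // milliseconds
conf.setMinPauseBetweenCheckpoints(10 * 1000); // minimum pause between the end of one checkpoint and the start of the next
conf.setCheckpointStorage(checkPointPath);
```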

After startup, the program reads its state from the checkpoint and continues the computation.