赞
踩
依赖在我上几篇文章有
// 每 1000ms 开始一次 checkpoint env.enableCheckpointing(1000); // 高级选项: // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Checkpoint 必须在一分钟内完成,否则就会被抛弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 开启在 job 中止后仍然保留的 externalized checkpoints env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
以上是代码配置在实际项目中的配置情况如下
连接kafka消费 输出
测试一下 flink run -c hbase.kafkaconn kafka_test.jar
在kafka输入数据
能正常消费
数据分布在不同节点 所以我只截取了一个节点的数据
此时stop作业后 继续在kafka生产数据
这是程序运行后的 flink checkpoint 安装作业ID 生成的备份 (最后面是作业ID)
开始 savepoint
此时已经把这个作业的 此刻最新的状态save到这个目录
然后启动flink 作业的时候从这个保存的状态去运行恢复
然后flink会继续消费之前没有消费到的数据
触发保存点 触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。此目录的位置可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制(请参阅:targetDirectory参数)。 注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。 例如使用 aFsStateBackend或RocksDBStateBackend: # Savepoint target directory /savepoints/ # Savepoint directory /savepoints/savepoint-:shortjobid-:savepointid/ # Savepoint file contains the checkpoint meta data /savepoints/savepoint-:shortjobid-:savepointid/_metadata # Savepoint state /savepoints/savepoint-:shortjobid-:savepointid/... 注意: 虽然看起来好像可以移动保存点,但由于_metadata文件中的绝对路径,目前不可能。请关注FLINK-5778以了解解除此限制的进展。 请注意,如果使用MemoryStateBackend,元数据和保存点状态将存储在_metadata文件中。由于它是独立的,您可以移动文件并从任何位置恢复。 注意:不鼓励移动或删除正在运行的作业的最后一个保存点,因为这可能会干扰故障恢复。保存点对一次性接收器有副作用,因此为了确保一次性语义,如果在最后一个保存点之后没有检查点,则保存点将用于恢复。 触发保存点 $ bin/flink savepoint :jobId [:targetDirectory] 这将触发 ID 作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来恢复和处理保存点。 使用 YARN 触发保存点 $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId 这将触发带有 ID:jobId和 YARN 应用程序 ID的作业的保存点:yarnAppId,并返回创建的保存点的路径。 使用保存点取消作业 $ bin/flink cancel -s [:targetDirectory] :jobId 这将自动触发带有 ID 的作业的保存点:jobid并取消作业。此外,您可以指定一个目标文件系统目录来存储保存点。该目录需要由 JobManager(s) 和 TaskManager(s) 访问。 从保存点恢复 $ bin/flink run -s :savepointPath [:runArgs] 这将提交一个作业并指定一个要从中恢复的保存点。您可以提供保存点目录或_metadata文件的路径。 允许非恢复状态 默认情况下,恢复操作将尝试将保存点的所有状态映射回您正在使用的程序。如果你删除了一个操作符,你可以允许跳过不能通过--allowNonRestoredState(short -n:) 选项映射到新程序的状态: $ bin/flink run -s :savepointPath -n [:runArgs] 处理保存点 $ bin/flink savepoint -d :savepointPath 这将处理存储在:savepointPath. 请注意,还可以通过常规文件系统操作手动删除保存点,而不会影响其他保存点或检查点(回想一下,每个保存点都是独立的)。在 Flink 1.2 之前,这是一个更繁琐的任务,通过上面的 savepoint 命令来执行。 配置 您可以通过state.savepoints.dir密钥配置默认保存点目标目录。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。 # Default savepoint target directory state.savepoints.dir: hdfs:///flink/savepoints 如果您既未配置默认目录也未指定自定义目标目录,则触发保存点将失败。 注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。