当前位置:   article > 正文

flink-checkpoint实战使用_flink手动触发checkpoint

flink手动触发checkpoint

依赖在我上几篇文章有

// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 开启在 job 中止后仍然保留的 externalized checkpoints
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

以上是代码配置在实际项目中的配置情况如下
在这里插入图片描述

连接kafka消费 输出


测试一下 flink run  -c hbase.kafkaconn  kafka_test.jar


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在kafka输入数据

在这里插入图片描述
能正常消费
在这里插入图片描述
数据分布在不同节点 所以我只截取了一个节点的数据
此时stop作业后 继续在kafka生产数据

在这里插入图片描述
这是程序运行后的 flink checkpoint 安装作业ID 生成的备份 (最后面是作业ID)
开始 savepoint

在这里插入图片描述

此时已经把这个作业的 此刻最新的状态save到这个目录

在这里插入图片描述
然后启动flink 作业的时候从这个保存的状态去运行恢复
在这里插入图片描述

然后flink会继续消费之前没有消费到的数据

在这里插入图片描述

触发保存点
触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。此目录的位置可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制(请参阅:targetDirectory参数)。

注意:目标目录必须是 JobManager(s)TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。
例如使用 aFsStateBackend或RocksDBStateBackend:

# Savepoint target directory
/savepoints/

# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/

# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...
注意: 虽然看起来好像可以移动保存点,但由于_metadata文件中的绝对路径,目前不可能。请关注FLINK-5778以了解解除此限制的进展。
请注意,如果使用MemoryStateBackend,元数据和保存点状态将存储在_metadata文件中。由于它是独立的,您可以移动文件并从任何位置恢复。

注意:不鼓励移动或删除正在运行的作业的最后一个保存点,因为这可能会干扰故障恢复。保存点对一次性接收器有副作用,因此为了确保一次性语义,如果在最后一个保存点之后没有检查点,则保存点将用于恢复。
触发保存点
$ bin/flink savepoint :jobId [:targetDirectory]
这将触发 ID 作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来恢复和处理保存点。

使用 YARN 触发保存点
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
这将触发带有 ID:jobId和 YARN 应用程序 ID的作业的保存点:yarnAppId,并返回创建的保存点的路径。

使用保存点取消作业
$ bin/flink cancel -s [:targetDirectory] :jobId
这将自动触发带有 ID 的作业的保存点:jobid并取消作业。此外,您可以指定一个目标文件系统目录来存储保存点。该目录需要由 JobManager(s)TaskManager(s) 访问。

从保存点恢复
$ bin/flink run -s :savepointPath [:runArgs]
这将提交一个作业并指定一个要从中恢复的保存点。您可以提供保存点目录或_metadata文件的路径。

允许非恢复状态
默认情况下,恢复操作将尝试将保存点的所有状态映射回您正在使用的程序。如果你删除了一个操作符,你可以允许跳过不能通过--allowNonRestoredState(short -n:) 选项映射到新程序的状态:

$ bin/flink run -s :savepointPath -n [:runArgs]
处理保存点
$ bin/flink savepoint -d :savepointPath
这将处理存储在:savepointPath.

请注意,还可以通过常规文件系统操作手动删除保存点,而不会影响其他保存点或检查点(回想一下,每个保存点都是独立的)。在 Flink 1.2 之前,这是一个更繁琐的任务,通过上面的 savepoint 命令来执行。

配置
您可以通过state.savepoints.dir密钥配置默认保存点目标目录。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints
如果您既未配置默认目录也未指定自定义目标目录,则触发保存点将失败。

注意:目标目录必须是 JobManager(s)TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/876352
推荐阅读
相关标签
  

闽ICP备14008679号