当前位置:   article > 正文

Flink基础(九):Checkpoint的说明和用法_flink checkpoint使用

flink checkpoint使用

Checkpoint 在 Flink 中是一个非常重要的 Feature,Checkpoint 使 Flink 的状态具有良好的容错性,通过 Checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。

Checkpoint 介绍及使用

Flink 的 Checkpoint 有以下先决条件:

  • 需要具有持久性且支持重放一定时间范围内数据的数据源。例如:Kafka、RabbitMQ 等。
  • 需要一个能保存状态的持久化存储介质,例如:HDFS、S3 等。

Flink 中 Checkpoint 是默认关闭的,对于需要保障 At Least Once 和 Exactly Once 语义的任务,强烈建议开启 Checkpoint,对于丢一小部分数据不敏感的任务,可以不开启 Checkpoint,例如:一些推荐相关的任务丢一小部分数据并不会影响推荐效果。下面来介绍 Checkpoint 具体如何使用。

首先调用 StreamExecutionEnvironment 的方法 enableCheckpointing(n) 来开启 Checkpoint,参数 n 以毫秒为单位表示 Checkpoint 的时间间隔。Checkpoint 配置相关的 Java 代码如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint
env.enableCheckpointing(1000);

// Checkpoint 语义设置为 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// CheckPoint 的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一时间,只允许 有 1 个 Checkpoint 在发生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 两次 Checkpoint 之间的最小时间间隔为 500 毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

// Checkpoint 失败后,整个 Flink 任务也会失败(flink 1.9 之前)
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(true)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

以上 Checkpoint 相关的参数描述如下所示:

  • Checkpoint 语义:EXACTLYONCE 或 ATLEASTONCE,EXACTLYONCE
    表示所有要消费的数据被恰好处理一次,即所有数据既不丢数据也不重复消费;ATLEASTONCE
    表示要消费的数据至少处理一次,可能会重复消费。

  • Checkpoint 超时时间:如果 Checkpoint 时间超过了设定的超时时间,则 Checkpoint 将会被终止。

  • 同时进行的 Checkpoint 数量:默认情况下,当一个 Checkpoint 在进行时,JobManager 将不会触发下一个
    Checkpoint,但 Flink 允许多个 Checkpoint 同时在发生。

  • 两次 Checkpoint 之间的最小时间间隔:从上一次 Checkpoint 结束到下一次 Checkpoint开始,中间的间隔时间。例如,env.enableCheckpointing(60000) 表示 1 分钟触发一次
    Checkpoint,同时再设置两次 Checkpoint 之间的最小时间间隔为 30 秒,假如任务运行过程中一次 Checkpoint就用了50s,那么等 Checkpoint 结束后,理论来讲再过 10s 就要开始下一次 Checkpoint了,但是由于设置了最小时间间隔为30s,所以需要再过 30s 后,下次 Checkpoint才开始。注:如果配置了该参数就决定了同时进行的 Checkpoint 数量只能为 1。

  • 当任务被取消时,外部 Checkpoint 信息是否被清理:Checkpoint 在默认的情况下仅用于恢复运行失败的 Flink
    任务,当任务手动取消时 Checkpoint 产生的状态信息并不保留。当然可以通过该配置来保留外部的 Checkpoint
    状态信息,这些被保留的状态信息在作业手动取消时不会被清除,这样就可以使用该状态信息来恢复 Flink
    任务,对于需要从状态恢复的任务强烈建议配置为外部 Checkpoint 状态信息不清理。可选择的配置项为:

  • ExternalizedCheckpointCleanup.RETAINONCANCELLATION:当作业手动取消时,保留作业的Checkpoint 状态信息。注意,这种情况下,需要手动清除该作业保留的 Checkpoint状态信息,否则这些状态信息将永远保留在外部的持久化存储中。

  • ExternalizedCheckpointCleanup.DELETEONCANCELLATION:当作业取消时,Checkpoint状态信息会被删除。仅当作业失败时,作业的 Checkpoint 才会被保留用于任务恢复。

  • 任务失败,当有较新的 Savepoint 时,作业是否回退到 Checkpoint 进行恢复:默认情况下,当 Savepoint 比Checkpoint 较新时,任务会从 Savepoint 处恢复。

  • 作业可以容忍 Checkpoint 失败的次数:默认值为 0,表示不能接受 Checkpoint 失败。

Checkpoint的流程:

  • JobManager 端的 CheckPointCoordinator 会定期向所有 SourceTask 发送 CheckPointTrigger,Source Task 会在数据流中安插 Checkpoint barrier
  • 当 task 收到上游所有实例的 barrier 后,向自己的下游继续传递 barrier,然后自身同步进行快照,并将自己的状态异步写入到持久化存储中(关于barrier会在后续flink进阶系列中再详细讲)
  • 当 task 将状态信息完成备份后,会将备份数据的地址(state handle)通知给 JobManager 的CheckPointCoordinator,如果 Checkpoint 的持续时长超过了 Checkpoint 设定的超时时间CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator 就会认为本次 Checkpoint 失败,会把这次 Checkpoint 产生的所有状态数据全部删除
  • 如果 CheckPointCoordinator 收集完所有算子的 State Handle,CheckPointCoordinator 会把整个 StateHandle 封装成 completed Checkpoint Meta,写入到外部存储中,Checkpoint 结束

状态如何从 Checkpoint 恢复:
如果是yarn提交任务,在常规任务基础上加上-s : hdfs://xxx/xxx/xxx/chk-10
chk-10就表示checkpoint做到第10次了,下次从此处恢复任务。

那么如何知道任务最后一次的checkpoint地址呢,可以通过调用api的方式:
http://node107.bigdata.dmp.local.com:35524/jobs/a1c70b36d19b3a9fc2713ba98cfc4a4f/metrics?get=lastCheckpointExternalPath
地址中端口号往前是Flink UI界面的地址,后面换成自己的jobId就行了,得到的结果如下:

[
  {
    "id": "lastCheckpointExternalPath",
    "value": "hdfs:/user/flink/checkpoints/a1c70b36d19b3a9fc2713ba98cfc4a4f/chk-18"
  }
]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/510215
推荐阅读
相关标签
  

闽ICP备14008679号