赞
踩
Checkpoint 在 Flink 中是一个非常重要的 Feature,Checkpoint 使 Flink 的状态具有良好的容错性,通过 Checkpoint 机制,Flink 可以对作业的状态和计算位置进行恢复。
Flink 的 Checkpoint 有以下先决条件:
Flink 中 Checkpoint 是默认关闭的,对于需要保障 At Least Once 和 Exactly Once 语义的任务,强烈建议开启 Checkpoint,对于丢一小部分数据不敏感的任务,可以不开启 Checkpoint,例如:一些推荐相关的任务丢一小部分数据并不会影响推荐效果。下面来介绍 Checkpoint 具体如何使用。
首先调用 StreamExecutionEnvironment 的方法 enableCheckpointing(n) 来开启 Checkpoint,参数 n 以毫秒为单位表示 Checkpoint 的时间间隔。Checkpoint 配置相关的 Java 代码如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint env.enableCheckpointing(1000); // Checkpoint 语义设置为 EXACTLY_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // CheckPoint 的超时时间 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间,只允许 有 1 个 Checkpoint 在发生 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 两次 Checkpoint 之间的最小时间间隔为 500 毫秒 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持) env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // Checkpoint 失败后,整个 Flink 任务也会失败(flink 1.9 之前) env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(true)
以上 Checkpoint 相关的参数描述如下所示:
Checkpoint 语义:EXACTLYONCE 或 ATLEASTONCE,EXACTLYONCE
表示所有要消费的数据被恰好处理一次,即所有数据既不丢数据也不重复消费;ATLEASTONCE
表示要消费的数据至少处理一次,可能会重复消费。
Checkpoint 超时时间:如果 Checkpoint 时间超过了设定的超时时间,则 Checkpoint 将会被终止。
同时进行的 Checkpoint 数量:默认情况下,当一个 Checkpoint 在进行时,JobManager 将不会触发下一个
Checkpoint,但 Flink 允许多个 Checkpoint 同时在发生。
两次 Checkpoint 之间的最小时间间隔:从上一次 Checkpoint 结束到下一次 Checkpoint开始,中间的间隔时间。例如,env.enableCheckpointing(60000)
表示 1 分钟触发一次
Checkpoint,同时再设置两次 Checkpoint 之间的最小时间间隔为 30 秒,假如任务运行过程中一次 Checkpoint就用了50s,那么等 Checkpoint 结束后,理论来讲再过 10s 就要开始下一次 Checkpoint了,但是由于设置了最小时间间隔为30s,所以需要再过 30s 后,下次 Checkpoint才开始。注:如果配置了该参数就决定了同时进行的 Checkpoint 数量只能为 1。
当任务被取消时,外部 Checkpoint 信息是否被清理:Checkpoint 在默认的情况下仅用于恢复运行失败的 Flink
任务,当任务手动取消时 Checkpoint 产生的状态信息并不保留。当然可以通过该配置来保留外部的 Checkpoint
状态信息,这些被保留的状态信息在作业手动取消时不会被清除,这样就可以使用该状态信息来恢复 Flink
任务,对于需要从状态恢复的任务强烈建议配置为外部 Checkpoint 状态信息不清理。可选择的配置项为:
ExternalizedCheckpointCleanup.RETAINONCANCELLATION:当作业手动取消时,保留作业的Checkpoint 状态信息。注意,这种情况下,需要手动清除该作业保留的 Checkpoint状态信息,否则这些状态信息将永远保留在外部的持久化存储中。
ExternalizedCheckpointCleanup.DELETEONCANCELLATION:当作业取消时,Checkpoint状态信息会被删除。仅当作业失败时,作业的 Checkpoint 才会被保留用于任务恢复。
任务失败,当有较新的 Savepoint 时,作业是否回退到 Checkpoint 进行恢复:默认情况下,当 Savepoint 比Checkpoint 较新时,任务会从 Savepoint 处恢复。
作业可以容忍 Checkpoint 失败的次数:默认值为 0,表示不能接受 Checkpoint 失败。
Checkpoint的流程:
状态如何从 Checkpoint 恢复:
如果是yarn提交任务,在常规任务基础上加上-s : hdfs://xxx/xxx/xxx/chk-10
chk-10就表示checkpoint做到第10次了,下次从此处恢复任务。
那么如何知道任务最后一次的checkpoint地址呢,可以通过调用api的方式:
http://node107.bigdata.dmp.local.com:35524/jobs/a1c70b36d19b3a9fc2713ba98cfc4a4f/metrics?get=lastCheckpointExternalPath
地址中端口号往前是Flink UI界面的地址,后面换成自己的jobId就行了,得到的结果如下:
[
{
"id": "lastCheckpointExternalPath",
"value": "hdfs:/user/flink/checkpoints/a1c70b36d19b3a9fc2713ba98cfc4a4f/chk-18"
}
]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。