当前位置:   article > 正文

Flink的Checkpoint和savepoint的区别和作用_flink 标准格式 savepoint 是同步还是异步

flink 标准格式 savepoint 是同步还是异步

Flink的Checkpoint和savepoint的区别和作用

一、Flink的checkpoint

flink的checkpoint是异步的、分布式的、轻量级的,将同一时间点的task/operator的状态数据全局统一快照处理,包括用户自定义的keyed state和operator state 当未来程序出现问题,可以基于保存的快照容错。

  1. checkpoint的原理
    A:flink会在输入的数据集中,间隔的生成checkpoint barrier,通过barrier间隔时间段内的数据划分到相应的checkpoint中,程序出现异常时,operator就能够从上次的所有快照中恢复所有的算子状态,从而保证数据据的一致性。例如:从kafka中消费数据时,kafkaConsumer中维护offset状态,当系统出现问题时,无法从kafka中消费数据,可以将offset记录在状态中,当任务恢复之后就会从指定的偏移量中消费。
    B:基于checkpoint的异常处理
    如果数据在计算的过程中刚发生了错误,会在hdfs上读取上一次checkpoint的结果,并且基于读取的结果将算子的状态重置;source在重新读取数据时,如果是barrier中间的某个数据导致的,那么造成部分数据的重复计算。
    C特别提示1:barrier是按照批来checkpoint的,虽然效率提高了,但是会导致数据的重复计算,不能严格个保证Excatly-Once,可以使用幂等或者将checkpoint和事务绑定可以解决该问题。
    D特别提示2 :每条数据经过DataStream处理完毕,会将状态暂时保存到对应的线程的缓存中,当所有的DataStream处理完毕后,会将状态信息刷到storage中。这就是checkpoint。
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.flink.streaming.api.scala._
/**
  * checkPoint的测试验证
  * 消费kafka的数据。来测试checkpoint
  */
object CheckPointTest2 {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092")
    prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC,classOf[StringDeserializer].getName)
    prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC,classOf[StringDeserializer].getName)
    prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"FlinkKafkaConsumer")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //todo checkpoint默认是关闭的,需要手动开启,间隔一秒钟往数据源中加入一个栅栏(barrier),根据实际情况自行调整时间大小
    env.enableCheckpointing(1000)
    //todo 设置一个状态后端其实就是数据的保存位置,将checkpoint的数据保存到HDFS中。
    env.setStateBackend(new FsStateBackend("hdfs://node2:8020/flink/checkpoint",true))
    //todo exactly-ance和at-least-once语义选择
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //todo Checkpoint超时时间设置 ,一但checkpoint超过该时间,flink就会自动中断,并按照超时处理。默认是10分钟
    env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000)
    //todo 两次checkpoint之间的时间间隔,防止太过密集的checkpoint降低flink的性能
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(6000)
    //todo 设置最大并行度执行checkpoint的数量,默认是1,并发执行可以提高checkpoint的效率
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    //todo 任务取消之后,是否自动删除删除checkpoint中保存的数据
    //todo RETAIN_ON_CANCELLATION: flink的程序被cancle之后,会保存checkpoint数据;
    //todo DELETE_ON_CANCELLATION:表示 flink的程序被cancle之后会删除checkpoint数据,只有在job执行失败之后才会保存checkpoint数据
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //todo 设置checkpoint允许失败的次数,一但达到失败的次数,系统会自动关闭停止任务
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)

    val stream = env.addSource(new FlinkKafkaConsumer[String]("testtopic",new SimpleStringSchema(),prop))
    val value = stream.flatMap(_.split(" ")).map(x => {
     // println((x,1))
      (x, 1)
    }).keyBy(0).sum(1)
    value.print()
    env.execute()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

E运行一会之后停止程序
在这里插入图片描述
在这里插入图片描述
上图就是检查点的数据,根据flink的UI界面的jobid可以找到对应的检查点文件。
在flink重新在服务器启动时,需要基于checkpoint重启

flink run -c com.cn.state.WordCountCheckpoint -d -s hdfs://node2:/flink/checkpoint/b78a5b9164b1479761014a164009ed57/chk-15/5e806d7b-7f84-49b0-968f-a77158c84941 ~/testcheckpoint.jar
  • 1

重启之后会接着从上次检查点的位置开始计算

二、Flink的savepoint

  1. savepoint是checkpoint的一种特殊实现,底层其实也是使用的是checkpoint的机制,savepoint的用户一般使用手动命令的方式来触发checkpoint,并且将结果持久化到指定的存储路径中(hdfs路径),其重要的目的就是帮助用户在升级或者维护集群过程中保存系统中的状态数据,避免防止运维的一些正常操作使作业无法恢复到原有的计算状态,从而无法实现端到端的Excatly-Once的语义保证。
  2. SavePoint的路径需要在flink-conf.yaml中配置。
    state.savepoints.dir: hdfs://node01:8020/flink/state/savepoint
  3. 系统大型升级运维才会用到。先手动执行savePoint
flink savepoint 91708180bc440568f47ab0ec88087b43 hdfs://node2:/flink/checkpoint/
#如果在flink-conf.yaml中没有设置SavePoint的路径,可以在进行SavePoint的时候指定路径
#91708180bc440568f47ab0ec88087b43:就是jobid号
  • 1
  • 2
  • 3

然后停止job

flink cancel 91708180bc440568f47ab0ec88087b43  # job id
  • 1
  1. 重启job,就可以基于上次的结果状态进行计算
flink run -c com.cn.state.WordCountCheckpoint -d -s hdfs://node2:/flink/checkpoint/savepoint-917081-0a251a5323b7 ~/testcheckpoint.jar
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/1010867
推荐阅读
相关标签
  

闽ICP备14008679号