赞
踩
flink的checkpoint是异步的、分布式的、轻量级的,将同一时间点的task/operator的状态数据全局统一快照处理,包括用户自定义的keyed state和operator state 当未来程序出现问题,可以基于保存的快照容错。
import java.util.Properties import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.flink.streaming.api.scala._ /** * checkPoint的测试验证 * 消费kafka的数据。来测试checkpoint */ object CheckPointTest2 { def main(args: Array[String]): Unit = { val prop = new Properties() prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092,node3:9092") prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC,classOf[StringDeserializer].getName) prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC,classOf[StringDeserializer].getName) prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"FlinkKafkaConsumer") val env = StreamExecutionEnvironment.getExecutionEnvironment //todo checkpoint默认是关闭的,需要手动开启,间隔一秒钟往数据源中加入一个栅栏(barrier),根据实际情况自行调整时间大小 env.enableCheckpointing(1000) //todo 设置一个状态后端其实就是数据的保存位置,将checkpoint的数据保存到HDFS中。 env.setStateBackend(new FsStateBackend("hdfs://node2:8020/flink/checkpoint",true)) //todo exactly-ance和at-least-once语义选择 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) //todo Checkpoint超时时间设置 ,一但checkpoint超过该时间,flink就会自动中断,并按照超时处理。默认是10分钟 env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000) //todo 两次checkpoint之间的时间间隔,防止太过密集的checkpoint降低flink的性能 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(6000) //todo 设置最大并行度执行checkpoint的数量,默认是1,并发执行可以提高checkpoint的效率 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //todo 任务取消之后,是否自动删除删除checkpoint中保存的数据 //todo RETAIN_ON_CANCELLATION: flink的程序被cancle之后,会保存checkpoint数据; //todo DELETE_ON_CANCELLATION:表示 flink的程序被cancle之后会删除checkpoint数据,只有在job执行失败之后才会保存checkpoint数据 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //todo 设置checkpoint允许失败的次数,一但达到失败的次数,系统会自动关闭停止任务 env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1) val stream = env.addSource(new FlinkKafkaConsumer[String]("testtopic",new SimpleStringSchema(),prop)) val value = stream.flatMap(_.split(" ")).map(x => { // println((x,1)) (x, 1) }).keyBy(0).sum(1) value.print() env.execute() } }
E:运行一会之后停止程序
上图就是检查点的数据,根据flink的UI界面的jobid可以找到对应的检查点文件。
在flink重新在服务器启动时,需要基于checkpoint重启
flink run -c com.cn.state.WordCountCheckpoint -d -s hdfs://node2:/flink/checkpoint/b78a5b9164b1479761014a164009ed57/chk-15/5e806d7b-7f84-49b0-968f-a77158c84941 ~/testcheckpoint.jar
重启之后会接着从上次检查点的位置开始计算。
flink savepoint 91708180bc440568f47ab0ec88087b43 hdfs://node2:/flink/checkpoint/
#如果在flink-conf.yaml中没有设置SavePoint的路径,可以在进行SavePoint的时候指定路径
#91708180bc440568f47ab0ec88087b43:就是jobid号
然后停止job
flink cancel 91708180bc440568f47ab0ec88087b43 # job id
flink run -c com.cn.state.WordCountCheckpoint -d -s hdfs://node2:/flink/checkpoint/savepoint-917081-0a251a5323b7 ~/testcheckpoint.jar
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。