赞
踩
前言:今天是学习 flink 的第 11 天啦!学习了 flink 四大基石之 Checkpoint (检查点),主要是解决大数据领域持久化中间结果数据,以及取消任务,下次启动人可以恢复累加数据问题,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!
Tips:检查点,检查的是历史记录!过去的时间值得怀念,未来的道路运气会继续累加,能力会继续提升,明天也要继续努力!
1- 将状态存储在 [本地文件/ HDFS]
2- 适用场景:
3- 弊端:
1- RockDB 是一种嵌入式的本地数据库,默认是配置成异步快照(不需要等待所有信号结束才开始状态拷贝)
2- 适用场景:
3- 也需要配置一个文件系统:本地 / HDFS
4- RocksDBStateBackend(flink1.13) 唯一支持增量 checkpoint 的后端
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.7.0</version>
</dependency>
flink-conf.yaml
从 Flink 1.13 版本开始,社区重新设计了其公共状态后端类,用户可以迁移现有应用程序以使用新 API,而不会丢失任何状态或一致性。
以前的 MemoryStateBackend 相当于使用 HashMapStateBackend
和 JobManagerCheckpointStorage
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 因为使用新的类:HashMapStateBackend
env.setStateBackend(new HashMapStateBackend());
// 因为 JobManagerCheckpointStorage 即:checkpoint 与 jobmanager 有关
env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend());
以前的 FsStateBackend 相当于使用 HashMapStateBackend
和 FileSystemCheckpointStorage
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 因为使用新的类:HashMapStateBackend
env.setStateBackend(new HashMapStateBackend());
// 因为 JobManagerCheckpointStorage 即:checkpoint 与 jobmanager 有关
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// 下面的设置更高级一点,可以传入参数,建议!
// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
以前的 RocksDBStateBackend相当于使用 EmbeddedRocksDBStateBackend
和 FileSystemCheckpointStorage
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 因为使用新的类:EmbeddedRocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
例子:socket 数据源,词频统计,开启 checkpoint,每隔 5s 写入 HDFS
package cn.itcast.day10.checkpoint;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 读取服务器node01中端口9999的内容,并切分单词,统计数量
* 要求: 开启checkpoint支持,每隔5s钟写入HDFS一次
*/
public class StreamCheckpointDemo {
public static void main(String[] args) throws Exception {
//todo 1)获取flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//todo 2)开启checkpoint
//每隔5s周期性的生成barrier(栅栏),默认情况下没有开启checkpoint
env.enableCheckpointing(5000L);
//设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(2000L);
//同一个时间只能有一个栅栏在运行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置checkpoint的执行模式。仅执行一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);
//env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
//指定checkpoint的存储位置
if(args.length< 1){
//env.setStateBackend(new FsStateBackend("file:///D:\\checkpoint"));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");
}else{
//env.setStateBackend(new FsStateBackend(args[0]));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(args[0]);
}
// 设置任务失败时候,能够外部持久化检查点
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//todo 2)接入数据源,读取文件获取数据
DataStreamSource<String> lines = env.socketTextStream("node01", 7777);
//todo 3)数据处理
// 3.1:使用flatMap对单词进行拆分
SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
//返回数据
for (String word : words) {
out.collect(word);
}
}
});
// 3.2:对拆分后的单词进行记一次数
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
// 3.4:对分组后的key进行聚合操作
SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = grouped.sum(1);
//todo 4)构建sink,输出结果
sumed.print();
//todo 5)启动运行
env.execute();
}
}
结果:
将程序打包成jar包放在flink的提交页面
# 启动 jobmanager 和 taskmanager
bin/start-cluster.sh
输入地址参数,最终发现:
1. 手动取消作业,检查点文件没有消失
2. 输入hdfs地址参数,检查点文件生成在hdfs目录上
总结:
flink-conf.yaml 配置文件的 restart-strategy 配置参数决定重启策略。
重启策略 | 重启策略值 | 说明 |
---|---|---|
Fixed delay | fixed-delay | 固定延迟重启策略 |
Failure rate | failure-rate | 失败率重启策略 |
No restart | None | 无重启策略 |
配置参数 | 描述 | 默认值 |
---|---|---|
restart-strategy.fixed-delay.attempts | 在 Job 最终失败前,Flink 尝试执行的次数 | 如果启用 checkpoint 的话是Integer.MAX_VALUE |
restart-strategy.fixed-delay.delay | 延迟重启指一个执行失败后,不立即重启,等待一段时间。 | akka.ask.timeout,如果启用checkpoint的话是1s |
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
// 1. 初始化流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 延时
))
配置参数 | 描述 | 默认值 |
---|---|---|
restart-strategy.failure-rate.max-failures-per-interval | 在一个Job认定为失败之前,最大的重启次数 | 1 |
restart-strategy.fixed-delay.delay | 计算失败率的时间间隔 | 1分钟 |
restart-strategy.failure-rate.delay | 两次连续重启尝试之间的时间间隔 | akka.ask.timeout |
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
意思就表明为:
失败重启之间的间隔是10秒
如果5分钟内,失败3次,就不会在重启了,直接结束任务
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个测量时间间隔最大失败次数
Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔
Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔
))
restart-strategy: none
// 直接失败,不会重启
env.setRestartStrategy(RestartStrategies.noRestart());
例子:基于之前的单词统计案例改造,当遇到"laowang"字符串的时候,程序抛出异常,出现3次异常后,程序退出.
/**
* 演示flink的重启策略
* flink的重启策略是,在配置了checkpoint的前提下,不停的重启的重启,如果不配置checkpoint不能使用重启策略,作业直接停止
* flink有三种重启策略的方式:
* 固定延迟重启策略:
* 设置失败重启的次数,以及两次重启的时间间隔,如:设置重启失败次数是3次,每次间隔5秒钟,那么输入三次异常以后,尝试重启三次,第四次依然失败,则作业停止
* 失败率重启策略:
* 给定一定时间,如果这个时间内设置了n次失败重启,一旦超过了N次,则作业停止,如:3分钟失败五次,每次时间间隔10秒,则任务结束
* 无重启策略:
* 表示运行失败以后,立刻停止作业运行
*/
public class FixedDelayRestartStrategyDemo {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)初始化flink的流处理的运行环境
* 2)开启checkpint
* 3)配置重启策略
* 4)接入数据源
* 5)对字符串进行空格拆分,每个单词记一次数
* 6)分组聚合
* 7)打印测试
* 8)运行作业
*/
//TODO 1)初始化flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2)开启checkpoint
//周期性的生成barrier(栅栏),默认情况下checkpoint是没有开启的
env.enableCheckpointing(5000L);
//设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(6000L);
//设置同一个时间只能有一个栅栏在运行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置checkpoint的执行模式,最多执行一次或者至少执行一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//Checkpointing最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//指定checkpoint的存储位置
if(args.length < 1) {
env.setStateBackend(new FsStateBackend("file:///E:\\checkpoint"));
}else{
env.setStateBackend(new FsStateBackend(args[0]));
}
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//TODO 3)配置重启策略
//固定延迟重启策略,程序出现异常的时候,重启三次,每次延迟五秒钟重启,超过三次,则程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));
//TODO 4)接入数据源
DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);
//TODO 5)对字符串进行空格拆分,每个单词记一次数
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = socketTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> collector) throws Exception {
if(line.startsWith("laowang")){
System.out.println(line);
int i = 1/0;
System.out.println(i);
throw new RuntimeException("老王驾到,程序挂了!");
}
String[] words = line.split(" ");
for (String word : words) {
collector.collect(Tuple2.of(word, 1L));
}
}
});
//TODO 6)分组聚合
SingleOutputStreamOperator<Tuple2<String, Long>> sumed = wordAndOne.keyBy(0).sum(1);
//TODO 7)打印测试
sumed.print();
//TODO 8)运行作业
env.execute();
}
}
总结:
savepoint的目的是为了从上一次保存的中间结果中恢复过来,比如:在生产环境中运行着一个作业,因为今晚要升级作业,因此需要将生产环境的作业停止掉,将升级后的jar进行部署和发布,希望重新发布以后可以将上一个作业的运行结果恢复后继续运行
checkpoint和savepoint的区别?
例子:代码和之前一样
/**
* savepoint的目的是为了从上一次保存的中间结果中恢复过来
* 举例:
* 在生产环境中运行着一个作业,因为今晚要升级作业,因此需要将生产环境的作业停止掉,将升级后的jar进行部署和发布
* 希望重新发布以后可以将上一个作业的运行结果恢复后继续运行
*
* 所以这时候可以使用savepoint进行解决这个问题问题
*
* 面试问题:
* checkpoint和savepoint的区别?
* checkpoint:周期性定期运行,生成barrier(栅栏)发送到job作业的每个算子,当算子收到barrier以后会将state的中间计算结果快照存储到分布式文件系统中
* savepoint:将指定的checkpoint的结果恢复过来,恢复到当前的作业中,继续运行
*
* TODO 当作业重新递交的时候,并行度发生了概念,在flink1.10版本中,可以正常的递交作业,且能够恢复历史的累加结果
* 但是之前版本中一旦发生并行度的变化,作业无法递交
*/
public class SavepointDemo {
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)初始化flink流处理的运行环境
* 2)开启checkpoint
* 3)指定数据源
* 4)对字符串进行空格拆分,然后每个单词记一次数
* 5)对每个单词进行分组聚合操作
* 6) 打印测试
* 7)执行任务,递交作业
*/
//TODO 1)初始化flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//TODO 2)开启checkpoint
//周期性的生成barrier(栅栏),默认情况下checkpoint是没有开启的
env.enableCheckpointing(5000L);
//设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(6000L);
//设置同一个时间只能有一个栅栏在运行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置checkpoint的执行模式,最多执行一次或者至少执行一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//Checkpointing最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//指定checkpoint的存储位置
if(args.length < 1) {
env.setStateBackend(new FsStateBackend("file:///E:\\checkpoint"));
}else{
env.setStateBackend(new FsStateBackend(args[0]));
}
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//TODO 3)指定数据源
DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);
//TODO 4)对字符串进行空格拆分,然后每个单词记一次数
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = socketTextStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Long>> collector) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
collector.collect(Tuple2.of(word, 1L));
}
}
});
//TODO 5)对每个单词进行分组聚合操作
SingleOutputStreamOperator<Tuple2<String, Long>> sumed = wordAndOne.keyBy(0).sum(1);
//TODO 6) 打印测试
sumed.print();
//TODO 7)执行任务,递交作业
env.execute();
}
}
结果:
(hadoop,4)
取消程序后,再次运行,
结果累加上一次取消任务的结果:
(hadoop,5)
总结:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。