赞
踩
在 Flink 中,State Backend 有两个功能:
checkpoint和state的区别和联系:
区别:
联系:
通过checkpoint的容错机制,定期的将state的中间结果数据快照(如果存在状态数据) 和 应用程序状态元数据快照保存到分布式文件系统。两者相辅相成。
注意:
当前测试的版本基于Flink1.12,在此版本版本中,以上两个功能是混在一起的,即把状态存储和检查点的创建概念混在一起。Flink 1.13 中将状态和检查点两者区分开来,具体可参考。
在新版本checkpoint和statebackend是两个相对独立的功能,可以只开启其中一个功能。如果只开启了checkpoint,而没开启statebackend,则只上传元数据到分布式文件系统,不上传state数据;如果只开启了statebackend,而没开启checkpoint,只在本地存储state数据,不上传到分布式文件系统,也就不能利用checkpoint的自身的容错机制。
flink提供三种开箱即用的State Backend:
MemoryStateBackend将状态(state)数据保存在taskmanger的java堆内存中,通过checkpoint机制,MemoryStateBackend将状态(state)进行快照并保存Jobmanager(master)的堆内存中。默认使用异步快照(asynchronous snapshots)避免阻塞管道
FsStateBackend将状态(state)数据保存在taskmanger的java堆内存中,通过checkpoint机制,将状态快照写入配置好的文件系统中(本地或HDFS)。
另外FsStateBackend通过配置一个fileStateThreshold阈值,小于该值时state存储到metadata中而非文件中。默认使用异步快照(asynchronous snapshots)避免阻塞管道
RocksDBStateBackend将状态(state)数据保存在RocksDB数据库(内存+磁盘)。通过checkpoint机制, 整个RocksDB数据库被复制到配置的文件系统中(本地或HDFS)。
另外RocksDBStateBackend可以通过构造函数enableIncrementalCheckpointing参数配置是否进行增量Checkpoint(而MemoryStateBackend 和 FsStateBackend不能)。并且跟FsStateBackend 不同的是,RocksDBStateBackend仅支持异步快照(asynchronous snapshots)
增量式的检查点可以为拥有大量状态的程序带来很大的提升。在早期的测试中,一个拥有TB级别“状态”程序将生成检查点的耗时从3分钟以上降低 到了30秒左右。因为增量式的检查点不需要每次把完整的状态发送到存储中。另外通过实验还发现开启了增量式checkpoint,状态后端总的数据量也会大大减少。
目前只能通过RocksDBStateBackend来开启增量式检查点的功能,Flink使用RocksDB内置的备份机制来合并检查点数据。这样, Flink 增量式检查点的数据不会无限制的增大,它会自动合并老的检查点数据并清理掉。
开启方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//第二个参数为true
env.setStateBackend(new RocksDBStateBackend(filebackend, true));
详细可参考
Flink Checkpoint 是一种容错恢复机制。这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。Flink Checkpoint 是 Flink 自身的系统行为,用户无法对其进行交互。
Flink Savepoint 你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级,或者修改并发度等情况,还能从保存的状态位继续启动恢复。Flink Savepoint 一般存储在 HDFS 上面,它需要用户主动进行触发。
Flink Checkpoint和Savepoint对比:
详细可参考
./flink savepoint [OPTIONS] <Job ID> <target directory>
./flink savepoint fdd56f92164a075926d864b4c61aee46 hdfs://11.51.197.0:8020/data/cp
生成结果:
两种的恢复方式是一样的,多了一个【-s savepointPath】其他参数和正常启动时的参数一样
flink run -s savepointPath [runArgs]
例如:
#yarnPer(yarn-cluster) 运行方式
#从savepoint恢复:
flink run -m yarn-cluster -ynm kafka-source \
-s hdfs://11.51.197.0:8020/data/cp/savepoint-1b9a9c-82531fccb6f0 \
-c com.pony.kafka.StreamingKafkaSourceRocksDBState \
./flink-demo-1.0-SNAPSHOT.jar
#从checkpoint恢复:
flink run -m yarn-cluster -ynm kafka-source \
-s hdfs://11.51.197.0:8020/data/ck/rocksdb/1b9a9cc164568e2e844a377823fb4bd0/chk-43 \
-c com.pony.kafka.StreamingKafkaSourceRocksDBState \
./flink-demo-1.0-SNAPSHOT.jar
Flink1.12.7下载、flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
解压并将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放置于flink的lib目录,已提供与hdfs交互需要的依赖
tar -xf flink-1.12.7-bin-scala_2.11.tgz
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar flink-1.12.7/lib
Local Cluster模式是开箱即用的,简单配置一下flink-conf.yaml(也可以不配置),即可启动。
#进入bin目录运行启动脚本
./start-cluster.sh
打开浏览器输入http://IP:8081,查看WEBUI监控界面
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.12.2</flink.version> <hadoop.version>2.7.3</hadoop.version> <scala.version>2.11</scala.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-hadoop-2-uber</artifactId> <version>2.7.5-9.0</version> <scope>provided</scope> </dependency> <!--为了能在本地IDE启动看到flinkwebUI--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> </dependencies>
package com.pony.kafka; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper; import java.util.Properties; public class StreamingWithKafkaSinkExample { public static void main(String[] args) throws Exception { //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);//本地运行 //checkpoint配置 env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); StringBuffer standard = new StringBuffer(); while (standard.length() < 1024) { standard.append("0"); } DataStreamSource<String> text = env.addSource(new SourceFunction<String>() { @Override public void run(SourceContext<String> sourceContext) throws Exception { int cnt = 0; while (cnt < 1024 * 1024) {//生成1G的数据 sourceContext.collect(standard.substring((cnt + "").length()) + cnt); cnt++; } } @Override public void cancel() { } }); String brokerList = "11.51.196.255:9092"; String topic = "t1"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers", brokerList); //设置FlinkKafkaProducer里面的事务超时时间不大于kafka集群的最大事务超时时间(<15min) prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + ""); //使用仅一次语义的kafkaProducer FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); text.addSink(myProducer); env.execute("StreamingWithKafkaSinkExample"); } }
package com.pony.kafka; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; public class KeyedStateFunction extends RichFlatMapFunction<Tuple2<String,Long>, String> { private transient ValueState<String> orderItemsState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(30)) // TTL 时间 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // state 时间戳更新策略 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 当 state 过期时处理策略 .cleanupInRocksdbCompactFilter(60000L) // 过期对象清理策略,每读取若干条记录就执行一次清理操作 .build(); ValueStateDescriptor<String> orderItemsStateConfig = new ValueStateDescriptor<>("state_demo", Types.STRING); orderItemsStateConfig.enableTimeToLive(ttlConfig); orderItemsState = getRuntimeContext().getState(orderItemsStateConfig); } @Override public void flatMap(Tuple2<String,Long> value, Collector<String> collector) throws Exception { orderItemsState.update(value.f0); collector.collect(value.f0); } }
package com.pony.kafka; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; /** * StreamingKafkaSourceByMemState */ public class StreamingKafkaSourceByMemState { public static void main(String[] args) throws Exception { //获取Flink的运行环境 Configuration conf = new Configuration(); // conf.setInteger(RestOptions.PORT, 8050); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//发布到flink集群(包括local/standalone/yarn等等) // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);//本地运行 //checkpoint配置 env.enableCheckpointing(1000 * 30); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend // MemoryStateBackend stateBackend = new MemoryStateBackend("file:///D:\\idea_workspace\\flink-demo01\\cp","file:///D:\\idea_workspace\\flink-demo01\\sp"); // MemoryStateBackend stateBackend = new MemoryStateBackend("hdfs://11.51.197.0:8020/data/ck/mem","hdfs://11.51.197.0:8020/data/sp"); MemoryStateBackend stateBackend = new MemoryStateBackend();//默认构造参数将状态(state)进行快照并保存Jobmanager(master)的堆内存中 System.out.println("是否是异步ck:"+stateBackend.isUsingAsynchronousSnapshots());//默认为异步快照 env.setStateBackend(stateBackend); String topic = "t1"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "11.51.196.255:9092"); prop.setProperty("group.id", "group_"+Math.random()); // prop.setProperty("auto.offset.reset", "earliest"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets();//默认消费策略 DataStreamSource<String> text = env.addSource(myConsumer); text .map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String input) throws Exception { // System.out.println("接收到数据:" + input); return new Tuple2<>(input, 1L); } }) .keyBy(0) .flatMap(new KeyedStateFunction()) .addSink(new SinkFunction<String>() { @Override public void invoke(String value, Context context) throws Exception { SinkFunction.super.invoke(value, context); } }) .setParallelism(1); env.execute("StreamingFromCollection"); } }
package com.pony.kafka; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; /** * StreamingKafkaSourceByFsState */ public class StreamingKafkaSourceByFsState { public static void main(String[] args) throws Exception { //获取Flink的运行环境 Configuration conf = new Configuration(); // conf.setInteger(RestOptions.PORT, 8050); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); //checkpoint配置 env.enableCheckpointing(1000 * 30); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend //The threshold for file state size must be in [-1, 1048576],default:1K FsStateBackend stateBackend = new FsStateBackend(new Path("hdfs://11.51.197.0:8020/data/ck/fs").toUri(), 1024 * 1024); // FsStateBackend stateBackend = new FsStateBackend("file:///D:\\idea_workspace\\flink-demo01\\cp"); //本地文件系统 System.out.println("是否是异步ck:" + stateBackend.isUsingAsynchronousSnapshots()); env.setStateBackend(stateBackend); String topic = "t1"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "11.51.196.255:9092"); prop.setProperty("group.id", "group_" + Math.random()); // prop.setProperty("auto.offset.reset", "earliest"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets();//默认消费策略 DataStreamSource<String> text = env.addSource(myConsumer); text .map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String input) throws Exception { System.out.println("接收到数据:" + input); return new Tuple2<>(input, 1L); } }) .keyBy(0) .flatMap(new KeyedStateFunction()) .print() .setParallelism(1); env.execute("StreamingFromCollection"); } }
package com.pony.kafka; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; /** * StreamingKafkaSourceRocksDBState */ public class StreamingKafkaSourceRocksDBState { public static void main(String[] args) throws Exception { //获取Flink的运行环境 Configuration conf = new Configuration(); // conf.setInteger(RestOptions.PORT, 8050); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); //checkpoint配置 env.enableCheckpointing(1000 * 30); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend RocksDBStateBackend stateBackend = new RocksDBStateBackend("hdfs://11.51.197.0:8020/data/ck/rocksdb", true); // RocksDBStateBackend stateBackend = new RocksDBStateBackend("hdfs://11.51.197.0:8020/data/ck/rocksdb", false);//全量checkpoint System.out.println("是否是增量ck:" + stateBackend.isIncrementalCheckpointsEnabled()); stateBackend.setDbStoragePath("/data/rocksdb/rocks_db/db01"); env.setStateBackend(stateBackend); String topic = "t1"; Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "11.51.196.255:9092"); prop.setProperty("group.id", "group_" + Math.random()); // prop.setProperty("auto.offset.reset", "earliest"); FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets();//默认消费策略 DataStreamSource<String> text = env.addSource(myConsumer); text .map(new MapFunction<String, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(String input) throws Exception { System.out.println("接收到数据:" + input); return new Tuple2<>(input, 1L); } }) .keyBy(0) .flatMap(new KeyedStateFunction()) .print() .setParallelism(1); env.execute("StreamingFromCollection"); } }
测试数据为1G的数据量
MemoryStateBackend在数据量小的情况下的目录储存情况(指定hdfs存储ck,不指定默认为jobmanager),如下:
MemoryStateBackend在数据量大的情况下,会引起taskmanager:java.lang.OutOfMemoryError: Java heap space
FsStateBackend在数据量小的情况下的目录储存情况,如下:
FsStateBackend在数据量大的情况下,同样也会引起taskmanager:java.lang.OutOfMemoryError: Java heap space
RocksDBStateBackend在数据量大的情况下不会引起taskmanager OOM
如前面介绍只有RocksDBStateBackend支持增量Checkpoint,因此只在RocksDBStateBackend基础上做全量与增量的分析。
数据量分析:
增量CK过程:
增量CK存储结果:
增量savepoint过程:
增量savepoint存储结果:
数据量分析:
全量CK过程:
全量CK存储结果:
全量savepoint过程:
全量savepoint存储结果:
增量checkpoint相比于全量checkpoint,数据在每次做checkpoint时的数据量会小很多,并且增量式checkpoint,状态后端总的数据量也大大减少。但无论增量还是全量在形成savepoint时,生成的状态(state)数据的数据量大小是相同的。
在Savepoint增量checkpoint的基础知识进行以下测试分析
指定要恢复的checkpoint的目录
承接上一次的checkpoint(chk-43),在上一次chk序列的基础上累加,如下:
从Checkpoint恢复后Checkpoint的结果:
指定savepoint的目录
承接最后一次有状态变化的checkpoint(注意与从checkpoint恢复的区别)
从savepoint恢复后Checkpoint的结果:
在不考虑代码变更的情况下等因素的情况下,从savepoint/checkpoint都能恢复状态数据,并且用法相似。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。