当前位置:   article > 正文

Flink的statebackend状态后端实例分析

statebackend

概念

flink 的状态后端和检查点

在 Flink 中,State Backend 有两个功能:

  • 提供对state(状态)的访问、查询
  • 如果开启了 Checkpoint(检查点),会周期向远程(或者本地)的持久化存储上传state数据和返回元数据 (meta) 给JobManager。

checkpoint和state的区别和联系:
区别:

  • checkpoint是一种容错机制,定期生成全局的状态数据(state)和应用程序元数据(_metadata)快照到远程分布式文件系统
  • state是一种中间结果,是keyed state(keyBy)或 operator state(offset)的中间状态数据,存储在本地系统(内存或磁盘)

联系:
通过checkpoint的容错机制,定期的将state的中间结果数据快照(如果存在状态数据应用程序状态元数据快照保存到分布式文件系统。两者相辅相成。

注意:
当前测试的版本基于Flink1.12,在此版本版本中,以上两个功能是混在一起的,即把状态存储和检查点的创建概念混在一起。Flink 1.13 中将状态和检查点两者区分开来,具体可参考
在新版本checkpoint和statebackend是两个相对独立的功能,可以只开启其中一个功能。如果只开启了checkpoint,而没开启statebackend,则只上传元数据到分布式文件系统,不上传state数据;如果只开启了statebackend,而没开启checkpoint,只在本地存储state数据,不上传到分布式文件系统,也就不能利用checkpoint的自身的容错机制。

可用的State Backend

flink提供三种开箱即用的State Backend:

  1. MemoryStateBackend

MemoryStateBackend将状态(state)数据保存在taskmanger的java堆内存中,通过checkpoint机制,MemoryStateBackend将状态(state)进行快照并保存Jobmanager(master)的堆内存中。默认使用异步快照(asynchronous snapshots)避免阻塞管道

  1. FsStateBackend

FsStateBackend将状态(state)数据保存在taskmanger的java堆内存中,通过checkpoint机制,将状态快照写入配置好的文件系统中(本地或HDFS)。
另外FsStateBackend通过配置一个fileStateThreshold阈值,小于该值时state存储到metadata中而非文件中。默认使用异步快照(asynchronous snapshots)避免阻塞管道

  1. RocksDBStateBackend

RocksDBStateBackend将状态(state)数据保存在RocksDB数据库(内存+磁盘)。通过checkpoint机制, 整个RocksDB数据库被复制到配置的文件系统中(本地或HDFS)。
另外RocksDBStateBackend可以通过构造函数enableIncrementalCheckpointing参数配置是否进行增量Checkpoint(而MemoryStateBackend 和 FsStateBackend不能)。并且跟FsStateBackend 不同的是,RocksDBStateBackend仅支持异步快照(asynchronous snapshots)

详细可参考1参考2

增量式checkpoint

增量式的检查点可以为拥有大量状态的程序带来很大的提升。在早期的测试中,一个拥有TB级别“状态”程序将生成检查点的耗时从3分钟以上降低 到了30秒左右。因为增量式的检查点不需要每次把完整的状态发送到存储中。另外通过实验还发现开启了增量式checkpoint,状态后端总的数据量也会大大减少。
目前只能通过RocksDBStateBackend来开启增量式检查点的功能,Flink使用RocksDB内置的备份机制来合并检查点数据。这样, Flink 增量式检查点的数据不会无限制的增大,它会自动合并老的检查点数据并清理掉。
开启方式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
//第二个参数为true
env.setStateBackend(new RocksDBStateBackend(filebackend, true));
  • 1
  • 2
  • 3

详细可参考

Checkpoint和Savepoint

Checkpoint

Flink Checkpoint 是一种容错恢复机制。这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。Flink Checkpoint 是 Flink 自身的系统行为,用户无法对其进行交互。

Savepoint

Flink Savepoint 你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级,或者修改并发度等情况,还能从保存的状态位继续启动恢复。Flink Savepoint 一般存储在 HDFS 上面,它需要用户主动进行触发。

Flink Checkpoint和Savepoint对比:

  • 概念:Checkpoint 是 自动容错机制 ,Savepoint 程序全局状态镜像 。
  • 目的: Checkpoint 是程序自动容错,快速恢复 。Savepoint是 程序修改后继续从状态恢复,程序升级等。
  • 用户交互:Checkpoint 是 Flink 系统行为 。Savepoint是用户触发。
  • 状态文件保留策略:Checkpoint默认程序删除,可以设置CheckpointConfig中的参数进行保留 。Savepoint会一直保存,除非用户删除 。

详细可参考

生成方式及恢复方式

生成方式
  • checkpoint生成方式:程序自动生成
  • savepoint生成方式:
./flink savepoint [OPTIONS] <Job ID>  <target directory>
./flink savepoint fdd56f92164a075926d864b4c61aee46  hdfs://11.51.197.0:8020/data/cp
  • 1
  • 2

生成结果:
在这里插入图片描述

恢复方式

两种的恢复方式是一样的,多了一个【-s savepointPath】其他参数和正常启动时的参数一样

flink run -s savepointPath [runArgs]

例如:

#yarnPer(yarn-cluster) 运行方式
#从savepoint恢复:
flink run -m yarn-cluster -ynm kafka-source \
-s hdfs://11.51.197.0:8020/data/cp/savepoint-1b9a9c-82531fccb6f0 \
-c com.pony.kafka.StreamingKafkaSourceRocksDBState \
./flink-demo-1.0-SNAPSHOT.jar

#从checkpoint恢复:
flink run -m yarn-cluster -ynm kafka-source \
-s hdfs://11.51.197.0:8020/data/ck/rocksdb/1b9a9cc164568e2e844a377823fb4bd0/chk-43 \
-c com.pony.kafka.StreamingKafkaSourceRocksDBState \
./flink-demo-1.0-SNAPSHOT.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Flink 部署

依赖下载

Flink1.12.7下载flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
解压并将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放置于flink的lib目录,已提供与hdfs交互需要的依赖

tar -xf flink-1.12.7-bin-scala_2.11.tgz 
cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar  flink-1.12.7/lib
  • 1
  • 2

local模式部署

Local Cluster模式是开箱即用的,简单配置一下flink-conf.yaml(也可以不配置),即可启动。

#进入bin目录运行启动脚本
./start-cluster.sh
  • 1
  • 2

打开浏览器输入http://IP:8081,查看WEBUI监控界面

Flink 状态后端测试demo

maven依赖

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.2</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-9.0</version>
            <scope>provided</scope>
        </dependency>

            <!--为了能在本地IDE启动看到flinkwebUI-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

测试数据生成类

package com.pony.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class StreamingWithKafkaSinkExample {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);//本地运行
        //checkpoint配置
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StringBuffer standard = new StringBuffer();
        while (standard.length() < 1024) {
            standard.append("0");
        }
        DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                int cnt = 0;
                while (cnt < 1024 * 1024) {//生成1G的数据
                    sourceContext.collect(standard.substring((cnt + "").length()) + cnt);
                    cnt++;
                }
            }

            @Override
            public void cancel() {

            }
        });

        String brokerList = "11.51.196.255:9092";
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", brokerList);
        //设置FlinkKafkaProducer里面的事务超时时间不大于kafka集群的最大事务超时时间(<15min)
        prop.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        //使用仅一次语义的kafkaProducer
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        text.addSink(myProducer);
        
        env.execute("StreamingWithKafkaSinkExample");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

公共类

package com.pony.kafka;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class KeyedStateFunction extends RichFlatMapFunction<Tuple2<String,Long>, String> {

    private transient ValueState<String> orderItemsState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(30)) // TTL 时间
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // state 时间戳更新策略
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 当 state 过期时处理策略
                .cleanupInRocksdbCompactFilter(60000L) // 过期对象清理策略,每读取若干条记录就执行一次清理操作
                .build();

        ValueStateDescriptor<String> orderItemsStateConfig = new ValueStateDescriptor<>("state_demo", Types.STRING);
        orderItemsStateConfig.enableTimeToLive(ttlConfig);
        orderItemsState = getRuntimeContext().getState(orderItemsStateConfig);
    }

    @Override
    public void flatMap(Tuple2<String,Long> value, Collector<String> collector) throws Exception {
        orderItemsState.update(value.f0);
        collector.collect(value.f0);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

MemoryStateBackend demo

package com.pony.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * StreamingKafkaSourceByMemState
 */
public class StreamingKafkaSourceByMemState {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        Configuration conf = new Configuration();
//        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//发布到flink集群(包括local/standalone/yarn等等)
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);//本地运行

        //checkpoint配置
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //设置statebackend
//        MemoryStateBackend stateBackend = new MemoryStateBackend("file:///D:\\idea_workspace\\flink-demo01\\cp","file:///D:\\idea_workspace\\flink-demo01\\sp");
//        MemoryStateBackend stateBackend = new MemoryStateBackend("hdfs://11.51.197.0:8020/data/ck/mem","hdfs://11.51.197.0:8020/data/sp");
        MemoryStateBackend stateBackend = new MemoryStateBackend();//默认构造参数将状态(state)进行快照并保存Jobmanager(master)的堆内存中
        System.out.println("是否是异步ck:"+stateBackend.isUsingAsynchronousSnapshots());//默认为异步快照
        env.setStateBackend(stateBackend);
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "11.51.196.255:9092");
        prop.setProperty("group.id", "group_"+Math.random());
//        prop.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        myConsumer.setStartFromGroupOffsets();//默认消费策略

        DataStreamSource<String> text = env.addSource(myConsumer);

        text
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String input) throws Exception {
//                        System.out.println("接收到数据:" + input);
                        return new Tuple2<>(input, 1L);
                    }
                })
                .keyBy(0)
                .flatMap(new KeyedStateFunction())
                .addSink(new SinkFunction<String>() {
                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        SinkFunction.super.invoke(value, context);
                    }
                })
                .setParallelism(1);

        env.execute("StreamingFromCollection");


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

FsStateBackend demo

package com.pony.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * StreamingKafkaSourceByFsState
 */
public class StreamingKafkaSourceByFsState {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        Configuration conf = new Configuration();
//        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //checkpoint配置
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //设置statebackend
        //The threshold for file state size must be in [-1, 1048576],default:1K
        FsStateBackend stateBackend = new FsStateBackend(new Path("hdfs://11.51.197.0:8020/data/ck/fs").toUri(), 1024 * 1024);
//        FsStateBackend stateBackend = new FsStateBackend("file:///D:\\idea_workspace\\flink-demo01\\cp"); //本地文件系统
        System.out.println("是否是异步ck:" + stateBackend.isUsingAsynchronousSnapshots());
        env.setStateBackend(stateBackend);
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "11.51.196.255:9092");
        prop.setProperty("group.id", "group_" + Math.random());
//        prop.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        myConsumer.setStartFromGroupOffsets();//默认消费策略

        DataStreamSource<String> text = env.addSource(myConsumer);

        text
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String input) throws Exception {
                        System.out.println("接收到数据:" + input);
                        return new Tuple2<>(input, 1L);
                    }
                })
                .keyBy(0)
                .flatMap(new KeyedStateFunction())
                .print()
                .setParallelism(1);

        env.execute("StreamingFromCollection");


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

RocksDBStateBackend demo

package com.pony.kafka;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * StreamingKafkaSourceRocksDBState
 */
public class StreamingKafkaSourceRocksDBState {

    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        Configuration conf = new Configuration();
//        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        //checkpoint配置
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //设置statebackend
        RocksDBStateBackend stateBackend = new RocksDBStateBackend("hdfs://11.51.197.0:8020/data/ck/rocksdb", true);
//        RocksDBStateBackend stateBackend = new RocksDBStateBackend("hdfs://11.51.197.0:8020/data/ck/rocksdb", false);//全量checkpoint

        System.out.println("是否是增量ck:" + stateBackend.isIncrementalCheckpointsEnabled());
        stateBackend.setDbStoragePath("/data/rocksdb/rocks_db/db01");
        env.setStateBackend(stateBackend);
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "11.51.196.255:9092");
        prop.setProperty("group.id", "group_" + Math.random());
//        prop.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);

        myConsumer.setStartFromGroupOffsets();//默认消费策略

        DataStreamSource<String> text = env.addSource(myConsumer);

        text
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String input) throws Exception {
                        System.out.println("接收到数据:" + input);
                        return new Tuple2<>(input, 1L);
                    }
                })
                .keyBy(0)
                .flatMap(new KeyedStateFunction())
                .print()
                .setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

Flink 状态后端测试结果分析

测试数据为1G的数据量

不同状态后端结果分析

MemoryStateBackend

MemoryStateBackend在数据量小的情况下的目录储存情况(指定hdfs存储ck,不指定默认为jobmanager),如下:
在这里插入图片描述
在这里插入图片描述
MemoryStateBackend在数据量大的情况下,会引起taskmanager:java.lang.OutOfMemoryError: Java heap space

FsStateBackend

FsStateBackend在数据量小的情况下的目录储存情况,如下:
在这里插入图片描述

FsStateBackend在数据量大的情况下,同样也会引起taskmanager:java.lang.OutOfMemoryError: Java heap space

RocksDBStateBackend

在这里插入图片描述
RocksDBStateBackend在数据量大的情况下不会引起taskmanager OOM

增量/全量状态后端结果分析

如前面介绍只有RocksDBStateBackend支持增量Checkpoint,因此只在RocksDBStateBackend基础上做全量与增量的分析。

RocksDBStateBackend 增量Checkpoint

数据量分析:
在这里插入图片描述
增量CK过程:
在这里插入图片描述
增量CK存储结果:

在这里插入图片描述
增量savepoint过程:
在这里插入图片描述
增量savepoint存储结果:
在这里插入图片描述

RocksDBStateBackend 全量Checkpoint

数据量分析:
在这里插入图片描述
全量CK过程:
在这里插入图片描述
全量CK存储结果:
在这里插入图片描述
全量savepoint过程:
在这里插入图片描述
全量savepoint存储结果:
在这里插入图片描述

结果分析

增量checkpoint相比于全量checkpoint,数据在每次做checkpoint时的数据量会小很多,并且增量式checkpoint,状态后端总的数据量也大大减少。但无论增量还是全量在形成savepoint时,生成的状态(state)数据的数据量大小是相同的。

分别从savepoint/checkpoint恢复状态数据

在Savepoint增量checkpoint的基础知识进行以下测试分析

从checkpoint恢复

指定要恢复的checkpoint的目录
在这里插入图片描述
承接上一次的checkpoint(chk-43),在上一次chk序列的基础上累加,如下:
在这里插入图片描述
从Checkpoint恢复后Checkpoint的结果:
在这里插入图片描述

从savepoint 恢复

指定savepoint的目录
在这里插入图片描述
承接最后一次有状态变化的checkpoint(注意与从checkpoint恢复的区别

在这里插入图片描述
从savepoint恢复后Checkpoint的结果:

在这里插入图片描述

结果分析:

在不考虑代码变更的情况下等因素的情况下,从savepoint/checkpoint都能恢复状态数据,并且用法相似。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/975550
推荐阅读
相关标签
  

闽ICP备14008679号