当前位置:   article > 正文

Flink开发-实现有状态的计算_flink有状态计算

flink有状态计算

在这里插入图片描述
作为一个计算框架,Flink提供了有状态的计算,封装了一些底层的实现,比如状态的高效存储、Checkpoint和Savepoint持久化备份机制、计算资源扩缩容等问题。因为Flink接管了这些问题,开发者只需调用Flink API,这样可以更加专注于业务逻辑

1. Checkpoint原理及配置方法

Flink实时计算为了容错,可以将中间数据定期保存起来,这种定期触发保存中间结果的机制叫CheckPointing,CheckPointing是周期执行的,具体过程是JobManager定期向TaskManager中的SubTask发送RPC消息,SubTask将其计算的State保存到StateBackEnd中,并且向JobManager响应Checkpoint是否成功,如果程序出现异常或重启,TaskManager中的SubTask可以从上一次成功的CheckPointing的State恢复。
在这里插入图片描述

1.1 开启Checkpoint

默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
  • 1
  • 2
  • 3

Checkpoint 其他的属性包括:

  1. 精确一次(exactly-once)对比至少一次(at-least-once):你可以选择向enableCheckpointing(long interval, CheckpointingMode mode)方法中传入一个模式来选择使用两种保证等级中的哪一种。对于大多数应用来说,精确一次是较好的选择。至少一次可能与某些延迟超低(始终只有几毫秒)的应用的关联较大。
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 使用At-Least-Once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
  • 1
  • 2
  • 3
  • 4
  1. checkpoint 超时:如果 checkpoint 执行的时间超过了该配置的阈值仍未完成,还在进行中的 checkpoint 操作就会被终止,以免其占用太多资源。
// Checkpoint 必须在一分钟内完成,否则就会被终止
env.getCheckpointConfig().setCheckpointTimeout(6*1000);
  • 1
  • 2
  1. checkpoints 之间的最小时间:如果两次Checkpoint之间的间歇时间太短,那么正常的作业可能获取的资源较少,更多的资源被用在了Checkpoint上。对下面这个参数进行合理配置能保证数据流的正常处理。比如,设置这个参数为60秒,无论 checkpoint 持续时间与间隔是多久,前一次Checkpoint结束后60秒内不会启动新的Checkpoint。这种模式只在整个作业最多允许1个Checkpoint时适用。
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • 1
  • 2

往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。

  1. 并发 checkpoint 的数目: 默认情况下,在上一个 checkpoint 未完成(失败或者成功)的情况下,系统不会触发另一个 checkpoint。这确保了拓扑不会在 checkpoint 上花费太多时间,从而影响正常的处理流程。 不过允许多个 checkpoint 并行进行是可行的,对于有确定的处理延迟(例如某方法所调用比较耗时的外部服务),但是仍然想进行频繁的 checkpoint 去最小化故障后重跑的 pipelines 来说,是有意义的。该选项不能和 “checkpoints 间的最小时间"同时使用。如果这个参数大于1,将与前面提到的最短间隔相冲突。
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • 1
  • 2
  1. externalized checkpoints: Checkpoint的初衷是用来进行故障恢复,如果作业是因为异常而失败,Flink会保存远程存储上的数据;如果开发者自己取消了作业,远程存储上的数据都会被删除。如果开发者希望通过Checkpoint数据进行调试,自己取消了作业,同时希望将远程数据保存下来。
// 作业取消后仍然保存Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 1
  • 2
  1. Flink 现有的 Checkpoint 机制下,每个算子需要等到收到所有上游发送的 Barrier 对齐后才可以进行 Snapshot 并继续向后发送 barrier。在反压的情况下,Barrier 从上游算子传送到下游可能需要很长的时间,从而导致 Checkpoint 超时的问题。针对这一问题,Flink 1.11 增加了 Unaligned Checkpoint 机制。开启 Unaligned Checkpoint 后当收到第一个 barrier 时就可以执行 checkpoint,并把上下游之间正在传输的数据也作为状态保存到快照中,这样 checkpoint 的完成时间大大缩短,不再依赖于算子的处理能力,解决了反压场景下 checkpoint 长期做不出来的问题。
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
  • 1
  • 2

1.2 State Backend

State Backend起到了持久化存储数据的重要功能。Flink将State Backend抽象成了一种插件,并提供了三种State Backend,每种State Backend对数据的保存和恢复方式略有不同。

1.2.1 MemoryStateBackend

从名字中可以看出,这种State Backend主要基于内存,它将数据存储在Java的堆区。当进行分布式快照时,所有算子子任务将自己内存上的状态同步到JobManager的堆上。因此,一个作业的所有状态要小于JobManager的内存大小。这种方式显然不能存储过大的状态数据,否则将抛出OutOfMemoryError异常。这种方式只适合调试或者实验,不建议在生产环境下使用。下面的代码告知一个Flink作业使用内存作为State Backend,并在参数中指定了状态的最大值,默认情况下,这个最大值是5MB。

env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
  • 1

如果不做任何配置,默认情况是使用内存作为State Backend。

1.2.2 FsStateBackend

这种方式下,数据持久化到文件系统上,文件系统包括本地磁盘、HDFS以及包括Amazon、阿里云在内的云存储服务。使用时,我们要提供文件系统的地址,尤其要写明前缀,比如:file://、hdfs://或s3://。此外,这种方式支持Asynchronous Snapshot,默认情况下这个功能是开启的,可加快数据同步速度。

// 使用HDFS作为State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));
// 关闭Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false));
  • 1
  • 2
  • 3
  • 4

Flink的本地状态仍然在TaskManager的内存堆区上,直到执行快照时状态数据会写到所配置的文件系统上。因此,这种方式能够享受本地内存的快速读写访问,也能保证大容量状态作业的故障恢复能力。

1.2.3 RocksDBStateBackend

这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。然而,每次从RocksDB中读写数据都需要进行序列化和反序列化,因此读写本地状态的成本更高。快照执行时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State Backend时,也要配置分布式存储的地址。Asynchronous Snapshot在默认情况也是开启的。

此外,这种State Backend允许增量快照(Incremental Checkpoint),Incremental Checkpoint的核心思想是每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。Incremental Checkpoint非常适合超大规模的状态,快照的耗时将明显降低,同时,它的代价是重启恢复的时间更长。默认情况下,Incremental Checkpoint没有开启,需要我们手动开启。

// 开启Incremental Checkpoint
boolean enableIncrementalCheckpointing = true;
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing));
  • 1
  • 2
  • 3

2. 故障重启恢复流程

2.1 重启恢复基本流程

Flink的重启恢复逻辑相对比较简单:

  1. 重启应用,在集群上重新部署数据流图。
  2. 从持久化存储上读取最近一次的Checkpoint数据,加载到各算子子任务上。
  3. 继续处理新流入的数据。

这样的机制可以保证Flink内部状态的Excatly-Once一致性。至于端到端的Exactly-Once一致性,要根据Source和Sink的具体实现而定,我们还会在第7章端到端Exactly-Once详细讨论。当发生故障时,一部分数据有可能已经流入系统,但还未进行Checkpoint,Source的Checkpoint记录了输入的Offset;当重启时,Flink能把最近一次的Checkpoint恢复到内存中,并根据Offset,让Source从该位置重新发送一遍数据,以保证数据不丢不重。像Kafka等消息队列是提供重发功能的,socketTextStream就不具有这种功能,也意味着不能保证端到端的Exactly-Once投递保障。

当一个作业出现故障,进行重启时,势必会暂停一段时间,这段时间上游数据仍然继续发送过来。作业被重新拉起后,肯定需要将刚才未处理的数据消化掉。这个过程可以被理解为,一次跑步比赛,运动员不慎跌倒,爬起来重新向前追击。为了赶上当前最新进度,作业必须以更快的速度处理囤积的数据。所以,在设定资源时,我们必须留出一定的富余量,以保证重启后这段“赶进度”过程中的资源消耗。

2.2 重启策略

一般情况下,一个作业遇到一些异常情况会导致运行异常,潜在的异常情况包括:机器故障、部署环境抖动、流量激增、输入数据异常等。以输入数据异常为例,如果一个作业发生了故障重启,如果触发故障的原因没有根除,那么重启之后仍然会出现故障。因此,在解决根本问题之前,一个作业很可能无限次地故障重启,陷入死循环。为了避免重启死循环,Flink提供了三种重启策略:

  • 固定延迟(Fixed Delay)策略:作业每次失败后,按照设定的时间间隔进行重启尝试,重启次数不会超过某个设定值。
  • 失败率(Failure Rate)策略:计算一个时间段内作业失败的次数,如果失败次数小于设定值,继续重启,否则不重启。
  • 不重启(No Restart)策略:不对作业进行重启。

2.2.1 Fixed Delay(固定延时重启)

Fixed Delay策略下,作业最多重启次数不会超过某个设定值,两次重启之间有一个可设定的延迟时间。超过最多重启次数后,该作业被认定为失败。两次重启之间有延迟,是考虑到一些作业与外部系统有连接,连接一般会设置超时,频繁建立连接对数据准确性和作业运行都不利。如果在程序中用代码配置,可以写为:

        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //这表示作业最多自动重启3次,两次重启之间有5秒的延迟。
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
  • 1
  • 2
  • 3
  • 4

如果开启了Checkpoint,但没有设置重启策略,Flink会默认使用这个策略,最大重启次数为Integer.MAX_VALUE

2.2.2 Failure Rate(故障率重启)

Failure Rate策略下,在设定的时间内,重启失败次数小于设定阈值,该作业继续重启,重启失败次数超出设定阈值,该作业被最终认定为失败。两次重启之间会有一个等待的延迟。在程序中用代码配置,可以写为:

        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //这表示在30秒的时间内,重启次数小于3次时,继续重启,否则认定该作业为失败。两次重启之间的延迟为3秒。
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(30),Time.seconds(3)));
  • 1
  • 2
  • 3
  • 4

2.2.3 No Restart(不重启)

No Restart策略下,一个作业遇到异常情况后,直接被判定为失败,不进行重启尝试。使用代码配置,可以写为:

        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //这表示作业发生异常直接失败
        env.setRestartStrategy(RestartStrategies.noRestart());
  • 1
  • 2
  • 3
  • 4

3. Flink的几种状态类型

Keyed StateOperator State
适用算子类型只适用于KeyedStream上的算子可以用于所有算子
状态分配每个Key对应一个状态一个算子子任务对应一个状态
创建和访问方式重写Rich Function,通过里面的RuntimeContext访问实现CheckpointedFunction等接口
横向扩展状态随着Key自动在多个算子子任务上迁移有多种状态重新分配的方式
支持的数据结构ValueState、ListState、MapState等ListState、BroadcastState等

3.1 Keyed State

对于Keyed State,Flink提供了几种现成的数据结构供我们使用,包括ValueState、ListState等,他们的继承关系如下图所示。首先,State主要有三种实现,分别为ValueState、MapState和AppendingState、AppendingState又可以细分为ListState、ReducingState和AggregatingState。
这几个状态的具体区别在于:

  • ValueState是单一变量的状态,T是某种具体的数据类型,比如Double、String,或我们自己定义的复杂数据结构。我们可以使用T value()方法获取状态,使用void update(T value)更新状态。
  • MapState<UK, UV>存储一个Key-Value Map,其功能与Java的Map几乎相同。UV get(UK key)可以获取某个Key下的Value值,void put(UK key, UV value)可以对某个Key设置Value,boolean contains(UK key)判断某个Key是否存在,void remove(UK key)删除某个Key以及对应的Value,Iterable<Map.Entry<UK, UV>> entries()返回MapState中所有的元素,Iterator<Map.Entry<UK, UV>> iterator()返回状态的迭代器。需要注意的是,MapState中的Key和Keyed State的Key不是同一个Key。
  • ListState存储了一个由T类型数据组成的列表。我们可以使用void add(T value)或void addAll(List values)向状态中添加元素,使用Iterable get()获取整个列表,使用void update(List values)来更新列表,新的列表将替换旧的列表。
  • ReducingState和AggregatingState<IN, OUT>与ListState同属于MergingState<IN, OUT>。与ListState不同的是,ReducingState只有一个元素,而不是一个列表。它的原理是:新元素通过void add(T value)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState<IN, OUT>与ReducingState类似,也只有一个元素,只不过AggregatingState<IN, OUT>的输入和输出类型可以不一样。ReducingState和AggregatingState<IN, OUT>与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。

3.1.1 ValueState

Keyed State会帮助我们管理key,我们在使用时只需要关注value就可以。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,5000));
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String s) throws Exception {
                return Tuple2.of(s,1);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = wordAndOne.keyBy(0).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            private transient ValueState<Integer> counter;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //想使用状态要先定义一个状态描述器(State的名称,类型)
                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-desc", Integer.class);
                //初始化或恢复历史状态
                counter = getRuntimeContext().getState(stateDescriptor);
            }
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                Integer currentCount = input.f1;
                //从ValueState中取出历史次数
                Integer historyCount = counter.value();//获取当前key对应的value
                if (historyCount == null) {
                    historyCount = 0;
                }
                Integer toutle = historyCount + currentCount;//累加
                //更新内存中的状态
                counter.update(toutle);
                input.f1 = toutle;
                return input;
            }
        });
        map.print();
        env.execute("");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

3.1.2 MapState

相当于ValueState中存放Map

    public static void main(String[] args) throws Exception {
        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //调用Transformation
        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String s) throws Exception {
                String[] fields = s.split(",");
                return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });
        KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple3<String, String, Double>> result = keyedStream.process(new KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {
            //transient表示该值不参与序列化反序列化
            private transient MapState<String, Double> mapState;

            @Override
            public void open(Configuration parameters) throws Exception {
                //定义一个状态描述器
                MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>("kv-state", String.class, Double.class);
                //初始化或恢复历史状态
                mapState = getRuntimeContext().getMapState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple3<String, String, Double> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
                String city = value.f1;
                Double money = value.f2;
                Double historyMoney = mapState.get(city);
                if (historyMoney == null) {
                    historyMoney = 0.0;
                }
                Double totalMoney = historyMoney + money;
                //更新State
                mapState.put(city, totalMoney);
                value.f2 = totalMoney;
                out.collect(value);
            }
        });
        result.print();
        env.execute("StreamingWordCount");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

3.1.3 ListState

相当于ValueState中存放List

    public static void main(String[] args) throws Exception {
        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String, String>> tpDataStream = lines.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] fields = s.split(",");
                return Tuple2.of(fields[0], fields[1]);
            }
        });
        KeyedStream<Tuple2<String, String>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, List<String>>> process = keyedStream.process(new KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>>() {
            private transient ListState<String> listState;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("list_state", String.class);
                //初始化或恢复状态
                listState = getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
                listState.add(value.f1);
                Iterable<String> iterator = listState.get();
                ArrayList<String> event = new ArrayList<>();
                for (String name : iterator) {
                    event.add(name);
                }
                out.collect(Tuple2.of(value.f0, event));
            }
        });
        process.print();
        env.execute("StreamingWordCount");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

通过ValueState中存放List来实现相同效果

    public static void main(String[] args) throws Exception {
        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //调用Transformation
        SingleOutputStreamOperator<Tuple2<String, String>> tpDataStream = lines.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String s) throws Exception {
                String[] fields = s.split(",");
                return Tuple2.of(fields[0], fields[1]);
            }
        });
        KeyedStream<Tuple2<String, String>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, List<String>>> process = keyedStream.process(new KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>>() {
            private transient ValueState<List<String>> listValueState;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<List<String>> listValueStateDescriptor = new ValueStateDescriptor<>("lst_state", TypeInformation.of(new TypeHint<List<String>>(){}));
                listValueState = getRuntimeContext().getState(listValueStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
                List<String> lst = listValueState.value();
                if (lst == null){
                    lst = new ArrayList<String>();
                }
                lst.add(value.f1);
                listValueState.update(lst);
                out.collect(Tuple2.of(value.f0, lst));
            }
        });
        process.print();
        env.execute("StreamingWordCount");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

3.1.4 自己实现一个简单的KeyedState(只是测试,不建议这么使用)

实现的功能:

  • 在KeyBy后的SubTask中定义一个HashMap保存中间结果
  • 可以定期的将HashMap中的数据持久化到磁盘
  • 在SubTask出现异常重启时,通过open方法读取磁盘中的文件。恢复历史状态。

区别:这里每个定时器都是在SubTask中定期执行的,而Checkpoint是由JobManager发起的。

    public static void main(String[] args) throws Exception {
        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //这表示作业最多自动重启3次,两次重启之间有5秒的延迟。
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //调用Transformation
        SingleOutputStreamOperator<String> wordDataStream = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    if ("error".equals(word)){
                        throw new RuntimeException("出现异常!");
                    }
                    collector.collect(word);
                }
            }
        });
        //将单词和1组合
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tp) throws Exception {
                return tp.f0;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            //存储中间结果的集合
            private HashMap<String, Integer> counter;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //获取当前SubTask的编号
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                File ckFile = new File("src/main/resources/myKeystate/"+indexOfThisSubtask);
                if (ckFile.exists()){
                    FileInputStream fileInputStream = new FileInputStream(ckFile);
                    ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
                    counter = (HashMap<String, Integer>)objectInputStream.readObject();
                }else {
                    counter = new HashMap<>();
                }
                //简化直接在当前SubTask启动一个定时器
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                       while (true){
                           try {
                               Thread.sleep(10000);
                               if (!ckFile.exists()){
                                   ckFile.createNewFile();
                               }
                               //将HashMap对象持久化到文件中
                               ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(ckFile));
                               objectOutputStream.writeObject(counter);
                               objectOutputStream.flush();
                               objectOutputStream.close();
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
                       }
                    }
                }).start();
            }
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
                String word = input.f0;
                Integer count = input.f1;
                //从map中取出历史数据
                Integer historyCount = counter.get(word);
                if (historyCount == null) {
                    historyCount = 0;
                }
                int sum = historyCount + count;
                counter.put(word, sum);
                return Tuple2.of(word, sum);
            }
        });
        map.print();
        env.execute("StreamingWordCount");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

3.2 Operator State

状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:

  1. 将算子子任务本地内存数据在Checkpoint时写入存储,这步被称为备份(Snapshot);
  2. 初始化或重启应用时,以一定的逻辑从存储中读出并变为算子子任务的本地内存数据,这步被称为重建(Restore)。

Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用。对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink作业后,某个数据流元素不一定流入重启前的算子子任务上。因此,使用Operator State时,我们需要根据自己的业务场景来设计Snapshot和Restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。

实现一个通过Operator State记录读取文件偏移量的Source:

package org.example.Restart;

import org.apache.commons.io.Charsets;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.RandomAccessFile;


/***********************************
 *@Desc TODO
 *@ClassName MyAtLeastOnceSource
 *@Author DLX
 *@Data 2021/5/18 13:09
 *@Since JDK1.8
 *@Version 1.0
 ***********************************/
public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
    private Long offset = 0L;
    private String path;
    private boolean flag = true;
    private transient ListState<Long> listState;

    public MyAtLeastOnceSource(String path) {
        this.path = path;
    }

    //相当于open方法在初始化状态或回复状态时执行一次
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<>("offset-state", Long.class);
        listState = context.getOperatorStateStore().getListState(listStateDescriptor);
        //当前的状态是否已经恢复了
        if (context.isRestored()){
            //从listState中恢复便宜量
            Iterable<Long> iterable = listState.get();
            for (Long aLong : iterable) {
                offset = aLong;
            }
        }
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        RandomAccessFile randomAccessFile = new RandomAccessFile(path + "/" + indexOfThisSubtask + ".txt","r");
        randomAccessFile.seek(offset);//从指定位置读取数据
        while (flag){
            String line = randomAccessFile.readLine();
            if (line != null){
                line = new String(line.getBytes(Charsets.ISO_8859_1),Charsets.UTF_8);
                //对offset加锁,防止更新偏移量的时候进行checkPoint
                synchronized (ctx.getCheckpointLock()){
                    offset = randomAccessFile.getFilePointer();
                    ctx.collect(indexOfThisSubtask+".txt:"+line);
                }
            }else {
                Thread.sleep(1000);
            }
        }
    }
    //在做CheckPoint时周期性执行
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //定期更新OperatorState
        listState.clear();
        listState.add(offset);
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

main方法:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //对状态做快照
        env.setParallelism(4);
        env.enableCheckpointing(30000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> errorStream = socketStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value.startsWith("error")) {
                    int i = 10 / 0;
                }
                return value;
            }
        });
        DataStreamSource<String> dataStreamSource = env.addSource(new MyAtLeastOnceSource("D:\\IntelliJ_IDEA_Code_Space\\DataDemo\\input"));
        //如果不union在一起当出现异常时,上面的流会重启下面的流不会重启
        DataStream<String> union = errorStream.union(dataStreamSource);
        union.print();
        env.execute("");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

4. 状态存活时间(TTL)

4.1设置状态过期时间

任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,这会在后面详述。

所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:

    public static void main(String[] args) throws Exception {
        //创建Flink流式处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        //Source
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        //调用Transformation
        SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
            @Override
            public Tuple3<String, String, Double> map(String s) throws Exception {
                String[] fields = s.split(",");
                return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
            }
        });
        KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple3<String, String, Double>> result = keyedStream.process(new KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {
            //transient表示该值不参与序列化反序列化
            private transient MapState<String, Double> mapState;

            @Override
            public void open(Configuration parameters) throws Exception {
                //定义一个TTLConfig
                StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
                //定义一个状态描述器
                MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>("kv-state", String.class, Double.class);
                //关联状态描述器
                stateDescriptor.enableTimeToLive(ttlConfig);
                //初始化或恢复历史状态
                mapState = getRuntimeContext().getMapState(stateDescriptor);
            }

            @Override
            public void processElement(Tuple3<String, String, Double> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
                String city = value.f1;
                Double money = value.f2;
                Double historyMoney = mapState.get(city);
                if (historyMoney == null) {
                    historyMoney = 0.0;
                }
                Double totalMoney = historyMoney + money;
                //更新State
                mapState.put(city, totalMoney);
                value.f2 = totalMoney;
                out.collect(value);
            }
        });

        result.print();
        env.execute("StreamingWordCount");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。

TTL 的更新策略(默认是 OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
  • StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新

数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp -
    会返回过期但未清理的数据

NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

注意:

  • 状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。 Heap state
    backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend
    会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
  • 暂时只支持基于 processing time 的 TTL。
  • 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到
    “StateMigrationException”。
  • TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
  • 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持
    null, 可以用 NullableSerializer 包装一层。
  • State TTL 当前在 PyFlink DataStream API 中还不支持。

4.2 过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

4.2.1全量快照时进行清理

另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

注意:

  • 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。

4.2.2 增量数据清理

另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5

该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key
    的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

4.2.2 在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。
    对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
  • 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器,
    从而确定下一个未过期数据的位置。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/969864
推荐阅读
相关标签
  

闽ICP备14008679号