赞
踩
State场景:
flink中有状态函数和运算符在各个元素(element)/事件(event)的处理过程中存储的数据,这些数据可以修改和查询,可以自己维护,根据自己的业务场景,保存历史数据或者中间结果到状态(state)中);
使用状态计算的例子:
比如:以wordcount中计算pv/uv为例:
输出的结果跟之前的状态有关系,不符合幂等性,访问多次,pv会增加;
为什么需要state管理
流式作业的特点是7*24小时运行,数据不重复消费,不丢失,保证只计算一次,数据实时产出不延迟,但是当状态很大,内存容量限制,或者实例运行奔溃,或需要扩展并发度等情况下,如何保证状态正确的管理,在任务重新执行的时候能正确执行,状态管理就显得尤为重要。
理想中的state管理
checkpoint 机制是 Flink 可靠性的基石,可以保证 Flink 集群在某个算子因为 某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某 一状态,保证应用流图状态的一致性.
快照的实现算法:
简单算法–暂停应用, 然后开始做检查点, 再重新恢复应用
Flink 的改进 Checkpoint 算法. Flink 的 checkpoint 机制原理来自 "Chandy-Lamport algorithm"算法(分布式快照算)的一种变体: 异步 barrier 快照 (asynchronous barrier snapshotting)
每个需要 checkpoint 的应用在启动时,Flink 的 JobManager 为其创 建一个 CheckpointCoordinator ,CheckpointCoordinator 全权负责本应用 的快照制作。
流的 barrier 是 Flink 的 Checkpoint 中的一个核心概念. 多个 barrier 被插入到数据 流中, 然后作为数据流的一部分随着数据流动(有点类似于 Watermark).这些 barrier 不会 跨越流中的数据.
每个 barrier 会把数据流分成两部分: 一部分数据进入 当前的快照 , 另一部分数据进入下一个快照每个 barrier 携带着快照的 id. barrier 不会暂停数据的流动, 所以非 常轻量级. 在流中, 同一时间可以有来源于多个不同快照的多个 barrier, 这个意味着可 以并发的出现不同的快照.
第一步: Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint. 然后 Source Task 会在数据流中安插 CheckPoint barrier
第二步: source 节点向下游广播 barrier,这个 barrier 就是实现 ChandyLamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才 会执行相应的 Checkpoint
第三步: 当 task 完成 state 备份后,会将备份数据的地址(state handle) 通知给 Checkpoint coordinator。
第四步: 下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行 本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没 有上传的文件进行持久化备份(紫色小三角)。
第五步: 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。
第六步: 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。
Checkpoint总体过程
在多并行度下, 如果要实现严格一次, 则要执行barrier 对齐。
当 job graph 中的每个 operator 接收到 barriers 时,它就会记录下其状态。拥有 两个输入流的 Operators(例如 CoProcessFunction)会执行 barrier 对齐 (barrier alignment) 以便当前快照能够包含消费两个输入流 barrier 之前(但不超过)的所有 events 而产生的状态。
当 operator 收到数字流的 barrier n 时, 它就 不能处理(但是可以接收) 来自该流的任 何数据记录,直到它从字母流所有输入接收到 barrier n 为止。否则,它会混合属于快 照 n 的记录和属于快照 n + 1 的记录。
接收到 barrier n 的流(数字流)暂时被搁置。从这些流接收的记录入输入缓冲区, 不会被处理。
图一中的 Checkpoint barrier n 之后的数据 123 已结到达了算子, 存入到输入缓冲 区没有被处理, 只有等到字母流的 Checkpoint barrier n 到达之后才会开始处理.
一旦最后所有输入流都接收到 barrier n,Operator 就会把缓冲区中 pending 的输出数 据发出去,然后把 CheckPoint barrier n 接着往下游发送。这里还会对自身进行快照。
前面介绍了 barrier 对齐, 如果 barrier 不对齐会怎么样?会重复消费, 就是 至少一次 语义.
假设不对齐, 在字母流的 Checkpoint barrier n 到达前, 已经处理了 1 2 3. 等字母 流 Checkpoint barrier n 到达之后, 会做 Checkpoint n. 假设这个时候程序异常错误了, 则重新启动的时候会 Checkpoint n 之后的数据重新计算. 1 2 3 会被再次被计算, 所以 123 出现了重复计算.
Savepoint | Checkpoint |
---|---|
Savepoint 是由命令触发, 由用户创建和删 除 | Checkpoint 被保存在用户指定的外部路径中 |
保存点存储在标准格式存储中,并且可以升 级作业版本并可以更改其配置。 | 当作业失败或被取消时,将保留外部存储的 检查点。 |
用户必须提供用于还原作业状态的保存点 的路径。 | 用户必须提供用于还原作业状态的检查点 的路径。 |
package com.aikfk.flink.datastream.state; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.ArrayList; import java.util.List; /** * @author :caizhengjie * @description:TODO * @date :2021/3/31 4:04 下午 */ public class CheckPoint { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000ms env.enableCheckpointing(1000); // advanced options // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 env.getCheckpointConfig().setCheckpointTimeout(10000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 如果有更近的保存点时,是否将作业回退到该检查点 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); env.getCheckpointConfig().enableUnalignedCheckpoints(); env.setStateBackend(new FsStateBackend( "file:Users/caizhengjie/Desktop/test ",true)); DataStreamSink<Tuple2<String,Long>> dataStream = env.addSource(new MySource()) .map(new MapFunction<String, Tuple2<String,Long>>() { @Override public Tuple2<String, Long> map(String line) throws Exception { String[] words = line.split(","); return new Tuple2<>(words[0],Long.parseLong(words[1])); } }) .keyBy(value -> value.f0) .addSink(new BufferingSink()); env.execute("KeyedState"); } static class BufferingSink implements SinkFunction<Tuple2<String,Long>>, CheckpointedFunction { private ListState<Tuple2<String,Long>> listState; private List<Tuple2<String,Long>> bufferedElements = new ArrayList<>(); @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Tuple2<String, Long>> descriptor = new ListStateDescriptor<Tuple2<String, Long>>("bufferedSinkState", TypeInformation.of(new TypeHint<Tuple2<String,Long>>() {})); listState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()){ for (Tuple2<String, Long> element : listState.get()){ bufferedElements.add(element); } } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { for (Tuple2<String, Long> element : bufferedElements){ listState.add(element); } } @Override public void invoke(Tuple2<String,Long> value, Context context) throws Exception { bufferedElements.add(value); System.out.println("invoke>>> " + value); for (Tuple2<String,Long> element : bufferedElements){ System.out.println(Thread.currentThread().getId() + " >> " + element.f0 + " : " + element.f1); } } } public static class MySource implements SourceFunction<String> { @Override public void cancel() { } @Override public void run(SourceContext<String> ctx) throws Exception { String data = "s,4"; while (true) { ctx.collect(data); } } } }
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。