赞
踩
虽然数据流中的许多操作一次只查看一个单独的事件(例如事件分析器),但某些操作会记住跨多个事件的信息(例如窗口运算符)。这些操作称为有状态操作。
需要状态的一些场景:
Flink 需要了解状态,以便使用 checkpoints 和 savepoints 使其容错。
状态还允许重新扩展 Flink 应用程序,这意味着 Flink 负责在并行实例之间重新分发状态。
可查询状态允许你在运行时从 Flink 外部访问状态。
在使用状态时, Flink 的 state backends 也可能很有用。Flink 提供了不同的 state backends,指定了状态的存储方式和位置。
Managed State:托管状态
Raw State:原始状态
Managed State 分为两种,Keyed State(键控状态) 和 Operator State(算子状态) (Raw State都是Operator State)。
Keyed State(键控状态)
ValueState
、ListState
、MapState
、ReducingState
和 AggregatingState
等等Operator State(算子状态)
又称为 non-keyed state,每一个 operator state 都仅与一个 operator 的实例(1个SubTask任务)绑定;
可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态;
常见的 operator state 是数据源 source state,例如记录当前 source 的 offset;
存储数据结构:ListState
或 BroadcastState
等等
KeyedState案例:
// todo: 自定义状态,实现max算子获取最大值,此处KeyedState定义 SingleOutputStreamOperator<String> statStream = tupleStream // 指定城市字段进行分组 .keyBy(tuple -> tuple.f0) // 处理流中每条数据 .map(new RichMapFunction<Tuple3<String, String, Long>, String>() { // todo: 第1步、定义变量,存储每个Key对应值,所有状态State实例化都是RuntimeContext实例化 private ValueState<Long> maxState = null ; // 处理流中每条数据之前,初始化准备工作 @Override public void open(Configuration parameters) throws Exception { // todo: 第2步、初始化状态,开始默认值null maxState = getRuntimeContext().getState( new ValueStateDescriptor<Long>("maxState", Long.class) ); } @Override public String map(Tuple3<String, String, Long> value) throws Exception { // 获取流中数据对应值 Long currentValue = value.f2; // todo: step3、从状态中获取存储key以前值 Long historyValue = maxState.value(); // 如果数据为key分组中第一条数据;没有状态,值为null if(null == historyValue ||historyValue < currentValue){ // todo: step4、更新状态值 maxState.update(currentValue); } // 返回状态的最大值 return value.f0 + " -> " + maxState.value(); } });
Flink State Time-To-Live:状态的存活时间。
设置状态 TTL 过期:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite):
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。
注意:
状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。 Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list
或者 map 的每个元素)序列化后增加 8 个字节。暂时只支持基于 processing time 的 TTL。
尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。
过期数据清理:
默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。
Checkpoint:也就是检查点,是用来故障恢复的一种机制。 Spark 也有 Checkpoint ,Flink 与 Spark 一样,都是用 Checkpoint 来存储某一时间或者某一段时间的快照(snapshot),用于将任务恢复到指定的状态。
Flink 的 Checkpoint 的核心算法叫做 Chandy-Lamport ,是一种分布式快照(Distributed Snapshot)算法,应用到流式系统中就是确定一个 Global 的 Snapshot,错误处理的时候各个节点根据上一次的 Global Snapshot 来恢复。
Checkpoint 实现的核心就是 barrier(栅栏或屏障),Flink 通过在数据集上间隔性的生成屏障 barrier,并通过 barrier 将某段时间内的状态 State 数据保存到 Checkpoint 中(先快照,再保存)。
栅栏对齐:下游 SubTask 必须接收到上游的所有 SubTask 发送 Barrier 栅栏信号,才开始进行 Checkpoint 操作。
Checkpoint 其实就是 Flink 中某一时刻,所有的 Operator 的全局快照,那么快照应该要有一个地方进行存储,而这个存储的地方叫做状态后端(StateBackend)。
Flink 1.13 中将状态State和检查点Checkpoint两者区分开来。State Backend 的概念变窄,只描述状态访问和存储;Checkpoint storage,描述的是 Checkpoint 行为,如 Checkpoint 数据是发回给 JM 内存还是上传到远程。
private static void setEnvCheckpoint(StreamExecutionEnvironment env) { // 1.设置checkpoint时间间隔 env.enableCheckpointing(1000); // 2.设置状态后端 env.getCheckpointConfig().setCheckpointStorage("file:///D:/flink-checkpoints/"); // 3.没置两个checkpoint 之问最少等待时间, env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 4.没置checkpoint时失败次数,允许失败几次 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 5.设置是否清理检查点,表示 Cancel 时是否需要保当前的 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 6,设置checkpoint的执行供式为EXACTLY_ONCE(默认),注意: 需要外部支持,如Source和sink的支持 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 7,设置checkpoint的超时时间,如果 Checkpoint在 60s内尚未完成说明该次Checkpoint失败,则丢弃. env.getCheckpointConfig().setCheckpointTimeout(60000); // 8。设置同一时间有多少个checkpoint可以同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 9.没置重启策略: NoRestart env.setRestartStrategy(RestartStrategies.noRestart()); }
程序重启:
// 设置自动重后策略,比如最大重启 3 次,每次间隔时间 10 秒
env.setRestartStrategy( RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
Savepoint ,手动设置checkpoint:
# Trigger a Savepoint
$ bin/flink savepoint :jobId [:targetDirectory]
# Trigger a Savepoint with YARN
$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
# Stopping a Job with Savepoint
$ bin/flink stop --savepointPath [:targetDirectory] :jobId
# Resuming from Savepoint
$ bin/flink run -s :savepointPath [:runArgs]
详细区别:
流处理引擎通常为应用程序提供了三种数据处理语义:最多一次、至少一次和精确一次。
如下是对这些不同处理语义的宽松定义(一致性由弱到强):
At most noce < At least once < Exactly once < End to End Exactly once
端到端的精确一次
结果的正确性贯穿了整个流处理应用的始终,每一个组件都保证了它自己的一致性;Flink 应用从 Source 端开始到 Sink 端结束,数据必须经过的起始点和结束点;
实现方式:
要求:
Exactly-once 两阶段提交步骤总结:
第1步、Flink 消费到 Kafka 数据之后,就会开启一个 Kafka 的事务,正常写入 Kafka 分区日志但标记为未提交,这就是 Pre-commit(预提交)。
第2步、一旦所有的 Operator 完成各自的 Pre-commit ,它们会发起一个 commit 操作。
第3步、如果有任意一个 Pre-commit 失败,所有其他的 Pre-commit 必须停止,并且 Flink 会回滚到最近成功完成的 Checkpoint。
第4步、当所有的 Operator 完成任务时,Sink 段就收到 Checkpoint barrier(检查点分界线),Sink 保存当前状态存入 Checkpoint ,通知JobManager,并提交外部事务,用于提交外部检查点的数据。
第5步、JobManager 收到所有任务的通知,发出确认信息,表示 Checkpoint 已完成,Sink 收到 JobManager 的确认信息,正式 commit (提交)这段时间的数据。
第6步、外部系统(Kafka)关闭事务,提交的数据可以正常消费了。
上述过程可以发现,一旦 Pre-commit 完成,必须要确保 commit 也要成功,Operator 和外部系统都需要对此进行保证。整个 两阶段提交协议 2PC 就是解决分布式事务问题,所以才能有如今 Flink 可以端到端精准一次处理。
checkpiont 流程:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。