赞
踩
作为一个计算框架,Flink提供了有状态的计算,封装了一些底层的实现,比如状态的高效存储、Checkpoint和Savepoint持久化备份机制、计算资源扩缩容等问题。因为Flink接管了这些问题,开发者只需调用Flink API,这样可以更加专注于业务逻辑
Flink实时计算为了容错,可以将中间数据定期保存起来,这种定期触发保存中间结果的机制叫CheckPointing,CheckPointing是周期执行的,具体过程是JobManager定期向TaskManager中的SubTask发送RPC消息,SubTask将其计算的State保存到StateBackEnd中,并且向JobManager响应Checkpoint是否成功,如果程序出现异常或重启,TaskManager中的SubTask可以从上一次成功的CheckPointing的State恢复。
默认情况下 checkpoint 是禁用的。通过调用 StreamExecutionEnvironment 的 enableCheckpointing(n) 来启用 checkpoint,里面的 n 是进行 checkpoint 的间隔,单位毫秒。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
Checkpoint 其他的属性包括:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 使用At-Least-Once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
// Checkpoint 必须在一分钟内完成,否则就会被终止
env.getCheckpointConfig().setCheckpointTimeout(6*1000);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
往往使用“checkpoints 之间的最小时间”来配置应用会比 checkpoint 间隔容易很多,因为“checkpoints之间的最小时间”在 checkpoint 的执行时间超过平均值时不会受到影响(例如如果目标的存储系统忽然变得很慢)。
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 作业取消后仍然保存Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
State Backend起到了持久化存储数据的重要功能。Flink将State Backend抽象成了一种插件,并提供了三种State Backend,每种State Backend对数据的保存和恢复方式略有不同。
从名字中可以看出,这种State Backend主要基于内存,它将数据存储在Java的堆区。当进行分布式快照时,所有算子子任务将自己内存上的状态同步到JobManager的堆上。因此,一个作业的所有状态要小于JobManager的内存大小。这种方式显然不能存储过大的状态数据,否则将抛出OutOfMemoryError异常。这种方式只适合调试或者实验,不建议在生产环境下使用。下面的代码告知一个Flink作业使用内存作为State Backend,并在参数中指定了状态的最大值,默认情况下,这个最大值是5MB。
env.setStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE));
如果不做任何配置,默认情况是使用内存作为State Backend。
这种方式下,数据持久化到文件系统上,文件系统包括本地磁盘、HDFS以及包括Amazon、阿里云在内的云存储服务。使用时,我们要提供文件系统的地址,尤其要写明前缀,比如:file://、hdfs://或s3://。此外,这种方式支持Asynchronous Snapshot,默认情况下这个功能是开启的,可加快数据同步速度。
// 使用HDFS作为State Backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));
// 关闭Asynchronous Snapshot
env.setStateBackend(new FsStateBackend(checkpointPath, false));
Flink的本地状态仍然在TaskManager的内存堆区上,直到执行快照时状态数据会写到所配置的文件系统上。因此,这种方式能够享受本地内存的快速读写访问,也能保证大容量状态作业的故障恢复能力。
这种方式下,本地状态存储在本地的RocksDB上。RocksDB是一种嵌入式Key-Value数据库,数据实际保存在本地磁盘上。比起FsStateBackend的本地状态存储在内存中,RocksDB利用了磁盘空间,所以可存储的本地状态更大。然而,每次从RocksDB中读写数据都需要进行序列化和反序列化,因此读写本地状态的成本更高。快照执行时,Flink将存储于本地RocksDB的状态同步到远程的存储上,因此使用这种State Backend时,也要配置分布式存储的地址。Asynchronous Snapshot在默认情况也是开启的。
此外,这种State Backend允许增量快照(Incremental Checkpoint),Incremental Checkpoint的核心思想是每次快照时只对发生变化的数据增量写到分布式存储上,而不是将所有的本地状态都拷贝过去。Incremental Checkpoint非常适合超大规模的状态,快照的耗时将明显降低,同时,它的代价是重启恢复的时间更长。默认情况下,Incremental Checkpoint没有开启,需要我们手动开启。
// 开启Incremental Checkpoint
boolean enableIncrementalCheckpointing = true;
env.setStateBackend(new RocksDBStateBackend(checkpointPath, enableIncrementalCheckpointing));
Flink的重启恢复逻辑相对比较简单:
这样的机制可以保证Flink内部状态的Excatly-Once一致性。至于端到端的Exactly-Once一致性,要根据Source和Sink的具体实现而定,我们还会在第7章端到端Exactly-Once详细讨论。当发生故障时,一部分数据有可能已经流入系统,但还未进行Checkpoint,Source的Checkpoint记录了输入的Offset;当重启时,Flink能把最近一次的Checkpoint恢复到内存中,并根据Offset,让Source从该位置重新发送一遍数据,以保证数据不丢不重。像Kafka等消息队列是提供重发功能的,socketTextStream就不具有这种功能,也意味着不能保证端到端的Exactly-Once投递保障。
当一个作业出现故障,进行重启时,势必会暂停一段时间,这段时间上游数据仍然继续发送过来。作业被重新拉起后,肯定需要将刚才未处理的数据消化掉。这个过程可以被理解为,一次跑步比赛,运动员不慎跌倒,爬起来重新向前追击。为了赶上当前最新进度,作业必须以更快的速度处理囤积的数据。所以,在设定资源时,我们必须留出一定的富余量,以保证重启后这段“赶进度”过程中的资源消耗。
一般情况下,一个作业遇到一些异常情况会导致运行异常,潜在的异常情况包括:机器故障、部署环境抖动、流量激增、输入数据异常等。以输入数据异常为例,如果一个作业发生了故障重启,如果触发故障的原因没有根除,那么重启之后仍然会出现故障。因此,在解决根本问题之前,一个作业很可能无限次地故障重启,陷入死循环。为了避免重启死循环,Flink提供了三种重启策略:
Fixed Delay策略下,作业最多重启次数不会超过某个设定值,两次重启之间有一个可设定的延迟时间。超过最多重启次数后,该作业被认定为失败。两次重启之间有延迟,是考虑到一些作业与外部系统有连接,连接一般会设置超时,频繁建立连接对数据准确性和作业运行都不利。如果在程序中用代码配置,可以写为:
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这表示作业最多自动重启3次,两次重启之间有5秒的延迟。
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
如果开启了Checkpoint,但没有设置重启策略,Flink会默认使用这个策略,最大重启次数为
Integer.MAX_VALUE
。
Failure Rate策略下,在设定的时间内,重启失败次数小于设定阈值,该作业继续重启,重启失败次数超出设定阈值,该作业被最终认定为失败。两次重启之间会有一个等待的延迟。在程序中用代码配置,可以写为:
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这表示在30秒的时间内,重启次数小于3次时,继续重启,否则认定该作业为失败。两次重启之间的延迟为3秒。
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(30),Time.seconds(3)));
No Restart策略下,一个作业遇到异常情况后,直接被判定为失败,不进行重启尝试。使用代码配置,可以写为:
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这表示作业发生异常直接失败
env.setRestartStrategy(RestartStrategies.noRestart());
Keyed State | Operator State | |
---|---|---|
适用算子类型 | 只适用于KeyedStream 上的算子 | 可以用于所有算子 |
状态分配 | 每个Key对应一个状态 | 一个算子子任务对应一个状态 |
创建和访问方式 | 重写Rich Function,通过里面的RuntimeContext访问 | 实现CheckpointedFunction 等接口 |
横向扩展 | 状态随着Key自动在多个算子子任务上迁移 | 有多种状态重新分配的方式 |
支持的数据结构 | ValueState、ListState、MapState等 | ListState、BroadcastState等 |
对于Keyed State,Flink提供了几种现成的数据结构供我们使用,包括ValueState、ListState等,他们的继承关系如下图所示。首先,State主要有三种实现,分别为ValueState、MapState和AppendingState、AppendingState又可以细分为ListState、ReducingState和AggregatingState。
这几个状态的具体区别在于:
Keyed State会帮助我们管理key,我们在使用时只需要关注value就可以。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,5000));
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation
SingleOutputStreamOperator<Tuple2<String,Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String,Integer> map(String s) throws Exception {
return Tuple2.of(s,1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> map = wordAndOne.keyBy(0).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//想使用状态要先定义一个状态描述器(State的名称,类型)
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("wc-desc", Integer.class);
//初始化或恢复历史状态
counter = getRuntimeContext().getState(stateDescriptor);
}
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
Integer currentCount = input.f1;
//从ValueState中取出历史次数
Integer historyCount = counter.value();//获取当前key对应的value
if (historyCount == null) {
historyCount = 0;
}
Integer toutle = historyCount + currentCount;//累加
//更新内存中的状态
counter.update(toutle);
input.f1 = toutle;
return input;
}
});
map.print();
env.execute("");
}
相当于ValueState中存放Map
public static void main(String[] args) throws Exception {
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
//Source
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation
SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
@Override
public Tuple3<String, String, Double> map(String s) throws Exception {
String[] fields = s.split(",");
return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
}
});
KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
SingleOutputStreamOperator<Tuple3<String, String, Double>> result = keyedStream.process(new KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {
//transient表示该值不参与序列化反序列化
private transient MapState<String, Double> mapState;
@Override
public void open(Configuration parameters) throws Exception {
//定义一个状态描述器
MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>("kv-state", String.class, Double.class);
//初始化或恢复历史状态
mapState = getRuntimeContext().getMapState(stateDescriptor);
}
@Override
public void processElement(Tuple3<String, String, Double> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
String city = value.f1;
Double money = value.f2;
Double historyMoney = mapState.get(city);
if (historyMoney == null) {
historyMoney = 0.0;
}
Double totalMoney = historyMoney + money;
//更新State
mapState.put(city, totalMoney);
value.f2 = totalMoney;
out.collect(value);
}
});
result.print();
env.execute("StreamingWordCount");
}
相当于ValueState中存放List
public static void main(String[] args) throws Exception {
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
//Source
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation
SingleOutputStreamOperator<Tuple2<String, String>> tpDataStream = lines.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] fields = s.split(",");
return Tuple2.of(fields[0], fields[1]);
}
});
KeyedStream<Tuple2<String, String>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
SingleOutputStreamOperator<Tuple2<String, List<String>>> process = keyedStream.process(new KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>>() {
private transient ListState<String> listState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("list_state", String.class);
//初始化或恢复状态
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
listState.add(value.f1);
Iterable<String> iterator = listState.get();
ArrayList<String> event = new ArrayList<>();
for (String name : iterator) {
event.add(name);
}
out.collect(Tuple2.of(value.f0, event));
}
});
process.print();
env.execute("StreamingWordCount");
}
通过ValueState中存放List来实现相同效果
public static void main(String[] args) throws Exception {
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
//Source
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation
SingleOutputStreamOperator<Tuple2<String, String>> tpDataStream = lines.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
String[] fields = s.split(",");
return Tuple2.of(fields[0], fields[1]);
}
});
KeyedStream<Tuple2<String, String>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
SingleOutputStreamOperator<Tuple2<String, List<String>>> process = keyedStream.process(new KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>>() {
private transient ValueState<List<String>> listValueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor<List<String>> listValueStateDescriptor = new ValueStateDescriptor<>("lst_state", TypeInformation.of(new TypeHint<List<String>>(){}));
listValueState = getRuntimeContext().getState(listValueStateDescriptor);
}
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, List<String>>> out) throws Exception {
List<String> lst = listValueState.value();
if (lst == null){
lst = new ArrayList<String>();
}
lst.add(value.f1);
listValueState.update(lst);
out.collect(Tuple2.of(value.f0, lst));
}
});
process.print();
env.execute("StreamingWordCount");
}
实现的功能:
区别:这里每个定时器都是在SubTask中定期执行的,而Checkpoint是由JobManager发起的。
public static void main(String[] args) throws Exception {
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这表示作业最多自动重启3次,两次重启之间有5秒的延迟。
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
//Source
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation
SingleOutputStreamOperator<String> wordDataStream = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] words = s.split(" ");
for (String word : words) {
if ("error".equals(word)){
throw new RuntimeException("出现异常!");
}
collector.collect(word);
}
}
});
//将单词和1组合
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = wordDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return Tuple2.of(s, 1);
}
});
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tp) throws Exception {
return tp.f0;
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> map = keyed.map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
//存储中间结果的集合
private HashMap<String, Integer> counter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//获取当前SubTask的编号
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
File ckFile = new File("src/main/resources/myKeystate/"+indexOfThisSubtask);
if (ckFile.exists()){
FileInputStream fileInputStream = new FileInputStream(ckFile);
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
counter = (HashMap<String, Integer>)objectInputStream.readObject();
}else {
counter = new HashMap<>();
}
//简化直接在当前SubTask启动一个定时器
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
Thread.sleep(10000);
if (!ckFile.exists()){
ckFile.createNewFile();
}
//将HashMap对象持久化到文件中
ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream(ckFile));
objectOutputStream.writeObject(counter);
objectOutputStream.flush();
objectOutputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> input) throws Exception {
String word = input.f0;
Integer count = input.f1;
//从map中取出历史数据
Integer historyCount = counter.get(word);
if (historyCount == null) {
historyCount = 0;
}
int sum = historyCount + count;
counter.put(word, sum);
return Tuple2.of(word, sum);
}
});
map.print();
env.execute("StreamingWordCount");
}
状态从本质上来说,是Flink算子子任务的一种本地数据,为了保证数据可恢复性,使用Checkpoint机制来将状态数据持久化输出到存储空间上。状态相关的主要逻辑有两项:
Keyed State对这两项内容做了更完善的封装,开发者可以开箱即用。对于Operator State来说,每个算子子任务管理自己的Operator State,或者说每个算子子任务上的数据流共享同一个状态,可以访问和修改该状态。Flink的算子子任务上的数据在程序重启、横向伸缩等场景下不能保证百分百的一致性。换句话说,重启Flink作业后,某个数据流元素不一定流入重启前的算子子任务上。因此,使用Operator State时,我们需要根据自己的业务场景来设计Snapshot和Restore的逻辑。为了实现这两个步骤,Flink提供了最为基础的CheckpointedFunction接口类。
实现一个通过Operator State记录读取文件偏移量的Source:
package org.example.Restart;
import org.apache.commons.io.Charsets;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.io.RandomAccessFile;
/***********************************
*@Desc TODO
*@ClassName MyAtLeastOnceSource
*@Author DLX
*@Data 2021/5/18 13:09
*@Since JDK1.8
*@Version 1.0
***********************************/
public class MyAtLeastOnceSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
private Long offset = 0L;
private String path;
private boolean flag = true;
private transient ListState<Long> listState;
public MyAtLeastOnceSource(String path) {
this.path = path;
}
//相当于open方法在初始化状态或回复状态时执行一次
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<>("offset-state", Long.class);
listState = context.getOperatorStateStore().getListState(listStateDescriptor);
//当前的状态是否已经恢复了
if (context.isRestored()){
//从listState中恢复便宜量
Iterable<Long> iterable = listState.get();
for (Long aLong : iterable) {
offset = aLong;
}
}
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
RandomAccessFile randomAccessFile = new RandomAccessFile(path + "/" + indexOfThisSubtask + ".txt","r");
randomAccessFile.seek(offset);//从指定位置读取数据
while (flag){
String line = randomAccessFile.readLine();
if (line != null){
line = new String(line.getBytes(Charsets.ISO_8859_1),Charsets.UTF_8);
//对offset加锁,防止更新偏移量的时候进行checkPoint
synchronized (ctx.getCheckpointLock()){
offset = randomAccessFile.getFilePointer();
ctx.collect(indexOfThisSubtask+".txt:"+line);
}
}else {
Thread.sleep(1000);
}
}
}
//在做CheckPoint时周期性执行
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
//定期更新OperatorState
listState.clear();
listState.add(offset);
}
@Override
public void cancel() {
flag = false;
}
}
main方法:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//对状态做快照
env.setParallelism(4);
env.enableCheckpointing(30000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<String> errorStream = socketStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value.startsWith("error")) {
int i = 10 / 0;
}
return value;
}
});
DataStreamSource<String> dataStreamSource = env.addSource(new MyAtLeastOnceSource("D:\\IntelliJ_IDEA_Code_Space\\DataDemo\\input"));
//如果不union在一起当出现异常时,上面的流会重启下面的流不会重启
DataStream<String> union = errorStream.union(dataStreamSource);
union.print();
env.execute("");
}
任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值,这会在后面详述。
所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。
在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:
public static void main(String[] args) throws Exception {
//创建Flink流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
//Source
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
//调用Transformation
SingleOutputStreamOperator<Tuple3<String, String, Double>> tpDataStream = lines.map(new MapFunction<String, Tuple3<String, String, Double>>() {
@Override
public Tuple3<String, String, Double> map(String s) throws Exception {
String[] fields = s.split(",");
return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));
}
});
KeyedStream<Tuple3<String, String, Double>, String> keyedStream = tpDataStream.keyBy(t -> t.f0);
SingleOutputStreamOperator<Tuple3<String, String, Double>> result = keyedStream.process(new KeyedProcessFunction<String, Tuple3<String, String, Double>, Tuple3<String, String, Double>>() {
//transient表示该值不参与序列化反序列化
private transient MapState<String, Double> mapState;
@Override
public void open(Configuration parameters) throws Exception {
//定义一个TTLConfig
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//定义一个状态描述器
MapStateDescriptor<String, Double> stateDescriptor = new MapStateDescriptor<>("kv-state", String.class, Double.class);
//关联状态描述器
stateDescriptor.enableTimeToLive(ttlConfig);
//初始化或恢复历史状态
mapState = getRuntimeContext().getMapState(stateDescriptor);
}
@Override
public void processElement(Tuple3<String, String, Double> value, Context ctx, Collector<Tuple3<String, String, Double>> out) throws Exception {
String city = value.f1;
Double money = value.f2;
Double historyMoney = mapState.get(city);
if (historyMoney == null) {
historyMoney = 0.0;
}
Double totalMoney = historyMoney + money;
//更新State
mapState.put(city, totalMoney);
value.f2 = totalMoney;
out.collect(value);
}
});
result.print();
env.execute("StreamingWordCount");
}
TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite):
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。
注意:
默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。
另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
注意:
另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
该特性可以通过 StateTtlConfig 进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally(10, true)
.build();
该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
注意:
如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
该特性可以通过 StateTtlConfig 进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();
Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。