赞
踩
流式计算逻辑中,比如sum,max; 需要记录和后面计算使用到一些历史的累计数据,
状态就是:用户在程序逻辑中用于记录信息的变量
在Flink 中 ,状态state 不仅仅是要记录状态;在程序运行中如果失败,是需要重新恢复,所以这个状态也是需要持久化;一遍后续程序继续运行
我们自定义变量来保存数据
public class _01_status_row { public static void main(String[] args) throws Exception { // 获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000); DataStream<String> dataStream = dataStreamSource.map(new MapFunction<String, String>() { //自己定义的 变量来保存中间值:这里就无法有效的持久化和恢复 //状态: raw state 状态 String oldString = ""; //如何让flink 来托管我们的状态变量,完成持久化和恢复?? @Override public String map(String value) throws Exception { oldString = oldString + value; return oldString; } }); dataStream.print(); env.execute(); } }
flink 提供了内置的状态数据管理机制,也叫状态机制: 状态一致性维护,状态数据的访问和存储;
Flink 任务是一个JOB .JOB 范围很多Task ,Task 对应示例subtask
是subtask 出错的时候,flink 底层会自动的从帮我们恢复task 的运行
如果是Job失败了 从 flink state 恢复,需要在特殊指定一些参数
算子状态:
键控状态 Keyed State
更多的使用场景是键控状态 Keyed State
每个subtask 自己持有一份独立的状态数据;算子状态,在逻辑上,由算子 task下所有subtask共享;
如何理解:正常运行时,subtask自己读写自己的状态数据;而一旦job重启且带状态算子发生了并行度的变化,则之前的状态数据将在新的一批subtask 间均匀分配
public class _02_operator_flink_status { public static void main(String[] args) throws Exception { // 获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); //=============配置 =============== //需要开启 Checkpoint 机制 env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); //需要开启持久化的路径 可选hdfs 本地 env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout2/"); //task级别的failover //一个task 失败 job 失败 ,有很多重启策略 //env.setRestartStrategy(RestartStrategies.noRestart()); //task 失败 重启最多3次 , 失败后1秒重启 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000)); //=============配置 =============== DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000); DataStream<String> dataStream = dataStreamSource.map(new StateMapFunction()); dataStream.print(); env.execute(); } } class StateMapFunction implements MapFunction<String,String> , CheckpointedFunction { ListState<String> listState; //正常的处理逻辑 @Override public String map(String value) throws Exception { listState.add(value); Iterable<String> strings = listState.get(); StringBuilder sb = new StringBuilder(); for (String string : strings) { sb.append(string); } //写一个异常 if(value.length()==5){ int a = 1/ 0; } return sb.toString(); } //持久化之前会调用的方法 @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { long checkpointId = context.getCheckpointId(); System.out.println("执行快照!!!!!"+ checkpointId); } //算子的任务在启动之前,会调用下面的方法,为用户的状态初始化 @Override public void initializeState(FunctionInitializationContext context) throws Exception { //context 获取状态存储器 OperatorStateStore operatorStateStore = context.getOperatorStateStore(); //定义一个昨天存储结构的描述器 ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class); //获取状态存储器 中获取容器来存储器 //getListState 方法还会加载之前存储的状态数据 listState = operatorStateStore.getListState(listStateDescriptor); } }
不同点:
算子状态中,一个算子有一个状态存储空间
Keyed State:每个Key 都是有自己的状态存储空间
public class _03_keyed_flink_status { public static void main(String[] args) throws Exception { // 获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); //需要开启 Checkpoint 机制 env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); //需要开启持久化的路径 可选hdfs 本地 env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout4/"); //task级别的failover //一个task 失败 job 失败 env.setRestartStrategy(RestartStrategies.noRestart()); //task 失败 重启最多3次 , 失败后1秒重启 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000)); DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000); DataStream<String> dataStream = dataStreamSource.keyBy(x -> x) .map(new KeyedStateMapFunction()).setParallelism(2); dataStream.print("===>").setParallelism(3); env.execute(); } } //flink 状态管理 算子需要实现CheckpointedFunction class KeyedStateMapFunction extends RichMapFunction<String, String>{ ListState<String> listState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); RuntimeContext runtimeContext = getRuntimeContext(); ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class); listState = runtimeContext.getListState(listStateDescriptor); } //正常的处理逻辑 @Override public String map(String value) throws Exception { listState.add(value); Iterable<String> strings = listState.get(); StringBuilder sb = new StringBuilder(); for (String string : strings) { sb.append(string); } //写一个异常 if(value.length()==5){ int a = 1/ 0; } return sb.toString(); } } //====== [root@localhost ~]# nc -lk 9000 a a a b b b c c c c d d d 控制台数据输出为 ===>:2> a ===>:3> aa ===>:1> aaa ===>:1> b ===>:2> bb ===>:3> bbb ===>:1> c ===>:2> cc ===>:3> ccc ===>:1> cccc ========> 每个key 都有一个自己的ListState<String> listState;
class KeyedStateMapFunction_2 extends RichMapFunction<String, String>{ ValueState<String> valueState; ListState<String> listState; MapState<String, String> mapState; ReducingState<Integer> reducingState; AggregatingState<Integer, Double> aggState; @Override public void open(Configuration parameters) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); //单值状态存储器 valueState = runtimeContext.getState(new ValueStateDescriptor<String>("string", String.class)); //列表状态存储器 listState = runtimeContext.getListState(new ListStateDescriptor<>("list", String.class)); //map 状态存储器 mapState = runtimeContext.getMapState(new MapStateDescriptor<String, String>("map", String.class, String.class)); //做累加 reduce reducingState = runtimeContext.getReducingState(new ReducingStateDescriptor<Integer>("reduce", new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1+value2; } }, Integer.class)); //记录聚合状态 --> 平均值 AggregatingState<Integer, Double> aggState = runtimeContext.getAggregatingState(new AggregatingStateDescriptor<>("aggState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() { @Override public Tuple2<Integer, Integer> createAccumulator() { return Tuple2.of(0, 0); } @Override public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) { return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1); } @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { return Double.valueOf(accumulator.f1 / accumulator.f0); } //批处理会使用 @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) { return Tuple2.of(a.f0 + b.f0, b.f0 + b.f1); } }, TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() { }))); } //正常的处理逻辑 @Override public String map(String value) throws Exception { //valueState valueState.update("new value");//更新值 String value1 = valueState.value();//q取值 //listState listState.add(value); //添加一个数据 listState.addAll(Arrays.asList("1","2")); //添加多个数据 listState.update(Arrays.asList("1","2")); //替换原有数据 //mapState Iterable<String> keys = mapState.keys(); boolean contains = mapState.contains("1"); mapState.put("1","2"); //添加数据 Map<String,String> map = new HashMap<>(); map.put("1","2"); mapState.putAll(map);//批量添加数据 //reducingState //做累加 reducingState.add(Integer.valueOf(value)); Integer integer = reducingState.get(); //取值 //计算平均值 aggState.add(Integer.valueOf(value)); Double aDouble = aggState.get();//取值 return value1; } }
RuntimeContext runtimeContext = getRuntimeContext(); //单值状态存储器 ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string", String.class); //存活时间和过期 参考 StateTtlConfig build = StateTtlConfig.newBuilder(Time.milliseconds(5000)) //数据存活时间 .setTtl(Time.milliseconds(5000)) //数据存活时间 和上面效果一样 .updateTtlOnCreateAndWrite() //插入和更新时 TTL 重新计算存活时间 .updateTtlOnReadAndWrite() //读或者写 TTL 重新计算存活时间 //比如List 是单条数据 Map 则是一个Key value 是一个单独的TTL .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //返回已经过期的数据 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) //没清楚可以返回过期数据 .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)//TTL处理时间语义 .useProcessingTime() //效果同上 .cleanupFullSnapshot()//清理过期状态数据 在checkpoint 的时候 .cleanupInRocksdbCompactFilter(1000) //只对rocksdb 生效 在rockdb Compact机制在Compact 时过期时间清理 .build(); valueStateDescriptor.enableTimeToLive(build); valueState = runtimeContext.getState(valueStateDescriptor);
状态数据的存储管理的实现,状态数据的本地读写,远端快照数据存储
状态后端是可插拔替换的,它对上层屏蔽了底层的差异,因为在更换状态后端时,用户的代码不需要做任何更改
HashMapStateBacked
EmbeddedRocksDBStateBackend
两种状态后端策略 生成快照checkpoint 文件是一样的 ,重启后改变StateBacked 可以兼容运行;程序在重启后改变状态后端的方式不影响程序运行;
// HashMapStateBacked
env.setStateBackend(new HashMapStateBackend());
//EmbeddedRocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
前面章节说的流的join 的时候 广播就使用到了 broadcast state
Flink 学习三 Flink 流&process function API
==> 1.7.broadcast
new BroadcastProcessFunction();
状态后端的方式不影响程序运行;**
// HashMapStateBacked
env.setStateBackend(new HashMapStateBackend());
//EmbeddedRocksDBStateBackend
env.setStateBackend(new EmbeddedRocksDBStateBackend());
前面章节说的流的join 的时候 广播就使用到了 broadcast state
Flink 学习三 Flink 流&process function API
==> 1.7.broadcast
new BroadcastProcessFunction();
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。