赞
踩
1、Broadcast State 案例
规则流:1,a,b [规则名1 规则为 a 或 b]
图形流:green,a [绿色 a]
问题:如果数据流先于规则流到达,则数据匹配不上任何规则 => 此时(广播状态中还没有规则时)先把数据流中的数据缓冲起来,等规则到达后再进行匹配
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import java.util.*;
public class _06_BroadcastState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收广播的规则数据
SingleOutputStreamOperator<_06_Rule> ruleStream = env.socketTextStream("localhost", 8888)
.map(new MapFunction<String, _06_Rule>() {
@Override
public _06_Rule map(String value) throws Exception {
String[] fields = value.split(",");
return new _06_Rule(fields[0], new Tuple2<>(fields[1], fields[2]));
}
});
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, _06_Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<_06_Rule>() {
}));
// 广播流,广播规则并且创建 broadcast state
BroadcastStream<_06_Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
// 接收图形数据
SingleOutputStreamOperator<_06_Shape> shapeStream = env.socketTextStream("localhost", 9999)
.map(new MapFunction<String, _06_Shape>() {
@Override
public _06_Shape map(String value) throws Exception {
String[] fields = value.split(",");
return new _06_Shape(fields[0], fields[1]);
}
});
shapeStream.keyBy(_06_Shape::getColour)
.connect(ruleBroadcastStream)
.process(new KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>() {
// private transient ValueState<List<_06_Shape>> dataBuffer;
private transient ListState<_06_Shape> dataBuffer;
@Override
public void open(Configuration parameters) throws Exception {
// ValueStateDescriptor<List<_06_Shape>> dataListStateDescriptor = new ValueStateDescriptor<>("dataBuffer", TypeInformation.of(new TypeHint<List<_06_Shape>>() {
// }));
//
// dataBuffer = getRuntimeContext().getState(dataListStateDescriptor);
ListStateDescriptor<_06_Shape> listStateDescriptor = new ListStateDescriptor<>("dataBuffer", _06_Shape.class);
dataBuffer = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void processElement(_06_Shape value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
// 获取广播的规则数据
System.out.println("输入的数据颜色为=>" + value.getColour() + ",类型为=>" + value.getType());
ReadOnlyBroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
Iterator<Map.Entry<String, _06_Rule>> iterator = broadcastState.immutableEntries().iterator();
if (iterator.hasNext()) {
// 使用 ValueState
// 先从缓存中读取数据进行匹配
// List<_06_Shape> shapeList = dataBuffer.value();
// 多并行度时,防止某个并行度无数据导致报错
// if (shapeList != null) {
// if (!shapeList.isEmpty()) {
// for (_06_Shape shape : shapeList) {
// System.out.println("被缓冲的数据开始进行处理=>" + shape);
// // 从事件数据中继续匹配
// while (iterator.hasNext()) {
// Map.Entry<String, _06_Rule> rule = iterator.next();
// if (Objects.equals(rule.getValue().getRule().f0, shape.getType()) || Objects.equals(rule.getValue().getRule().f1, shape.getType())) {
// out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
// }
// }
// }
//
// shapeList.clear();
// }
// }
// 使用 ListState
Iterator<_06_Shape> dataIterator = dataBuffer.get().iterator();
while (dataIterator.hasNext()){
_06_Shape shape = dataIterator.next();
System.out.println("被缓冲的数据开始进行处理=>" + shape);
while (iterator.hasNext()) {
Map.Entry<String, _06_Rule> rule = iterator.next();
if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {
out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
}
}
dataIterator.remove();
}
// 从事件数据中继续匹配
while (iterator.hasNext()) {
Map.Entry<String, _06_Rule> rule = iterator.next();
if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {
out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
}
}
} else {
System.out.println("此时规则流中无规则,先缓冲数据流");
// 使用 listState
dataBuffer.add(value);
// 使用 valueState
// List<_06_Shape> shapeList = dataBuffer.value();
// if (shapeList == null) {
// shapeList = new ArrayList<>();
// }
// shapeList.add(value);
// dataBuffer.update(shapeList);
}
}
@Override
public void processBroadcastElement(_06_Rule value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.Context ctx, Collector<String> out) throws Exception {
// 获取广播流输入的数据,存入广播状态
System.out.println("输入的规则名称为=>" + value.getRuleName() + ",规则为=>" + value.getRule());
BroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
broadcastState.put(value.getRuleName(), value);
}
})
.print();
env.execute();
}
}
2、Pojo 类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
/**
 * A matching rule carried on the broadcast stream.
 *
 * <p>Constructors and accessors are generated by Lombok ({@code @NoArgsConstructor},
 * {@code @AllArgsConstructor}, {@code @Data}).
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class _06_Rule implements Serializable {
// Name of the rule; used as the key under which the rule is stored in broadcast state.
private String ruleName;
// The two shape types accepted by this rule; a shape matches when its type equals f0 or f1.
private Tuple2<String,String> rule;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * A shape event carried on the data stream.
 *
 * <p>Constructors and accessors are generated by Lombok ({@code @AllArgsConstructor},
 * {@code @NoArgsConstructor}, {@code @Data}).
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class _06_Shape implements Serializable {
// Colour of the shape; the shape stream is keyed by this field.
private String colour;
// Type of the shape; compared against the two types held by each rule.
private String type;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。