赞
踩
关于两条流的连接,还有一种比较特殊的用法:DataStream 调用.connect()方法时,传入的参数也可以不是一个 DataStream,而是一个“广播流”(BroadcastStream),这时合并两条流得到的就变成了一个“广播连接流”(BroadcastConnectedStream)。
从概念和原理上讲,广播状态非常容易理解:状态广播出去,所有并行子任务的状态都是相同的;并行度调整时只要直接复制就可以了。
让所有并行子任务都持有同一份状态,也就意味着一旦状态有变化,所以子任务上的实例都要更新。
什么时候会用到这样的广播状态呢?
一个最为普遍的应用,就是“动态配置”或者“动态规则”。我们在处理流数据时,有时会基于一些配置(configuration)或者规则(rule)。
用流处理事件驱动思想,将配置数据也看作一条流,这条流和要处理的数据流进行连接,就可以进行实时配置计算了。
由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就是一个典型的广播状态。
在代码上,可以直接调用 DataStream 的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态的名称和类型,就可以得到一个“广播流”(BroadcastStream);进而将要处理的数据流与这条广播流进行连接(connect),就会得到“广播连接流”(BroadcastConnectedStream)。
注意:广播状态只能用在广播连接流中。
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
DataStream<String> output = stream.connect(ruleBroadcastStream)
.process( new BroadcastProcessFunction<>() {...} );
这里我们定义了一个“规则流”ruleStream,里面的数据表示了数据流 stream 处理的规则,规则的数据类型定义为 Rule。于是需要先定义一个 MapStateDescriptor 来描述广播状态,然后传入 ruleStream.broadcast()得到广播流,接着用 stream 和广播流进行连接。这里状态描述器中的 key 类型为 String,就是为了区分不同的状态值而给定的 key 的名称。对 于 广 播 连 接 流 调 用 .process() 方 法 , 可 以 传 入 “ 广 播 处 理 函 数 ”KeyedBroadcastProcessFunction 或者 BroadcastProcessFunction 来进行处理计算。广播处理函数里面有两个方法.processElement()和.processBroadcastElement(),源码中定义如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
...
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
...
}
这里的.processElement()方法,处理的是正常数据流,第一个参数 value 就是当前到来的流数据;而.processBroadcastElement()方法就相当于是用来处理广播流的,它的第一个参数 value就是广播流中的规则或者配置数据。两个方法第二个参数都是一个上下文 ctx,都可以通过调用.getBroadcastState()方法获取到当前的广播状态;区别在于,.processElement()方法里的上下文 是 “ 只 读 ” 的 ( ReadOnly ), 因 此 获 取 到 的 广 播 状 态 也 只 能 读 取 不 能 更 改 ;而.processBroadcastElement()方法里的 Context 则没有限制,可以根据当前广播流中的数据更新状态。
Rule rule = ctx.getBroadcastState( new MapStateDescriptor<>("rules", Types.String, Types.POJO(Rule.class))).get("my rule");
通过调用 ctx.getBroadcastState()方法,传入一个 MapStateDescriptor,就可以得到当前的叫作“rules”的广播状态;调用它的.get()方法,就可以取出其中“my rule”对应的值进行计算处理。
接举一个广播状态的应用案例。考虑在电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯。
public class BehaviorPatternDetectExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //用户行为数据流 DataStreamSource<Action> actionStream = env.fromElements( new Action("Alice", "login"), new Action("Alice", "pay"), new Action("Bob", "login"), new Action("Bob", "order") ); //行为模式流,基于它构建广播流 DataStreamSource<Pattern> patternStream = env.fromElements( new Pattern("login", "pay"), new Pattern("login", "order") ); //定义广播状态描述器 MapStateDescriptor<Void,Pattern> descriptor = new MapStateDescriptor<Void,Pattern>("pattern", Types.VOID,Types.POJO(Pattern.class)); BroadcastStream<Pattern> broadcastStream = patternStream.broadcast(descriptor); //连接两条流进行处理 SingleOutputStreamOperator<Tuple2<String,Pattern>> matches = actionStream.keyBy(data -> data.userId) .connect(broadcastStream) .process(new PatterDetector()); matches.print(); env.execute(); } //实现自定义的KeyedBroadcastProcessFunction public static class PatterDetector extends KeyedBroadcastProcessFunction<String,Action,Pattern,Tuple2<String,Pattern>>{ //定义一个KeyedState 保存上一次用户行为 public ValueState<String> prevActionState; @Override public void open(Configuration parameters) throws Exception { prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action",String.class)); } @Override public void processElement(Action value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.ReadOnlyContext ctx, Collector<Tuple2<String, Pattern>> out) throws Exception { // 从广播状态中匹配模式 (只读状态) ReadOnlyBroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<Void, Pattern>("pattern", Types.VOID, Types.POJO(Pattern.class))); Pattern pattern = patternState.get(null); // 获取用户上一次的行为 String prevAction = prevActionState.value(); //判断是否匹配 if (pattern != null && prevAction != null){ if (pattern.action1.equals(prevAction) && pattern.action2.equals(value.action)){ //匹配后输出key和它对应的pattern里的模型 out.collect(new Tuple2<>(ctx.getCurrentKey(),pattern)); } } //更新状态 prevActionState.update(value.action); } @Override public void processBroadcastElement(Pattern value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.Context ctx, Collector<Tuple2<String, Pattern>> out) throws Exception { //从上下文中获取广播状态,并用当前数据更新状态 BroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<Void, Pattern>("pattern", Types.VOID, Types.POJO(Pattern.class))); //更新当前的广播状态 patternState.put(null,value); } } //定义用户行为事件和模式的POJO类 public static class Action{ public String userId; //id public String action; //行为 public Action() {} public Action(String userId, String action) { this.userId = userId; this.action = action; } @Override public String toString() { return "Action[" + "userId='" + userId + '\'' + ", action='" + action + '\'' + ']'; } } public static class Pattern{ public String action1; //行为 1 public String action2; //行为 2 public Pattern() {} public Pattern(String action1, String action2) { this.action1 = action1; this.action2 = action2; } @Override public String toString() { return "Pattern[" + "action1='" + action1 + '\'' + ", action2='" + action2 + '\'' + ']'; } } }
我们将检测的行为模式定义为 POJO 类 Pattern,里面包含了连续的两个行为。由于广播状态中只保存了一个 Pattern,并不关心 MapState 中的 key,所以也可以直接将 key 的类型指定为 Void,具体值就是 null。在具体的操作过程中,我们将广播流中的 Pattern 数据保存为广播变量;在行为数据 Action 到来之后读取当前广播变量,确定行为模式,并将之前的一次行为保存为一个 ValueState——这是针对当前用户的状态保存,所以用到了 Keyed State。检测到如果前一次行为与 Pattern 中的 action1 相同,而当前行为与 action2 相同,则发现了匹配模式的一组行为,输出检测结果。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。