赞
踩
接下来讲讲 Flink 中的广播状态（Broadcast State）。
/*
 * Broadcast-state demo: match per-user action combinations, e.g. whether a user
 * did "login" followed by "buy", or "login" followed by "pay".
 */

/**
 * A single user action event.
 *
 * @param userid id of the user who performed the action (the demo job keys the stream by it)
 * @param action name of the action, e.g. "login", "buy", "pay"
 */
final case class Action(userid: String, action: String)

/**
 * A rule describing two consecutive actions to look for: `act1` followed by `act2`.
 *
 * @param act1 the earlier action of the pair
 * @param act2 the action that must directly follow `act1`
 */
final case class Pattern1(act1: String, act2: String)
/**
 * Entry point of the broadcast-state demo.
 *
 * A small set of [[Pattern1]] rules is broadcast to every parallel instance and connected
 * with the [[Action]] stream keyed by user. [[PatternFilter]] then reports which users
 * performed a rule's two actions back to back.
 */
object BroadcastStateExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Real-time stream of user actions (a fixed sample here).
    val userActions = env.fromElements(
      Action("zhangsan", "login"),
      Action("zhangsan", "buy"),
      Action("lisi", "login"),
      Action("lisi", "pay")
    )

    // Stream of the action patterns (rules) to check for.
    val rules = env.fromElements(
      Pattern1("login", "buy"),
      Pattern1("login", "pay")
    )

    // Broadcast state is always a key/value map, so it is declared with a MapStateDescriptor.
    // The key does matter: PatternFilter stores each rule under a String key derived from its
    // two actions and reads it back through a descriptor with the same name and types.
    val patternDescriptor =
      new MapStateDescriptor[String, Pattern1]("pattern", classOf[String], classOf[Pattern1])
    val broadcastRules = rules.broadcast(patternDescriptor)

    // Key actions by user so PatternFilter's "previous action" state is tracked per user,
    // then attach the broadcast rules.
    // NOTE: Flink does not order the two inputs relative to each other, so some actions may be
    // processed before the rules have arrived.
    val keyedActions = userActions.keyBy(action => action.userid)
    val matches = keyedActions.connect(broadcastRules).process(new PatternFilter)
    matches.print()

    env.execute()
  }
}
/**
 * Detects, per user, two consecutive actions that match a broadcast [[Pattern1]] rule
 * (e.g. "login" followed by "buy").
 *
 *  - Keyed input: [[Action]] events; the key type is String (the demo job keys by `userid`).
 *  - Broadcast input: [[Pattern1]] rules, kept in the broadcast map state named "pattern".
 *  - Output: `(userid, matchedRule)` each time an action completes a stored rule.
 *
 * Flink does not guarantee that broadcast elements are processed before keyed elements.
 * An action handled before its rule has arrived is therefore not matched against that rule.
 */
class PatternFilter extends KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)] {

  /**
   * Descriptor of the broadcast rule state. Its name and key/value types must stay identical
   * to the descriptor the job passes to `broadcast(...)` ("pattern", String -> Pattern1).
   */
  private val patternStateDescriptor =
    new MapStateDescriptor[String, Pattern1]("pattern", classOf[String], classOf[Pattern1])

  /** Last action of the current user (keyed state); `value()` is null before the first action. */
  private lazy val preAct: ValueState[String] =
    getRuntimeContext.getState(new ValueStateDescriptor[String]("pre_action", classOf[String]))

  /**
   * Emits `(userid, rule)` when the user's previous action followed by `in1.action` matches a
   * broadcast rule. Then remembers `in1.action` as the user's previous action.
   */
  override def processElement(
      in1: Action,
      readOnlyContext: KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)]#ReadOnlyContext,
      collector: Collector[(String, Pattern1)]): Unit = {
    val rules = readOnlyContext.getBroadcastState(patternStateDescriptor)

    // Only a user with a recorded previous action can complete a two-step rule. Without this
    // guard the lookup key would be built from the string "null".
    Option(preAct.value()).foreach { previous =>
      // get returns null for an absent key: one lookup instead of contains + get.
      Option(rules.get(patternKey(previous, in1.action)))
        .foreach(rule => collector.collect((in1.userid, rule)))
    }

    // The current action becomes the predecessor of the user's next action. This overwrites
    // the state, so no separate clear() is needed after a match.
    preAct.update(in1.action)
  }

  /** Stores (or replaces) a rule in broadcast state under the key of its two actions. */
  override def processBroadcastElement(
      in2: Pattern1,
      context: KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)]#Context,
      collector: Collector[(String, Pattern1)]): Unit =
    context.getBroadcastState(patternStateDescriptor).put(patternKey(in2.act1, in2.act2), in2)

  /**
   * Broadcast-state key for "`first` followed by `second`".
   *
   * The length prefix keeps the encoding unambiguous. With a bare `first_second` key,
   * ("a_b", "c") and ("a", "b_c") would both become "a_b_c", and one rule would silently
   * overwrite the other.
   */
  private def patternKey(first: String, second: String): String =
    s"${first.length}:${first}_$second"
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。