当前位置:   article > 正文

flink广播状态详解_flink 广播状态

flink 广播状态

接下来来讲讲flink中的广播状态

  1. /**
  2. * 搜索用户匹配行为 使用广播状态
  3. * 去匹配比如该用户是否是先login再buy 或者先login再pay这种组合行为
  4. */
  5. case class Action(userid:String,action:String)
  6. case class Pattern1(act1:String,act2:String)
  7. object BroadcastStateExample {
  8. def main(args: Array[String]): Unit = {
  9. val env = StreamExecutionEnvironment.getExecutionEnvironment
  10. env.setParallelism(1)
  11. //定义一个实时的用户行为流
  12. val actionDS = env.fromElements(
  13. Action("zhangsan","login"),
  14. Action("zhangsan","buy"),
  15. Action("lisi","login"),
  16. Action("lisi","pay")
  17. )
  18. //定义一个行为模式流,代表了要检查的标准
  19. val patternDS = env.fromElements(
  20. Pattern1("login","buy"),
  21. Pattern1("login","pay")
  22. )
  23. //建造广播流的描述器 广播流底层是kv形式 所以也必须用mapState的解释器
  24. // 这里的两个类型分别指的是map的keyvalue的类型 因为这里无所谓key 所以也可以写成unit类和Pattern1类 写String也没事
  25. val msd = new MapStateDescriptor[String, Pattern1]("pattern", classOf[String], classOf[Pattern1])
  26. //按照描述器制造1个广播流
  27. val broadcastDS = patternDS.broadcast(msd)
  28. //将事件流和广播流链接在一起
  29. actionDS.keyBy(_.userid).connect(broadcastDS).process(new PatternFilter).print()
  30. env.execute()
  31. }
  32. }
  33. class PatternFilter extends KeyedBroadcastProcessFunction[String,Action,Pattern1,(String,Pattern1)]{
  34. //定义值状态 保存上一次的用户行为
  35. private lazy val preAct: ValueState[String] = getRuntimeContext.getState(new ValueStateDescriptor[String]("pre_action", classOf[String]))
  36. override def processElement(in1: Action, readOnlyContext: KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)]#ReadOnlyContext, collector: Collector[(String, Pattern1)]): Unit = {
  37. //获得广播流
  38. val bcDS = readOnlyContext.getBroadcastState(new MapStateDescriptor[String,Pattern1]("pattern",classOf[String],classOf[Pattern1]))
  39. //查看用户属于哪种行为
  40. if(bcDS.contains(s"${preAct.value()}_${in1.action}")){
  41. collector.collect((in1.userid,bcDS.get(s"${preAct.value()}_${in1.action}")))
  42. //清除状态
  43. preAct.clear()
  44. }
  45. //记录本次用户行为
  46. preAct.update(in1.action)
  47. }
  48. //优先执行 广播流方法?因为要先加载完广播流
  49. override def processBroadcastElement(in2: Pattern1, context: KeyedBroadcastProcessFunction[String, Action, Pattern1, (String, Pattern1)]#Context, collector: Collector[(String, Pattern1)]): Unit = {
  50. //获取广播流
  51. val bcState = context.getBroadcastState(new MapStateDescriptor[String,Pattern1]("pattern",classOf[String],classOf[Pattern1]))
  52. //将你获得的规则数据存放到广播流中
  53. bcState.put(s"${in2.act1}_${in2.act2}",in2)
  54. }
  55. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/891447
推荐阅读
相关标签
  

闽ICP备14008679号