赞
踩
广播状态:把状态广播出去
状态是一个数据流中计算过程中存储的数据,它只是在自己的数据流中存在,如果别的数据流也想使用你这个数据流中的状态那就需要你将你的数据流中的状态进行广播共享,之后才能使用
需求将数据流2中的状态广播给数据流1使用
步骤:
注意:
广播状态根据使用的时候,处理的数据流的不同分为了两种
一种是要处理的数据流是通过keyby生成的keyedStream,另一种直接处理的数据流没有使用keyby进行分区
根据这两种数据流的状态不同,在使用的时候就得使用不同的process方法进行处理
package gbgx.operator import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState} import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction import org.apache.flink.streaming.api.scala.{BroadcastConnectedStream, DataStream, StreamExecutionEnvironment, createTypeInformation} import org.apache.flink.util.Collector object Pinglun { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //评论数据流 val plstream: DataStream[String] = environment.socketTextStream("192.168.229.10", 9999) //规则流 val rule: DataStream[String] = environment.socketTextStream("192.168.229.10", 9998) //广播流,(将评论流里需要的数据放在状态中,通过流发出去,这个流就是广播流) val zztmsz = new MapStateDescriptor[String, String]("map", createTypeInformation[String], createTypeInformation[String]) val gbStream: BroadcastStream[String] = rule.broadcast(zztmsz) //广播连接流(评论流需要规则流里面的数据,规则流把这些数据通过 广播流广播了出来,那评论流就用从广播流中的数据,他俩一结合就形成了广播连接 流) val gbljStream: BroadcastConnectedStream[String, String] = plstream.connect(gbStream) //当评论流拿到规则后就可以根据规则进行数据的处理了,那他们两个流合在了一起,也就是说要拿着这个广播连接流进行处理,进行处理就是用这个process这个方法 //这个方法需要传入一个方法作为参数,也就是说要通过怎样的方法对 数据流进行处理,然后自己就自定义一个类去实现 具体怎么处理这个流中的数据 //以构造参数的形式把上面定义广播流时候定义的状态描述者传过去,用来保证评论流和规则流使用时用的是同一个状态描述者 //通过自定义的方法计算完之后得到计算完后的数据 val value: DataStream[String] = gbljStream.process(new MyoperFunction(zztmsz)) //然后拿着计算完后的数据输出进行验证一下 value.print() environment.execute("pinglun") } } //定义一个类继承BroadcastProcessFunction这个方法, // 里面三个参数,分别是原始数据流的类型(评论流),规则流数据类型(规则流)、和要输出的类型 , // 然后实现里面的两个方法 //在评论流使用规则流广播出来的状态的时候,必须保证他们指的是同一个状态, //也就是说这个类中的两个方法在处理的时候他们所指的状态描述者应该说的是同一个状态描述者。这个可以通过传入的参数确定这个唯一的状态描述者 class MyoperFunction(zztmsz:MapStateDescriptor[String,String]) extends BroadcastProcessFunction[String,String,String]{ /** * processElement这个方法是用来处理 高吞吐量流的,也就是评论流 * @param in1 这个参数指的是评论流(高吞吐量流)中发送过来的数据 * @param readOnlyContext 通过上下文获取到的制度的状态,它是只读的,因为这个状态来自于规则流中广播出来的状态,评论流只能有读取使用的权利,没有更改的权利 * @param collector 这个是用来 做输出的。想把数据输出出去就用这个来完成 */ override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, collector: Collector[String]): Unit = { //从状态中获取到规则流存入的数据,以供后续业务使用 val plzt: ReadOnlyBroadcastState[String, String] = readOnlyContext.getBroadcastState(zztmsz) //然后就判断评论流中的数据中有没有需要进行过滤的单词,如果有就获取到这个的单词然后进行替换,如果没有就不做处理 if (plzt.contains("keyword")){ //这个str就是评论流中包含有规则中关键字的单词 val str: String = plzt.get("keyword") //然后替换,生成新的str1 val str1: String = str.replace(str, "***") //替换完成之后就把它发送出去 collector.collect(str1) }else{ collector.collect(in1) } } /** * processBroadcastElement这个方法是来处理低吞吐量流的(规则流),因为规则流的数据少,评论流里面的数据多,所以称评论流为高吞吐量流,规则流为低吞吐量流 * @param in2 这个字的是低吞吐量流传过来的数据 * @param context 运行时上下文 * @param collector 输出 */ override def processBroadcastElement(in2: String, context: BroadcastProcessFunction[String, String, String]#Context, collector: Collector[String]): Unit = { //将规则流中的数据存入状态,以供后续评论流使用 val gzzt: BroadcastState[String, String] = context.getBroadcastState(zztmsz) gzzt.put("keyword",in2) } }
在简单的认识了广播变量的使用后进行业务的完善
package gbgx.operator import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState} import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * 对之前的代码进行改进 * 1.先说一下上一个代码干了什么事情 * 它说的是有两个数据流,一个数据流中是过来的评论,另一个数据流中过来的是规则,要做到评论中如果有规则流里面的关键字就进行 * 替换 * 2.存在的问题 * 2.1.规则流是不固定的,第二个规则关键字进来了之后就会更新状态,也就是说刚才上一个规则被覆盖掉了,现在只处理当前状态的规则 * 2.2.规则是一个Map存的键值对,不能同时存储多个规则,也不能对规则进行添加或者删除,不够灵活 * * 3.改进方案, * 3.1,实现可以添加规则,也可以删除规则 * 在规则流输入的时候进行字符串前缀判断,用来辨别是增加规则还是删除规则 * 3.2,实现规则中 可以存储多个数据 * 在Map中本来存的是k-v键值对,,将v改成一个List集合就可以存储多个规则了 */ object Pinglun2 { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val plstream: DataStream[String] = environment.socketTextStream("192.168.229.10", 9999) val rule: DataStream[String] = environment.socketTextStream("192.168.229.10", 9998) //这次输入的规则流中的数据需要带一个标识符,用来表示这个规则是增加的还是删除的;例如:word + 这就是说增加一个规则,word - 这是删除这个规则 //将规则流中输入进来的数据进行映射成一个元祖,第一个是word关键字,第二个是标记操作类型,是增加还是删除规则 val rule2: DataStream[(String, String)] = rule.map(_.split(" ")).map(v => (v(0), v(1))) //为了可以存储多个规则,Map中的值就设置为List类型 val zztmsz = new MapStateDescriptor[String, List[String]]("map", createTypeInformation[String], createTypeInformation[List[String]]) val gbStream: BroadcastStream[(String, String)] = rule2.broadcast(zztmsz) val gbljStream: BroadcastConnectedStream[String, (String, String)] = plstream.connect(gbStream) val value: DataStream[String] = gbljStream.process(new MyoperFunction2(zztmsz)) value.print() environment.execute("pinglun") } } class MyoperFunction2(zztmsz:MapStateDescriptor[String,List[String]]) extends BroadcastProcessFunction[String,(String,String),String]{ override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext, collector: Collector[String]): Unit = { val plzt: ReadOnlyBroadcastState[String, List[String]] = readOnlyContext.getBroadcastState(zztmsz) if (plzt.contains("keyword")){ val list: List[String] = plzt.get("keyword") val resault: String = list.fold(in1)((result, elem) => result.replace(elem, "**")) collector.collect(resault) }else{ collector.collect(in1) } } override def processBroadcastElement(in2: (String, String), context: BroadcastProcessFunction[String, (String, String), String]#Context, collector: Collector[String]): Unit = { val gzzt: BroadcastState[String, List[String]] = context.getBroadcastState(zztmsz) if (gzzt.contains("keyword")){ val list: List[String] = gzzt.get("keywor") if("+".equals(in2._2)){ val list1: List[String] = list :+ in2._1 gzzt.put("keyword",list1) }else if("-".equals(in2._2)){ val list1: List[String] = list.filter(!_.equals(in2._1)) gzzt.put("keyword",list1) }else{ if("+".equals(in2._2)){ gzzt.put("keyword",List(in2._1)) } } } } }
某电商平台,用户在某一类别下消费总金额达到一定数量,会有奖励
分析:
不同类别会有对应的奖励机制,需要把这个奖励机制广播给用户消费对应的流
用户的消费应该是一个高吞吐量流
通过用户消费流连接奖励机制流,然后通过process处理
用户消费流应该根据用户标记以以及类别分组===》流是KeyedStream
ProcessFunction应该选中KeyedBroadcastProcessFunction
在KeyedBroadcastProcessFunction中完成奖励机制以及用户消费统计、分析、处理
具体实现:
import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ReducingState, ReducingStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * 1.优惠券信息是实时的 * 2.订单信息是实时的 * 就是实时统计用户在某一个分类下的订单总金额 * * 1. 用户+分类进行keyby * 2.分类下订单总金额达到规定的值,会有对应的优惠券 * */ //写两个样本类将传入的数据封装成对象,方便使用 /** * 订单详情类 * @param userNo 用户编号 * @param categoryName 商品类别 * @param money 订单金额 */ case class OrderItem(userNo:String,categoryName:String,money:Double) /** * 优惠券类 * @param categoryName 商品类别 * @param orderMoney 订单金额 * @param yhqMoney 优惠券金额 */ case class YouHuiQuan(categoryName:String,orderMoney:Double,yhqMoney:Double) object YouhuiQuanJob { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //订单信息: 用户编号 商品类别 订单金额 val dataStream1: DataStream[String] = environment.socketTextStream("hadoop10", 9999) //高吞吐量流 val keyedStream: KeyedStream[OrderItem, Tuple] = dataStream1 .map(_.split(" ")) .map(array => new OrderItem(array(0), array(1), array(2).toDouble)) //为了接下来的数据处理方法,可以把数据信息映射成一个实体对象 .keyBy("userNo", "categoryName")//根据实体对象的属性名进行分组 //这种写法,就是参考word count的结构 /*val keyedStream: KeyedStream[(String, OrderItem), String] = dataStream1 .map(_.split(" ")) .map(array => (array(0) + ":" + array(1), new OrderItem(array(0), array(1), array(2).toDouble))) .keyBy(_._1)*/ //优惠券信息: 商品类别 订单总金额 优惠券金额 val dataStream2: DataStream[String] = environment.socketTextStream("hadoop10", 8888) //低吞吐量流 val lowStream: DataStream[YouHuiQuan] = dataStream2 .map(_.split(" ")) .map(array => new YouHuiQuan(array(0), array(1).toDouble, array(2).toDouble)) //这里面存储的是类别下的优惠券信息:商品类别作为key,优惠券对象作为value var broadcastStateDescriptor:MapStateDescriptor[String,YouHuiQuan]=new MapStateDescriptor[String,YouHuiQuan]("bcsd",createTypeInformation[String],createTypeInformation[YouHuiQuan]) //低吞吐量流生成广播流 val broadcastStream: BroadcastStream[YouHuiQuan] = lowStream.broadcast(broadcastStateDescriptor) //高吞吐量流连接广播流,生成广播连接流 val broadcastConnectedStream: BroadcastConnectedStream[OrderItem, YouHuiQuan] = keyedStream.connect(broadcastStream) val result: DataStream[String] = broadcastConnectedStream.process(new MyKeyedBroadcastProcessFunction(broadcastStateDescriptor)) result.print() environment.execute("youhuiquanJob") } } class MyKeyedBroadcastProcessFunction(broadcastStateDescriptor:MapStateDescriptor[String,YouHuiQuan]) extends KeyedBroadcastProcessFunction[Tuple,OrderItem,YouHuiQuan,String]{ var reducingState:ReducingState[Double]=_ override def open(parameters: Configuration): Unit = { var reduceFunction:ReduceFunction[Double]=new ReduceFunction[Double] { override def reduce(value1: Double, value2: Double): Double = value1+value2 } var reducingStateDescriptor:ReducingStateDescriptor[Double]=new ReducingStateDescriptor[Double]("rsd",reduceFunction,createTypeInformation[Double]) reducingState=getRuntimeContext.getReducingState(reducingStateDescriptor) } override def processElement(value: OrderItem, ctx: KeyedBroadcastProcessFunction[Tuple, OrderItem, YouHuiQuan, String]#ReadOnlyContext, out: Collector[String]): Unit = { //获取到状态 val readOnlyState: ReadOnlyBroadcastState[String, YouHuiQuan] = ctx.getBroadcastState(broadcastStateDescriptor) //判断优惠券信息是否存储 //获取到,到目前位置用户在分类下的订单总金额:把用户在这个类别下的每一次下的订单里面的金额都应该计算求和 //要做计算,就应该使用状态。reducingState完成 //判断订单总金额是否达到了优惠标准 //把当前这个订单的金额加入到总金额中 reducingState.add(value.money) if(readOnlyState.contains(value.categoryName)){ //这个类别下有优惠券信息 val youHuiQuan: YouHuiQuan = readOnlyState.get(value.categoryName) //获取到,到目前位置,用户在这个分类下的订单总金额 val totalMoney: Double = reducingState.get() if(totalMoney>=youHuiQuan.orderMoney){ //有优惠券 //发放优惠券 out.collect("在"+value.categoryName+"分类下,订单总金额已经达到了"+youHuiQuan.orderMoney+",发放了一个价值"+youHuiQuan.yhqMoney+"的优惠券。请到优惠券频道查看并使用") //优惠券发放完毕之后,应该把订单总金额清除 reducingState.clear() }else{ //还没有达到要求,需要再消费一些才可以有优惠券的发放 out.collect("在"+value.categoryName+"分类下,还需要再消费"+(youHuiQuan.orderMoney-totalMoney)+"就可以获取到价值"+youHuiQuan.yhqMoney+"的优惠券") } }else{ //这个类别下没有设置优惠券信息 /*//统计用户在这个分类下的订单总金额 reducingState.add(value.money)*/ out.collect("在"+value.categoryName+"分类下,还没有设置优惠券信息,不过没有关系,现在的消费金额也会参与到后续的优惠券发放统计内") } } override def processBroadcastElement(value: YouHuiQuan, ctx: KeyedBroadcastProcessFunction[Tuple, OrderItem, YouHuiQuan, String]#Context, out: Collector[String]): Unit = { //获取到状态 val broadcastState: BroadcastState[String, YouHuiQuan] = ctx.getBroadcastState(broadcastStateDescriptor) //把优惠券对象存入到状态中:商品类别是key,优惠券对象是Value broadcastState.put(value.categoryName,value) } }
升级版:
package gbgx.keyed import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ReducingState, ReducingStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.streaming.api.scala.{BroadcastConnectedStream, DataStream, KeyedStream, StreamExecutionEnvironment} import org.apache.flink.util.Collector import scala.util.control.Breaks object Yhqsjb { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //拿到用户购买信息的流 val dataStream: DataStream[String] = environment.socketTextStream("192.168.229.10", 9999) val keyedSteam: KeyedStream[Dd, Tuple] = dataStream.map(_.split(" ")).map(v => new Dd(v(0), v(1), v(2).toDouble)).keyBy("id", "lb") //拿到优惠券信息的流 val yhqStream: DataStream[String] = environment.socketTextStream("192.168.229.10", 9998) val yhqStream2: DataStream[(String, Yhq)] = yhqStream.map(_.split(" ")).map(v => (v(3), new Yhq(v(0), v(1).toDouble, v(2).toDouble))) //搞一个状态描述者保证广播流中的状态和用户数据流中使用的状态是同一个状态 val msz = new MapStateDescriptor[String, List[Yhq]]("msz", createTypeInformation[String], createTypeInformation[List[Yhq]]) //低吞吐量流将数据通过状态广播出去,高吞吐量流连接这个流形成广播状态流 val gb: BroadcastStream[(String, Yhq)] = yhqStream2.broadcast(msz) val gbdataStream: BroadcastConnectedStream[Dd, (String, Yhq)] = keyedSteam.connect(gb) //使用自定义方法进行数据的处理,然后输出 val resault: DataStream[String] = gbdataStream.process(new MyFunyhq(msz)) resault.print() environment.execute("") } } //自定义一个类实现具体的计算方法 class MyFunyhq(msz:MapStateDescriptor[String, List[Yhq]]) extends KeyedBroadcastProcessFunction[Tuple,Dd,(String,Yhq),String]{ //定义一个状态用来存储总金额信息,每过类一个用户的类别的金额给他累加起来,当领到优惠券之后就把这个总金额从状态中清除 var allmoney:ReducingState[Double] = _ //写个open方法用来将金额累计起来 override def open(parameters: Configuration): Unit = { var reduceFunction:ReduceFunction[Double]=new ReduceFunction[Double] { override def reduce(value1: Double, value2: Double): Double = value1+value2 } var reducingStateDescriptor:ReducingStateDescriptor[Double]=new ReducingStateDescriptor[Double]("rsd",reduceFunction,createTypeInformation[Double]) allmoney=getRuntimeContext.getReducingState(reducingStateDescriptor) } //这个是处理高吞吐量流的 override def processElement(in1: Dd, readOnlyContext: KeyedBroadcastProcessFunction[Tuple, Dd, (String, Yhq), String]#ReadOnlyContext, collector: Collector[String]): Unit = { //1.从上下文中拿到状态 val state: ReadOnlyBroadcastState[String, List[Yhq]] = readOnlyContext.getBroadcastState(msz) //将金额累计到累计状态中 allmoney.add(in1.money) //然后判断优惠券list中有没有这个优惠券,有的话就发放优惠券并清空累计金额,没有的话就累计金额并告诉他别灰心 if(state.contains(in1.lb)){ //拿到这个类别的优惠券的集合然后降序排列 val list: List[Yhq] = state.get(in1.lb) val list2: List[Yhq] = list.sortBy(v => v.money).reverse //这里就需要使用到状态中存的总金额了 ,用来判断发放怎样的优惠券 val am: Double = allmoney.get() var count=0//循环标记 Breaks.breakable( for (youHuiQuan <- list2) { if(am>=youHuiQuan.money){ collector.collect("在"+in1.lb+"分类下,订单总金额已经达到了"+youHuiQuan.money+",发放了一个价值"+youHuiQuan.yhq+"的优惠券。请到优惠券频道查看并使用") allmoney.clear() Breaks.break() } count+=1 } ) //上面的for循环,都已经读取了list2中的所有数据,都没有进入到if里面 // 没有达到任务的优惠 if(count==list2.size){ var youHuiQuan=list2(list2.size-1)//最后一个 //还没有达到要求,需要再消费一些才可以有优惠券的发放 collector.collect("在"+in1.lb+"分类下,还需要再消费"+(youHuiQuan.money-am)+"就可以获取到价值"+youHuiQuan.yhq+"的优惠券") } }else{ //否则就说明这个类别还没有优惠券 collector.collect("在"+in1.lb+"分类下,还没有设置优惠券信息,不过没有关系,现在的消费金额也会参与到后续的优惠券发放统计内") } } //这个是处理低吞吐量流的 override def processBroadcastElement(in2: (String, Yhq), context: KeyedBroadcastProcessFunction[Tuple, Dd, (String, Yhq), String]#Context, collector: Collector[String]): Unit = { //从上下文中拿到状态 val state: BroadcastState[String, List[Yhq]] = context.getBroadcastState(msz) //1.先判断状态中有没有现在进来的这个优惠券,如果有在根据是增加还是删除进行进一步操作,如果状态中没有新进来的这个优惠券就添加进去状态 if(state.contains(in2._2.lb)){ val list: List[Yhq] = state.get(in2._2.lb) //如果当前状态中有该优惠券的类别就先拿到这个优惠券类别的集合,如果是增加就在集合后面追加,如果是减少就在集合中去除即可 if("+".equals(in2._1)){ val yhqs: List[Yhq] = list :+ in2._2 state.put(in2._2.lb,yhqs) }else if("-".equals(in2._1)){ val list1: List[Yhq] = list.filter(v => v.money != in2._2.money) state.put(in2._2.lb,list1) } }else{ //如果是增加的优惠券,并且当前状态中还没有,就把优惠券 对象 放进集合中 if("+".equals(in2._1)){state.put(in2._2.lb,List(in2._2))} } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。