赞
踩
把元素广播给所有的分区,数据会被重复处理
dataStream.broadcast()
广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks。
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
广播状态可用于以特定方式组合和联合处理两个事件流。第一个流的事件被广播到运营商的所有并行实例,这些实例将它们维持为状态。
(1)批处理
- public static void main(String[] args) throws Exception{
- ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
- //准备广播变量数据
- ArrayList<Tuple2<String,Integer>> broadData=new ArrayList<>();
- broadData.add(new Tuple2<>("python",18));
- broadData.add(new Tuple2<>("scala",20));
- broadData.add(new Tuple2<>("java",17));
- DataSource<Tuple2<String, Integer>> dataBroadSource = env.fromCollection(broadData);
-
- DataSet<Map<String, Integer>> baseData =dataBroadSource.map(new MapFunction<Tuple2<String, Integer>, Map<String,Integer>>() {
- @Override
- public Map<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
- Map<String,Integer> map=new HashMap<>();
- map.put(stringIntegerTuple2._1,stringIntegerTuple2._2);
- return map;
- }
- });
- DataSet <String> dataSource = env.fromElements("python", "java","java","kafka","scala","redis");
-
- DataSet <String> result =dataSource.map(new RichMapFunction<String, String>() {
- Map<String, Integer> allMap = new HashMap <String, Integer>();
- List<HashMap <String, Integer>> broadCastMap = new ArrayList<HashMap <String, Integer>>();
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.broadCastMap = getRuntimeContext().getBroadcastVariable("baseData");
- for (HashMap map : broadCastMap) {
- allMap.putAll(map);
- }
- }
- @Override
- public String map(String s) throws Exception {
- Integer age = allMap.get(s);
- return s + "," + age;
- }
- }).withBroadcastSet(baseData,"baseData");
- result.print();
- }
计算结果:
python,18
java,17
java,17
kafka,null
scala,20
redis,null
(2)使用广播流,实现数据流的动态配置(taskSlot是内存隔离的,所以broadcast是在Taskslot都有一份)
- public static void main(String[] args) throws Exception{
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<String> filterData = env.addSource(new RichSourceFunction<String>() {
- private boolean isRunning = true;
- //测试数据集
- String[] data = new String[]{"java", "python", "scala"};
- /**
- * 模拟数据源,每1分钟产生一次数据,实现数据的跟新
- * @param cxt
- * @throws Exception
- */
- @Override
- public void run(SourceContext <String> cxt) throws Exception {
- int size = data.length;
- while (isRunning) {
- TimeUnit.MINUTES.sleep(1);
- int seed = (int) (Math.random() * size);
- //在数据集中随机生成一个数据进行发送
- cxt.collect(data[seed]);
- System.out.println("发送的关键字是:" + data[seed]);
- }
- }
- @Override
- public void cancel() {
- isRunning = false;
- }
- });
- //1、定义数据广播的规则:
- MapStateDescriptor<String, String> configFilter = new MapStateDescriptor<String, String>("configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
- //2、对filterData进行广播
- BroadcastStream<String> broadcastConfig = filterData.setParallelism(1).broadcast(configFilter);
- //定义数据集
- DataStreamSource <String> dataStream = env.addSource(new RichSourceFunction <String>() {
- private boolean isRunning = true;
- //测试数据集
- String[] data = new String[]{
- "java代码量太大",
- "python代码量少,易学习",
- "php是web开发语言",
- "scala流式处理语言,主要应用于大数据开发场景",
- "go是一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言"
- };
-
- /**
- * 模拟数据源,每3s产生一次
- * @param ctx
- * @throws Exception
- */
- @Override
- public void run(SourceContext <String> ctx) throws Exception {
- int size = data.length;
- while (isRunning) {
- TimeUnit.SECONDS.sleep(3);
- int seed = (int) (Math.random() * size);
- //在数据集中随机生成一个数据进行发送
- ctx.collect(data[seed]);
- System.out.println("上游发送的消息:" + data[seed]);
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
- });
-
- //3、dataStream对广播的数据进行关联(使用connect进行连接)
- DataStream<String> result = dataStream.connect(broadcastConfig).process(new BroadcastProcessFunction<String, String, String>() {
-
- //拦截的关键字
- private String keyWords = null;
-
- /**
- * open方法只会执行一次
- * 可以在这实现初始化的功能
- * 4、设置keyWords的初始值,否者会报错:java.lang.NullPointerException
- * @param parameters
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- keyWords="java";
- System.out.println("初始化keyWords:java");
- }
-
- /**
- * 6、 处理流中的数据
- * @param value
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
- if (value.contains(keyWords)) {
- out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keyWords);
- }
- }
-
- /**
- *5、对广播变量的获取更新
- * @param value
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processBroadcastElement(String value, Context ctx, Collector <String> out) throws Exception {
- keyWords = value;
- System.out.println("更新关键字:" + value);
- }
- });
- result.print();
- env.execute(StreamBroadcastDemo.class.getSimpleName());
- }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。