当前位置:   article > 正文

Flink的广播变量和广播状态_flink的状态是不和广播变量一样

flink的状态是不和广播变量一样

1、dataStreaming中的broadcast

把元素广播给所有的分区,数据会被重复处理

dataStream.broadcast()

2、机器级别的广播

广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks。
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

3、广播状态

广播状态可用于以特定方式组合和联合处理两个事件流。第一个流的事件被广播到运营商的所有并行实例,这些实例将它们维持为状态。

4、用法

(1)批处理

  1. public static void main(String[] args) throws Exception{
  2. ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
  3. //准备广播变量数据
  4. ArrayList<Tuple2<String,Integer>> broadData=new ArrayList<>();
  5. broadData.add(new Tuple2<>("python",18));
  6. broadData.add(new Tuple2<>("scala",20));
  7. broadData.add(new Tuple2<>("java",17));
  8. DataSource<Tuple2<String, Integer>> dataBroadSource = env.fromCollection(broadData);
  9. DataSet<Map<String, Integer>> baseData =dataBroadSource.map(new MapFunction<Tuple2<String, Integer>, Map<String,Integer>>() {
  10. @Override
  11. public Map<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
  12. Map<String,Integer> map=new HashMap<>();
  13. map.put(stringIntegerTuple2._1,stringIntegerTuple2._2);
  14. return map;
  15. }
  16. });
  17. DataSet <String> dataSource = env.fromElements("python", "java","java","kafka","scala","redis");
  18. DataSet <String> result =dataSource.map(new RichMapFunction<String, String>() {
  19. Map<String, Integer> allMap = new HashMap <String, Integer>();
  20. List<HashMap <String, Integer>> broadCastMap = new ArrayList<HashMap <String, Integer>>();
  21. @Override
  22. public void open(Configuration parameters) throws Exception {
  23. super.open(parameters);
  24. this.broadCastMap = getRuntimeContext().getBroadcastVariable("baseData");
  25. for (HashMap map : broadCastMap) {
  26. allMap.putAll(map);
  27. }
  28. }
  29. @Override
  30. public String map(String s) throws Exception {
  31. Integer age = allMap.get(s);
  32. return s + "," + age;
  33. }
  34. }).withBroadcastSet(baseData,"baseData");
  35. result.print();
  36. }

计算结果:

python,18
java,17
java,17
kafka,null
scala,20
redis,null

(2)使用广播流,实现数据流的动态配置(taskSlot是内存隔离的,所以broadcast是在Taskslot都有一份)

  1. public static void main(String[] args) throws Exception{
  2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3. DataStreamSource<String> filterData = env.addSource(new RichSourceFunction<String>() {
  4. private boolean isRunning = true;
  5. //测试数据集
  6. String[] data = new String[]{"java", "python", "scala"};
  7. /**
  8. * 模拟数据源,每1分钟产生一次数据,实现数据的跟新
  9. * @param cxt
  10. * @throws Exception
  11. */
  12. @Override
  13. public void run(SourceContext <String> cxt) throws Exception {
  14. int size = data.length;
  15. while (isRunning) {
  16. TimeUnit.MINUTES.sleep(1);
  17. int seed = (int) (Math.random() * size);
  18. //在数据集中随机生成一个数据进行发送
  19. cxt.collect(data[seed]);
  20. System.out.println("发送的关键字是:" + data[seed]);
  21. }
  22. }
  23. @Override
  24. public void cancel() {
  25. isRunning = false;
  26. }
  27. });
  28. //1、定义数据广播的规则:
  29. MapStateDescriptor<String, String> configFilter = new MapStateDescriptor<String, String>("configFilter", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
  30. //2、对filterData进行广播
  31. BroadcastStream<String> broadcastConfig = filterData.setParallelism(1).broadcast(configFilter);
  32. //定义数据集
  33. DataStreamSource <String> dataStream = env.addSource(new RichSourceFunction <String>() {
  34. private boolean isRunning = true;
  35. //测试数据集
  36. String[] data = new String[]{
  37. "java代码量太大",
  38. "python代码量少,易学习",
  39. "php是web开发语言",
  40. "scala流式处理语言,主要应用于大数据开发场景",
  41. "go是一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言"
  42. };
  43. /**
  44. * 模拟数据源,每3s产生一次
  45. * @param ctx
  46. * @throws Exception
  47. */
  48. @Override
  49. public void run(SourceContext <String> ctx) throws Exception {
  50. int size = data.length;
  51. while (isRunning) {
  52. TimeUnit.SECONDS.sleep(3);
  53. int seed = (int) (Math.random() * size);
  54. //在数据集中随机生成一个数据进行发送
  55. ctx.collect(data[seed]);
  56. System.out.println("上游发送的消息:" + data[seed]);
  57. }
  58. }
  59. @Override
  60. public void cancel() {
  61. isRunning = false;
  62. }
  63. });
  64. //3、dataStream对广播的数据进行关联(使用connect进行连接)
  65. DataStream<String> result = dataStream.connect(broadcastConfig).process(new BroadcastProcessFunction<String, String, String>() {
  66. //拦截的关键字
  67. private String keyWords = null;
  68. /**
  69. * open方法只会执行一次
  70. * 可以在这实现初始化的功能
  71. * 4、设置keyWords的初始值,否者会报错:java.lang.NullPointerException
  72. * @param parameters
  73. * @throws Exception
  74. */
  75. @Override
  76. public void open(Configuration parameters) throws Exception {
  77. super.open(parameters);
  78. keyWords="java";
  79. System.out.println("初始化keyWords:java");
  80. }
  81. /**
  82. * 6、 处理流中的数据
  83. * @param value
  84. * @param ctx
  85. * @param out
  86. * @throws Exception
  87. */
  88. @Override
  89. public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
  90. if (value.contains(keyWords)) {
  91. out.collect("拦截消息:" + value + ", 原因:包含拦截关键字:" + keyWords);
  92. }
  93. }
  94. /**
  95. *5、对广播变量的获取更新
  96. * @param value
  97. * @param ctx
  98. * @param out
  99. * @throws Exception
  100. */
  101. @Override
  102. public void processBroadcastElement(String value, Context ctx, Collector <String> out) throws Exception {
  103. keyWords = value;
  104. System.out.println("更新关键字:" + value);
  105. }
  106. });
  107. result.print();
  108. env.execute(StreamBroadcastDemo.class.getSimpleName());
  109. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/696207
推荐阅读
相关标签
  

闽ICP备14008679号