当前位置:   article > 正文

Flink(18):Flink之累加器_flink 累加器

flink 累加器

目录

0. 相关文章链接

1. Flink中的累加器概述

2. 编码步骤

3. 代码演示


0. 相关文章链接

Flink文章汇总

1. Flink中的累加器概述

        Flink中的累加器,与Mapreduce counter的应用场景类似可以很好地观察task在运行期间的数据变化,如在Flink job任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。

Flink有以下内置累加器每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

2. 编码步骤

  1. 创建累加器:private IntCounter numLines = new IntCounter();
  2. 注册累加器:getRuntimeContext().addAccumulator("num-lines", this.numLines);
  3. 使用累加器:this.numLines.add(1);
  4. 获取累加器的结果:myJobExecutionResult.getAccumulatorResult("num-lines")

3. 代码演示

  1. import org.apache.flink.api.common.JobExecutionResult;
  2. import org.apache.flink.api.common.accumulators.IntCounter;
  3. import org.apache.flink.api.common.functions.RichMapFunction;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.operators.MapOperator;
  7. import org.apache.flink.configuration.Configuration;
  8. import org.apache.flink.core.fs.FileSystem;
  9. /**
  10. * Author itcast
  11. * Desc 演示Flink累加器,统计处理的数据条数
  12. */
  13. public class OtherAPI_Accumulator {
  14. public static void main(String[] args) throws Exception {
  15. //1.env
  16. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  17. //2.Source
  18. DataSource<String> dataDS = env.fromElements("aaa", "bbb", "ccc", "ddd");
  19. //3.Transformation
  20. MapOperator<String, String> result = dataDS.map(new RichMapFunction<String, String>() {
  21. //-1.创建累加器
  22. private IntCounter elementCounter = new IntCounter();
  23. Integer count = 0;
  24. @Override
  25. public void open(Configuration parameters) throws Exception {
  26. super.open(parameters);
  27. //-2注册累加器
  28. getRuntimeContext().addAccumulator("elementCounter", elementCounter);
  29. }
  30. @Override
  31. public String map(String value) throws Exception {
  32. //-3.使用累加器
  33. this.elementCounter.add(1);
  34. count+=1;
  35. System.out.println("不使用累加器统计的结果:"+count);
  36. return value;
  37. }
  38. }).setParallelism(2);
  39. //4.Sink
  40. result.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);
  41. //5.execute
  42. //-4.获取加强结果
  43. JobExecutionResult jobResult = env.execute();
  44. int nums = jobResult.getAccumulatorResult("elementCounter");
  45. System.out.println("使用累加器统计的结果:"+nums);
  46. }
  47. }

此博客根据某马2020年贺岁视频改编而来:【狂野大数据】Flink1.12从入门到精通#2021#流批一体#黑马程序员#大数据_哔哩哔哩_bilibili

注:其他相关文章链接由此进 -> Flink文章汇总


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/696167
推荐阅读
相关标签
  

闽ICP备14008679号