当前位置:   article > 正文

Flink实现计数器_flink 计数器

flink 计数器

一种错误的实现方式:这个下面就是一种错误的实现方式,首先定义数据源,其中有五个单词,然后进行数据处理,map里面实现了RichMapFunction接口,重写map方法,先定义一个计数器counter为0,每读一个单词,counter加一,就把这个单词变为counter进行输出,这里设置的并行度为1,输出结果是1,2,3,4,5

但是当我们将并行度设置为2的时候,输出变成了1,2,3,1,2 其实很好理解,就是Integer counter = 0;这句话我们执行了两次,因为并行度为2,导致我们最后是不同的并行度有不同的counter进行计数的!所以这样就不行

public class JavaCountApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("flink", "hadoop", "spark", "storm", "linux");
        data.map(new RichMapFunction<String, Integer>() {
            Integer counter = 0;
            @Override
            public Integer map(String value) throws Exception {
                counter++;
               return counter;
            }
        }).setParallelism(1).print();
    }
}
//并行度为1输出:1,2,3,4,5
//并行度为2输出:1,2,3,1,2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

正确实现:

实现计数器有三个步骤:

  • 1.定义计数器
  • 2.注册计数器
  • 3.获取计数器

代码运行步骤:

  • 首先和上面的实现方法一样,map中实现RichMapFunction方法
  • 第一步就是定义计数器
  • 这里我们在open方法中进行注册计数器,每次注册完计数器看一下初始值进行输出(open方法有什么作用后面会说)
  • 之后再map方法中计数器进行add(1)操作 并将值进行输出
  • 最后我们在jobResult中获取到这个计数器

输出如下:

   open counter: 0
    map counter: 1
    map counter: 2
    map counter: 3
    open counter: 0
    map counter: 1
    map counter: 2
    num: 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  public class JavaCountApp {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<String> data = env.fromElements("flink", "hadoop", "spark", "storm", "linux");
    
            DataSet<String> text = data.map(new RichMapFunction<String, String>() {
                //step1:定义计数器
                LongCounter counter = new LongCounter();
                @Override
                public void open(Configuration parameters) throws Exception {
                    //step2:注册计数器
                    getRuntimeContext().addAccumulator("ele-counts-java", counter);
                   //这里是为了调试进行输出每次注册计数器的初始值
                    System.out.println("open counter: "+counter.getLocalValue());
    
                }
    
                @Override
                public String map(String value) throws Exception {
                    counter.add(1);
    				//查看计数器的值
                    System.out.println("map counter: "+counter.getLocalValue());
                    return value;
                }
            }).setParallelism(2);
    
            String filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sink1\\";
            text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
    
    
            JobExecutionResult jobResult = env.execute("JavaCountApp");
            //step3:获取计数器
            Long num = jobResult.getAccumulatorResult("ele-counts-java");
            System.out.println("num: "+num);
    
    
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

其实这里我们还是可以看到,在并行度为2的情况下,open方法同样执行了两次,每次注册计数器都为0,所以两个并行度都有各自的计数器在计算,只不过在最后加了起来!最后得到的结果为5,那么是在哪里加起来的呢?

实际上是在job执行execute方法中自动执行的!将所有的计数器进行相加!
这一部分是ExecutionGraph.java的方法,进行相加计数器

/**
	 * Merges all accumulator results from the tasks previously executed in the Executions.
	 * @return The accumulator map
	 */
	public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() {
 
		Map<String, OptionalFailure<Accumulator<?, ?>>> userAccumulators = new HashMap<>();
 
		for (ExecutionVertex vertex : getAllExecutionVertices()) {
			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
			if (next != null) {
				AccumulatorHelper.mergeInto(userAccumulators, next);
			}
		}
 
		return userAccumulators;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

open方法:RichSinkFunction与RichMapFunction都继承了AbstractRichFunction, 而AbstractRichFunction是一个抽象类,它实现了RichFunction接口,而open方法便是在RichFunction中定义的。 open方法相当于在执行具体计算前做一些初始化操作, 会在实际方法调用过程前(例如map)执行,因此适用于做一些一次性的配置工作。 对于一些多并行度的操作,会在每一次并行计算前执行一次open操作。 其中configuration参数包含做一些配置和初始化参数,这些参数可以在自定义。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/696180
推荐阅读
相关标签
  

闽ICP备14008679号