赞
踩
一种错误的实现方式:这个下面就是一种错误的实现方式,首先定义数据源,其中有五个单词,然后进行数据处理,map里面实现了RichMapFunction接口,重写map方法,先定义一个计数器counter为0,每读一个单词,counter加一,就把这个单词变为counter进行输出,这里设置的并行度为1,输出结果是1,2,3,4,5
但是当我们将并行度设置为2的时候,输出变成了1,2,3,1,2 其实很好理解,就是Integer counter = 0;这句话我们执行了两次,因为并行度为2,导致我们最后是不同的并行度有不同的counter进行计数的!所以这样就不行
public class JavaCountApp { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> data = env.fromElements("flink", "hadoop", "spark", "storm", "linux"); data.map(new RichMapFunction<String, Integer>() { Integer counter = 0; @Override public Integer map(String value) throws Exception { counter++; return counter; } }).setParallelism(1).print(); } } //并行度为1输出:1,2,3,4,5 //并行度为2输出:1,2,3,1,2
正确实现:
实现计数器有三个步骤:
代码运行步骤:
输出如下:
open counter: 0
map counter: 1
map counter: 2
map counter: 3
open counter: 0
map counter: 1
map counter: 2
num: 5
public class JavaCountApp { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> data = env.fromElements("flink", "hadoop", "spark", "storm", "linux"); DataSet<String> text = data.map(new RichMapFunction<String, String>() { //step1:定义计数器 LongCounter counter = new LongCounter(); @Override public void open(Configuration parameters) throws Exception { //step2:注册计数器 getRuntimeContext().addAccumulator("ele-counts-java", counter); //这里是为了调试进行输出每次注册计数器的初始值 System.out.println("open counter: "+counter.getLocalValue()); } @Override public String map(String value) throws Exception { counter.add(1); //查看计数器的值 System.out.println("map counter: "+counter.getLocalValue()); return value; } }).setParallelism(2); String filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sink1\\"; text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE); JobExecutionResult jobResult = env.execute("JavaCountApp"); //step3:获取计数器 Long num = jobResult.getAccumulatorResult("ele-counts-java"); System.out.println("num: "+num); } }
其实这里我们还是可以看到,在并行度为2的情况下,open方法同样执行了两次,每次注册计数器都为0,所以两个并行度都有各自的计数器在计算,只不过在最后加了起来!最后得到的结果为5,那么是在哪里加起来的呢?
实际上是在job执行execute方法中自动执行的!将所有的计数器进行相加!
这一部分是ExecutionGraph.java的方法,进行相加计数器
/** * Merges all accumulator results from the tasks previously executed in the Executions. * @return The accumulator map */ public Map<String, OptionalFailure<Accumulator<?, ?>>> aggregateUserAccumulators() { Map<String, OptionalFailure<Accumulator<?, ?>>> userAccumulators = new HashMap<>(); for (ExecutionVertex vertex : getAllExecutionVertices()) { Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); if (next != null) { AccumulatorHelper.mergeInto(userAccumulators, next); } } return userAccumulators; }
open方法:RichSinkFunction与RichMapFunction都继承了AbstractRichFunction, 而AbstractRichFunction是一个抽象类,它实现了RichFunction接口,而open方法便是在RichFunction中定义的。 open方法相当于在执行具体计算前做一些初始化操作, 会在实际方法调用过程前(例如map)执行,因此适用于做一些一次性的配置工作。 对于一些多并行度的操作,会在每一次并行计算前执行一次open操作。 其中configuration参数包含做一些配置和初始化参数,这些参数可以在自定义。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。