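According to the official documentation on user-defined functions, besides letting us write custom implementations of the operators Flink provides, Flink also offers accumulators, among other things.
The implementation is fairly simple: register the accumulator with the runtime context, then use it.
Code: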
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class IntCountMain {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> source = env.fromElements("1", "2", "3");

        SingleOutputStreamOperator<String> inputStream = source.flatMap(new RichFlatMapFunction<String, String>() {
            // Create the accumulator object; the built-in counters are IntCounter, LongCounter and DoubleCounter
            private IntCounter intCounter = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Register the accumulator under the name "add"
                getRuntimeContext().addAccumulator("add", this.intCounter);
            }

            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // Increment the accumulator by one for each incoming record
                this.intCounter.add(1);
            }
        });

        // Run the job, then read the accumulator's value from the job result
        Object add = env.execute().getAccumulatorResult("add");
        System.out.println(add);
    }
}
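The job above runs with parallelism 1, so a single counter instance sees all three records. With higher parallelism, each parallel subtask keeps its own local counter, and Flink merges the accumulators registered under the same name when the job finishes, which is why the value is read from the result of `env.execute()`. The plain-Java sketch below models that idea without depending on Flink. `SimpleIntCounter`, `countAcrossSubtasks`, and the split of the input into two partitions are hypothetical names and assumptions made for illustration; they are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AccumulatorMergeSketch {

    /** Hypothetical stand-in for an integer counter accumulator; not the Flink class. */
    public static final class SimpleIntCounter {
        private int localValue = 0;

        /** Adds a value to this subtask's local count. */
        public void add(int value) {
            localValue += value;
        }

        /** Returns the count held by this counter. */
        public int getLocalValue() {
            return localValue;
        }

        /** Folds another counter's partial count into this one. */
        public void merge(SimpleIntCounter other) {
            localValue += other.localValue;
        }
    }

    /** Counts records per simulated subtask, then merges the partial counters. */
    public static int countAcrossSubtasks(List<List<String>> partitions) {
        List<SimpleIntCounter> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            // Each simulated subtask owns its own counter, like one operator instance.
            SimpleIntCounter counter = new SimpleIntCounter();
            for (String ignored : partition) {
                counter.add(1);
            }
            partials.add(counter);
        }
        // Merging happens only after every subtask has finished its input.
        SimpleIntCounter total = new SimpleIntCounter();
        for (SimpleIntCounter partial : partials) {
            total.merge(partial);
        }
        return total.getLocalValue();
    }

    public static void main(String[] args) {
        // Assumed split of the three input records across two subtasks.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("1", "2"),
                Arrays.asList("3"));
        System.out.println(countAcrossSubtasks(partitions)); // prints 3
    }
}
```

The merged total matches what the single-subtask job reports for the same three records, which is the point of the merge step: the final value should not depend on how records were distributed across subtasks.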
Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.