当前位置:   article > 正文

java Flink(三十八)用户自定义Functions-累加器_flink map获取累加值

flink map获取累加值

  自定义Functions官方介绍就是除了我们自定义实现Flink提供的各种算子外,还提供了累加器等。

  实现比较简单,就是简单的上下文注册以及使用。

  代码:

    

  1. import org.apache.flink.api.common.accumulators.IntCounter;
  2. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  3. import org.apache.flink.configuration.Configuration;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. public class IntCountMain {
  9. public static void main(String[] args) throws Exception{
  10. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  11. env.setParallelism(1);
  12. DataStreamSource<String> source = env.fromElements("1", "2", "3");
  13. SingleOutputStreamOperator<String> inputStream = source.flatMap(new RichFlatMapFunction<String, String>() {
  14. //创建累加器对象 IntCounter , LongCounter 和 DoubleCounter
  15. private IntCounter intCounter = new IntCounter();
  16. @Override
  17. public void open(Configuration parameters) throws Exception {
  18. super.open(parameters);
  19. //注册累加器
  20. getRuntimeContext().addAccumulator("add", this.intCounter);
  21. }
  22. public void flatMap(String value, Collector<String> out) throws Exception {
  23. //来一条数据累加器加一
  24. this.intCounter.add(1);
  25. }
  26. });
  27. //获取累加器的值
  28. Object add = env.execute().getAccumulatorResult("add");
  29. System.out.println(add);
  30. }
  31. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/696188
推荐阅读
相关标签
  

闽ICP备14008679号