
Apache Flink from Scratch (Part 13): Flink Counters


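Requirement: when a text file comes in, some of its lines may be garbled or malformed. How can we count those bad lines, and how can we extract them?

A first attempt is to keep a plain counter field inside the map function: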

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val data = env.fromElements("hadoop", "spark", "pyspark", "storm")
      data.map(new RichMapFunction[String, Long] {
        // Plain field: every parallel subtask works on its own copy
        var counter = 0L
        override def map(value: String): Long = {
          counter = counter + 1
          println("counter:" + counter)
          counter
        }
      }).setParallelism(2).print()
    }

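With this approach the count is no longer correct once the parallelism is set above 1: each parallel subtask runs its own copy of the function with its own counter field, so no single counter sees all the elements.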
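To make the problem concrete, the following plain-Java sketch (no Flink dependency) imitates a job running with parallelism 2. It is only a simulation under stated assumptions: the class NaiveCounterDemo, the helper countPerSubtask, and the round-robin split of the input are invented for illustration, and Flink's real distribution of elements across subtasks may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NaiveCounterDemo {

    // Stand-in for the RichMapFunction above: one instance per subtask,
    // each with its own private counter field.
    static class CountingMapper {
        long counter = 0L;

        long map(String value) {
            counter = counter + 1;
            return counter;
        }
    }

    // Splits the input round-robin across `parallelism` mapper instances
    // (an assumed distribution) and returns each instance's final counter.
    static long[] countPerSubtask(List<String> input, int parallelism) {
        List<CountingMapper> mappers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            mappers.add(new CountingMapper());
        }
        for (int i = 0; i < input.size(); i++) {
            mappers.get(i % parallelism).map(input.get(i));
        }
        long[] finals = new long[parallelism];
        for (int i = 0; i < parallelism; i++) {
            finals[i] = mappers.get(i).counter;
        }
        return finals;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("hadoop", "spark", "pyspark", "storm");
        long[] finals = countPerSubtask(data, 2);
        // Each simulated subtask stops at 2; none reaches the true total of 4.
        System.out.println(Arrays.toString(finals)); // prints [2, 2]
    }
}
```

With four elements and two instances, each counter ends at 2, which matches the kind of output the Scala job prints: the value 4 never appears.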

The correct way is to define an Accumulator to do the counting. The Scala implementation is as follows:

    import org.apache.flink.api.common.accumulators.LongCounter
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.fs.FileSystem.WriteMode

    // env and data are the same as in the first example
    val info = data.map(new RichMapFunction[String, String] {
      // step 1: define the counter
      val counter = new LongCounter()
      override def open(parameters: Configuration): Unit = {
        // step 2: register the counter
        getRuntimeContext.addAccumulator("ele-counts-scala", counter)
      }
      override def map(in: String): String = {
        counter.add(1)
        in
      }
    })
    info.writeAsText("E:/test3", WriteMode.OVERWRITE).setParallelism(4)
    val jobResult = env.execute("CounterApp")
    // step 3: read the counter from the job result
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
    println("num:" + num)

The Java implementation is as follows:

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.LongCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;

    public class JavaCounterApp {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<String> data = executionEnvironment.fromElements("hadoop", "spark", "pyspark", "storm");
            DataSet<String> dataSet = data.map(new RichMapFunction<String, String>() {
                // step 1: define the counter
                LongCounter counter = new LongCounter();
                @Override
                public void open(Configuration parameters) throws Exception {
                    // step 2: register the counter
                    getRuntimeContext().addAccumulator("ele-counts-java", counter);
                }
                @Override
                public String map(String value) throws Exception {
                    counter.add(1);
                    return value;
                }
            });
            dataSet.writeAsText("E:/test4", FileSystem.WriteMode.OVERWRITE).setParallelism(3);
            JobExecutionResult javaCounterApp = executionEnvironment.execute("JavaCounterApp");
            // step 3: read the counter from the job result
            long num = javaCounterApp.getAccumulatorResult("ele-counts-java");
            System.out.println("num:" + num);
        }
    }
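The examples above count every element, whereas the stated requirement is to count and extract malformed lines. The sketch below applies the same accumulator idea to that case in plain Java, without Flink: each simulated subtask keeps a local count and a local list of bad lines, and the partial results are merged at the end, roughly as Flink merges per-subtask accumulators when the job finishes. Everything specific here is an assumption made for illustration: the validity rule (exactly three comma-separated fields), the class BadLineAccumulator, the helper collectBadLines, and the sample lines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BadLineDemo {

    // Hypothetical accumulator: a local count plus the bad lines themselves.
    static class BadLineAccumulator {
        long count = 0L;
        final List<String> lines = new ArrayList<>();

        void add(String badLine) {
            count++;
            lines.add(badLine);
        }

        // Folds another subtask's partial result into this one.
        void merge(BadLineAccumulator other) {
            count += other.count;
            lines.addAll(other.lines);
        }
    }

    // Assumed validity rule: a well-formed line has exactly three
    // comma-separated fields.
    static boolean isValid(String line) {
        return line.split(",", -1).length == 3;
    }

    // Processes each partition with its own local accumulator, then merges
    // the partial results into one total.
    static BadLineAccumulator collectBadLines(List<List<String>> partitions) {
        BadLineAccumulator total = new BadLineAccumulator();
        for (List<String> partition : partitions) {
            BadLineAccumulator local = new BadLineAccumulator();
            for (String line : partition) {
                if (!isValid(line)) {
                    local.add(line);
                }
            }
            total.merge(local);
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("1,hadoop,ok", "broken line"),
                Arrays.asList("2,spark,ok", "3,storm", "4,pyspark,ok"));
        BadLineAccumulator result = collectBadLines(partitions);
        System.out.println("bad line count: " + result.count);
        System.out.println("bad lines: " + result.lines);
    }
}
```

In an actual Flink job, the validity check would sit inside the map function and the count would go through a registered LongCounter as in the code above; how the bad lines themselves are extracted (for example, written to a separate output) is a design choice the original article leaves open.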

 
