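Requirement: when a text file is ingested, some of its lines may be malformed or garbled. How do we count these bad lines, and how do we extract them?
A first attempt keeps the count in a plain field of the map function: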
- import org.apache.flink.api.common.functions.RichMapFunction
- import org.apache.flink.api.scala._
-
- def main(args: Array[String]): Unit = {
-   val env = ExecutionEnvironment.getExecutionEnvironment
-   val data = env.fromElements("hadoop", "spark", "pyspark", "storm")
-   data.map(new RichMapFunction[String, Long] {
-     // plain field: every parallel subtask gets its own copy
-     var counter = 0L
-     override def map(value: String): Long = {
-       counter = counter + 1
-       println("counter:" + counter)
-       counter
-     }
-   }).setParallelism(2).print()
- }
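With this approach the count is wrong once the parallelism is greater than 1: each parallel subtask works on its own copy of counter, so no single instance ever sees the total.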
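The effect can be reproduced without Flink. The following is a minimal plain-Java sketch, not Flink code: CountingMapper is a hypothetical stand-in for one parallel instance of the map function, and the round-robin distribution of elements is an assumption made only for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink): each "subtask" owns a private counter field.
class PerSubtaskCounterDemo {

    // Hypothetical stand-in for one parallel instance of the map function.
    static class CountingMapper {
        private long counter = 0L; // private to this instance, like the field in the snippet above

        long map(String value) {
            counter = counter + 1;
            return counter;
        }
    }

    // Distributes the elements round-robin over `parallelism` mappers
    // and returns every value the mappers emit, in input order.
    static List<Long> run(List<String> elements, int parallelism) {
        List<CountingMapper> mappers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            mappers.add(new CountingMapper());
        }
        List<Long> emitted = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            emitted.add(mappers.get(i % parallelism).map(elements.get(i)));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("hadoop", "spark", "pyspark", "storm");
        System.out.println(run(data, 1)); // [1, 2, 3, 4]
        System.out.println(run(data, 2)); // [1, 1, 2, 2]
    }
}
```

With a parallelism of 2 no mapper ever reaches 4, which matches the behaviour described above.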
The correct approach is to count with an Accumulator. The Scala implementation is as follows:
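- import org.apache.flink.api.common.accumulators.LongCounter
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.core.fs.FileSystem.WriteMode
-
- // continues from the env and data defined in the first example
- val info = data.map(new RichMapFunction[String, String] {
-   // step 1: define the counter
-   val counter = new LongCounter()
-   override def open(parameters: Configuration): Unit = {
-     // step 2: register the counter
-     getRuntimeContext.addAccumulator("ele-counts-scala", counter)
-   }
-   override def map(in: String): String = {
-     counter.add(1)
-     in
-   }
- })
- info.writeAsText("E:/test3", WriteMode.OVERWRITE).setParallelism(4)
- val jobResult = env.execute("CounterApp")
- // step 3: read the counter from the job result
- val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
- println("num:" + num)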
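Why the accumulator yields the right total can be sketched in plain Java, again without Flink: every subtask adds to its own local counter, and the local values are merged once all subtasks have finished. LocalCounter below is a hypothetical, heavily simplified stand-in for LongCounter, and threads stand in for parallel subtasks; this is an illustration of the idea, not how Flink is implemented internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink): count locally per subtask, merge at the end.
class AccumulatorMergeDemo {

    // Hypothetical, simplified stand-in for Flink's LongCounter.
    static class LocalCounter {
        private long local = 0L;

        void add(long n) { local += n; }

        long getLocalValue() { return local; }

        void merge(LocalCounter other) { local += other.local; }
    }

    static long countWithParallelism(List<String> elements, int parallelism) {
        List<LocalCounter> counters = new ArrayList<>();
        List<Thread> subtasks = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            final int subtaskIndex = p;
            final LocalCounter counter = new LocalCounter();
            counters.add(counter);
            // Each thread touches only its own counter, so no locking is needed.
            Thread t = new Thread(() -> {
                for (int i = subtaskIndex; i < elements.size(); i += parallelism) {
                    counter.add(1);
                }
            });
            subtasks.add(t);
            t.start();
        }
        try {
            for (Thread t : subtasks) {
                t.join(); // wait for every subtask before merging
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for subtasks", e);
        }
        LocalCounter total = new LocalCounter();
        for (LocalCounter c : counters) {
            total.merge(c);
        }
        return total.getLocalValue();
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("hadoop", "spark", "pyspark", "storm");
        System.out.println(countWithParallelism(data, 1)); // 4
        System.out.println(countWithParallelism(data, 4)); // 4
    }
}
```

The merged value is independent of the parallelism, which is the property the accumulator example relies on.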

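The Java implementation is as follows:
- import org.apache.flink.api.common.JobExecutionResult;
- import org.apache.flink.api.common.accumulators.LongCounter;
- import org.apache.flink.api.common.functions.RichMapFunction;
- import org.apache.flink.api.java.DataSet;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.core.fs.FileSystem;
-
- public class JavaCounterApp {
-   public static void main(String[] args) throws Exception {
-     ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
-     DataSource<String> data = executionEnvironment.fromElements("hadoop", "spark", "pyspark", "storm");
-     DataSet<String> dataSet = data.map(new RichMapFunction<String, String>() {
-       // step 1: define the counter
-       LongCounter counter = new LongCounter();
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-         // step 2: register the counter
-         getRuntimeContext().addAccumulator("ele-counts-java", counter);
-       }
-
-       @Override
-       public String map(String value) throws Exception {
-         counter.add(1);
-         return value;
-       }
-     });
-     dataSet.writeAsText("E:/test4", FileSystem.WriteMode.OVERWRITE).setParallelism(3);
-     JobExecutionResult javaCounterApp = executionEnvironment.execute("JavaCounterApp");
-     // step 3: read the counter from the job result
-     long num = javaCounterApp.getAccumulatorResult("ele-counts-java");
-     System.out.println("num:" + num);
-   }
- }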
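The examples above count every element, while the original requirement is about bad lines only. A minimal plain-Java sketch of that last step follows. The validity rule (exactly three non-empty comma-separated fields), the class and method names, and the sample lines are all assumptions made for illustration; the original does not say what a well-formed line looks like.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink): decide which lines are bad, count and collect them.
class BadLineDemo {

    // Assumed rule, for illustration only: exactly three non-empty comma-separated fields.
    static boolean isWellFormed(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != 3) {
            return false;
        }
        for (String field : fields) {
            if (field.trim().isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Returns the bad lines in input order; the size of the list is the bad-line count.
    static List<String> extractBadLines(List<String> lines) {
        List<String> bad = new ArrayList<>();
        for (String line : lines) {
            if (!isWellFormed(line)) {
                bad.add(line);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1,hadoop,2019",
                "2,spark",       // too few fields
                "3;flink;2021",  // wrong delimiter
                "4,storm,2020");
        List<String> bad = extractBadLines(lines);
        System.out.println("bad count: " + bad.size()); // bad count: 2
        for (String line : bad) {
            System.out.println("bad line: " + line);
        }
    }
}
```

In a Flink job, a predicate of this kind could guard the counter.add(1) call inside map so that only bad lines are counted, and a filter on the same predicate could write the bad lines to a separate output. That wiring is a suggestion and is not shown in the original.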
