
A Look at Flink's CsvReader


This article takes a look at Flink's CsvReader.

Example

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

csvInput.map(new MapFunction<RecordDto, RecordDto>() {
    @Override
    public RecordDto map(RecordDto value) throws Exception {
        LOGGER.info("execute map:{}", value);
        TimeUnit.SECONDS.sleep(5);
        return value;
    }
}).print();
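
The RecordDto class itself is not shown in the example. Below is a minimal sketch of what it could look like; the field names come from the pojoType call, while the field types are assumptions. A Flink POJO needs a public class, a public no-argument constructor, and fields that are public or reachable via getters/setters.

// Hypothetical RecordDto matching the field names passed to pojoType; the types are guesses.
public class RecordDto {
    public String playerName;
    public String country;
    public int year;
    public String game;
    public int gold;
    public int silver;
    public int bronze;
    public int total;

    // public no-arg constructor required by the POJO rules
    public RecordDto() {}

    @Override
    public String toString() {
        return playerName + "," + country + "," + year + "," + game
                + "," + gold + "," + silver + "," + bronze + "," + total;
    }
}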

ExecutionEnvironment.readCsvFile

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

/**
 * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
 * define parameters and field types and will eventually produce the DataSet that corresponds to
 * the read and parsed CSV input.
 *
 * @param filePath The path of the CSV file.
 * @return A CsvReader that can be used to configure the CSV input.
 */
public CsvReader readCsvFile(String filePath) {
    return new CsvReader(filePath, this);
}
  • This creates a CsvReader from the given filePath
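
Besides pojoType (used in the example above), the returned CsvReader can also terminate with the types(...) methods to produce tuples (Tuple3 comes from org.apache.flink.api.java.tuple). The sketch below is illustrative only; the column types and the includeFields mask are assumptions about the file layout, not something taken from the example.

// Hypothetical tuple-based variant: keep only the first three of the eight columns.
DataSet<Tuple3<String, String, Integer>> tuples = env.readCsvFile(csvFilePath)
        .includeFields("11100000")
        .types(String.class, String.class, Integer.class);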

CsvReader

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java

public CsvReader(String filePath, ExecutionEnvironment executionContext) {
    this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
}

public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
    Preconditions.checkNotNull(filePath, "The file path may not be null.");
    Preconditions.checkNotNull(executionContext, "The execution context may not be null.");

    this.path = filePath;
    this.executionContext = executionContext;
}

/**
 * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
 * must be public or able to set value. The type information for the fields is obtained from the type class.
 *
 * @param pojoType The class of the target POJO.
 * @param pojoFields The fields of the POJO which are mapped to CSV fields.
 * @return The DataSet representing the parsed CSV data.
 */
public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
    Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
    Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");

    final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
    if (!(ti instanceof PojoTypeInfo)) {
        throw new IllegalArgumentException(
                "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
    }
    final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

    CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);

    configureInputFormat(inputFormat);

    return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
}
  • CsvReader provides the pojoType method, which maps the CSV data to a Java type and turns it into a Flink DataSource; when the DataSource is created, a PojoCsvInputFormat and a PojoTypeInfo are supplied
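
Before the terminal pojoType call, the CsvReader can also be configured; those settings end up on the PojoCsvInputFormat via configureInputFormat. The following is a sketch using commonly available CsvReader options (names should be double-checked against the 1.6.2 Javadoc); the delimiters shown are just the defaults made explicit.

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .lineDelimiter("\n")              // record delimiter used by DelimitedInputFormat
        .fieldDelimiter(",")              // field delimiter used by CsvInputFormat
        .ignoreFirstLine()                // skip a header row
        .ignoreComments("#")              // sets the commentPrefix checked in readRecord (see below)
        .pojoType(RecordDto.class,
                "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");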

Task

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
  • Task's run method calls invokable.invoke(); here the invokable is a DataSourceTask

DataSourceTask.invoke

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java

@Override
public void invoke() throws Exception {
    // --------------------------------------------------------------------
    // Initialize
    // --------------------------------------------------------------------
    initInputFormat();

    LOG.debug(getLogString("Start registering input and output"));

    try {
        initOutputs(getUserCodeClassLoader());
    } catch (Exception ex) {
        throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
                ex.getMessage(), ex);
    }

    LOG.debug(getLogString("Finished registering input and output"));

    // --------------------------------------------------------------------
    // Invoke
    // --------------------------------------------------------------------
    LOG.debug(getLogString("Starting data source operator"));

    RuntimeContext ctx = createRuntimeContext();

    final Counter numRecordsOut;
    {
        Counter tmpNumRecordsOut;
        try {
            OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
            ioMetricGroup.reuseInputMetricsForTask();
            if (this.config.getNumberOfChainedStubs() == 0) {
                ioMetricGroup.reuseOutputMetricsForTask();
            }
            tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            tmpNumRecordsOut = new SimpleCounter();
        }
        numRecordsOut = tmpNumRecordsOut;
    }

    Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

    if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
        ((RichInputFormat) this.format).setRuntimeContext(ctx);
        LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
        ((RichInputFormat) this.format).openInputFormat();
        LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
    }

    ExecutionConfig executionConfig = getExecutionConfig();

    boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");

    final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();

    try {
        // start all chained tasks
        BatchTask.openChainedTasks(this.chainedTasks, this);

        // get input splits to read
        final Iterator<InputSplit> splitIterator = getInputSplits();

        // for each assigned input split
        while (!this.taskCanceled && splitIterator.hasNext())
        {
            // get start and end
            final InputSplit split = splitIterator.next();

            LOG.debug(getLogString("Opening input split " + split.toString()));

            final InputFormat<OT, InputSplit> format = this.format;

            // open input format
            format.open(split);

            LOG.debug(getLogString("Starting to read input from split " + split.toString()));

            try {
                final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

                if (objectReuseEnabled) {
                    OT reuse = serializer.createInstance();

                    // as long as there is data to read
                    while (!this.taskCanceled && !format.reachedEnd()) {
                        OT returned;
                        if ((returned = format.nextRecord(reuse)) != null) {
                            output.collect(returned);
                        }
                    }
                } else {
                    // as long as there is data to read
                    while (!this.taskCanceled && !format.reachedEnd()) {
                        OT returned;
                        if ((returned = format.nextRecord(serializer.createInstance())) != null) {
                            output.collect(returned);
                        }
                    }
                }

                if (LOG.isDebugEnabled() && !this.taskCanceled) {
                    LOG.debug(getLogString("Closing input split " + split.toString()));
                }
            } finally {
                // close. We close here such that a regular close throwing an exception marks a task as failed.
                format.close();
            }
            completedSplitsCounter.inc();
        } // end for all input splits

        // close the collector. if it is a chaining task collector, it will close its chained tasks
        this.output.close();

        // close all chained tasks letting them report failure
        BatchTask.closeChainedTasks(this.chainedTasks, this);
    }
    catch (Exception ex) {
        // close the input, but do not report any exceptions, since we already have another root cause
        try {
            this.format.close();
        } catch (Throwable ignored) {}

        BatchTask.cancelChainedTasks(this.chainedTasks);

        ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

        if (ex instanceof CancelTaskException) {
            // forward canceling exception
            throw ex;
        }
        else if (!this.taskCanceled) {
            // drop exception, if the task was canceled
            BatchTask.logAndThrowException(ex, this);
        }
    } finally {
        BatchTask.clearWriters(eventualOutputs);
        // --------------------------------------------------------------------
        // Closing
        // --------------------------------------------------------------------
        if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat) this.format).closeInputFormat();
            LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
        }
    }

    if (!this.taskCanceled) {
        LOG.debug(getLogString("Finished data source operator"));
    }
    else {
        LOG.debug(getLogString("Data source operator cancelled"));
    }
}
  • In its invoke() method, as long as the task is not canceled and format.reachedEnd() is false, DataSourceTask keeps calling format.nextRecord(serializer.createInstance()) (or nextRecord(reuse) when object reuse is enabled) to pull records and passes each non-null result to output.collect(returned); a simplified sketch of this read loop follows the list
  • The format here is a CsvInputFormat (concretely a PojoCsvInputFormat), but the nextRecord and reachedEnd methods it uses are inherited from the parent class DelimitedInputFormat
  • PojoCsvInputFormat extends the abstract class CsvInputFormat, which extends the abstract class GenericCsvInputFormat, which in turn extends the abstract class DelimitedInputFormat
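
The following is a simplified sketch (not the actual DataSourceTask code) of the loop that invoke() drives against the InputFormat contract: open each split, pull records until reachedEnd(), then close. The reuse parameter stands in for serializer.createInstance().

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.util.Collector;

public class ReadLoopSketch {
    public static <T> void drive(InputFormat<T, InputSplit> format,
                                 Iterator<InputSplit> splits,
                                 Collector<T> output,
                                 T reuse) throws IOException {
        while (splits.hasNext()) {
            InputSplit split = splits.next();
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    T record = format.nextRecord(reuse);
                    if (record != null) {               // null means "no record" (e.g. a comment line)
                        output.collect(record);
                    }
                }
            } finally {
                format.close();
            }
        }
        output.close();
    }
}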

DelimitedInputFormat

flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java

/**
 * The default read buffer size = 1MB.
 */
private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

private transient byte[] readBuffer;

private int bufferSize = -1;

private void initBuffers() {
    this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;

    if (this.bufferSize <= this.delimiter.length) {
        throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
    }

    if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
        this.readBuffer = new byte[this.bufferSize];
    }
    if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
        this.wrapBuffer = new byte[256];
    }

    this.readPos = 0;
    this.limit = 0;
    this.overLimit = false;
    this.end = false;
}

/**
 * Checks whether the current split is at its end.
 *
 * @return True, if the split is at its end, false otherwise.
 */
@Override
public boolean reachedEnd() {
    return this.end;
}

@Override
public OT nextRecord(OT record) throws IOException {
    if (readLine()) {
        return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    } else {
        this.end = true;
        return null;
    }
}

/**
 * Fills the read buffer with bytes read from the file starting from an offset.
 */
private boolean fillBuffer(int offset) throws IOException {
    int maxReadLength = this.readBuffer.length - offset;
    // special case for reading the whole split.
    if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
        int read = this.stream.read(this.readBuffer, offset, maxReadLength);
        if (read == -1) {
            this.stream.close();
            this.stream = null;
            return false;
        } else {
            this.readPos = offset;
            this.limit = read;
            return true;
        }
    }

    // else ..
    int toRead;
    if (this.splitLength > 0) {
        // if we have more data, read that
        toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
    }
    else {
        // if we have exhausted our split, we need to complete the current record, or read one
        // more across the next split.
        // the reason is that the next split will skip over the beginning until it finds the first
        // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
        // previous split.
        toRead = maxReadLength;
        this.overLimit = true;
    }

    int read = this.stream.read(this.readBuffer, offset, toRead);

    if (read == -1) {
        this.stream.close();
        this.stream = null;
        return false;
    } else {
        this.splitLength -= read;
        this.readPos = offset; // position from where to start reading
        this.limit = read + offset; // number of valid bytes in the read buffer
        return true;
    }
}
  • DelimitedInputFormat first calls readLine() to read one record into currBuffer; if there is data, it calls the readRecord method implemented by the subclass CsvInputFormat, passing currBuffer, currOffset and currLen
  • readLine() internally calls fillBuffer, which determines how many bytes to read (toRead) from splitLength (the length of the FileInputSplit, see DelimitedInputFormat.getStatistics) and maxReadLength, and reads that many bytes from the file into readBuffer starting at the given offset; readLine() then scans for the delimiter and sets currBuffer, currOffset and currLen (a simplified sketch of that delimiter scan follows the list)
  • readBuffer is allocated with bufferSize in initBuffers; bufferSize starts out as -1 and is set to 4 * 1024 in the getStatistics method, while DEFAULT_READ_BUFFER_SIZE is 1024 * 1024
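
The heart of readLine() is scanning readBuffer for the delimiter bytes. The standalone sketch below only illustrates that scan; the real code additionally copies partial records into wrapBuffer and refills the buffer via fillBuffer() when no delimiter has been found yet.

public class DelimiterScanSketch {

    /** Returns the index just past the delimiter, or -1 if the buffer holds no complete record. */
    static int scan(byte[] buffer, int readPos, int limit, byte[] delimiter) {
        int delimPos = 0;
        for (int i = readPos; i < limit; i++) {
            if (buffer[i] == delimiter[delimPos]) {
                delimPos++;
                if (delimPos == delimiter.length) {
                    return i + 1;   // the record ends right before the delimiter
                }
            } else {
                delimPos = 0;       // simplified reset; exact for single-byte delimiters like '\n'
            }
        }
        return -1;                  // would trigger another fillBuffer() in the real code
    }

    public static void main(String[] args) {
        byte[] data = "a,b,c\nd,e,f\n".getBytes();
        int end = scan(data, 0, data.length, new byte[]{'\n'});
        System.out.println("first record is " + (end - 1) + " bytes long");   // prints 5
    }
}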

CsvInputFormat.readRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java

@Override
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    /*
     * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
     */
    // Found window's end line, so find carriage return before the newline
    if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
        //reduce the number of bytes so that the Carriage return is not taken as data
        numBytes--;
    }

    if (commentPrefix != null && commentPrefix.length <= numBytes) {
        //check record for comments
        boolean isComment = true;
        for (int i = 0; i < commentPrefix.length; i++) {
            if (commentPrefix[i] != bytes[offset + i]) {
                isComment = false;
                break;
            }
        }
        if (isComment) {
            this.commentCount++;
            return null;
        }
    }

    if (parseRecord(parsedValues, bytes, offset, numBytes)) {
        return fillRecord(reuse, parsedValues);
    } else {
        this.invalidLineCount++;
        return null;
    }
}
  • CsvInputFormat's readRecord method receives the raw bytes of one record, uses parseRecord to parse them into parsedValues (an Object[]), and then calls the subclass's fillRecord method (here PojoCsvInputFormat) to fill parsedValues into the reuse object (the object DataSourceTask passed in as serializer.createInstance() when calling format.nextRecord); when readRecord returns null (a comment line or an invalid line), end is not set, so the caller's loop simply skips that record and keeps reading
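
For illustration, a hypothetical input file for the opening example could look like the lines below. The '#' line is only treated as a comment (readRecord returns null and increments commentCount) if ignoreComments("#") was configured on the CsvReader, since that is what sets commentPrefix.

# illustrative sample input for RecordDto
Michael Phelps,United States,2008,Swimming,8,0,0,8
Usain Bolt,Jamaica,2008,Athletics,3,0,0,3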

PojoCsvInputFormat.fillRecord

flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java

/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    //......

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);

        pojoFields = new Field[pojoFieldNames.length];

        Map<String, Field> allFields = new HashMap<String, Field>();

        findAllFields(pojoTypeClass, allFields);

        for (int i = 0; i < pojoFieldNames.length; i++) {
            pojoFields[i] = allFields.get(pojoFieldNames[i]);

            if (pojoFields[i] != null) {
                pojoFields[i].setAccessible(true);
            } else {
                throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
            }
        }
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                pojoFields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
            }
        }
        return reuse;
    }

    //......
}
  • PojoCsvInputFormat's open method is invoked when a split is opened (format.open(split) in DataSourceTask.invoke above); it looks up the required Fields via reflection ahead of time and makes them accessible
  • The fillRecord method then uses reflection to set the parsed values on the POJO (a standalone illustration of this reflection pattern follows the list)
  • If setting a value fails with an IllegalAccessException, the exception is wrapped and rethrown as a RuntimeException
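
The reflection pattern that open() and fillRecord() rely on can be shown with a tiny standalone example (plain JDK, not Flink code): look the Field up once, make it accessible, then set parsed values on the reuse object.

import java.lang.reflect.Field;

public class ReflectionFillSketch {

    static class Pojo {
        private String playerName;   // even private fields work because of setAccessible(true)

        @Override
        public String toString() {
            return "Pojo{" + playerName + "}";
        }
    }

    public static void main(String[] args) throws Exception {
        Field f = Pojo.class.getDeclaredField("playerName");
        f.setAccessible(true);                 // what open() does once per mapped field

        Pojo reuse = new Pojo();
        f.set(reuse, "some parsed value");     // what fillRecord() does per parsed value
        System.out.println(reuse);             // Pojo{some parsed value}
    }
}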

CountingCollector.collect

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java

public class CountingCollector<OUT> implements Collector<OUT> {
    private final Collector<OUT> collector;
    private final Counter numRecordsOut;

    public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
        this.collector = collector;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(OUT record) {
        this.numRecordsOut.inc();
        this.collector.collect(record);
    }

    @Override
    public void close() {
        this.collector.close();
    }
}
  • The wrapped collector here is an org.apache.flink.runtime.operators.chaining.ChainedMapDriver
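
Since CountingCollector and SimpleCounter are ordinary classes, the wrapper can be exercised in isolation; a small sketch (assuming flink-runtime is on the classpath) follows.

import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.operators.util.metrics.CountingCollector;
import org.apache.flink.util.Collector;

public class CountingCollectorDemo {
    public static void main(String[] args) {
        SimpleCounter numRecordsOut = new SimpleCounter();

        Collector<String> downstream = new Collector<String>() {
            @Override public void collect(String record) { System.out.println("downstream got " + record); }
            @Override public void close() {}
        };

        CountingCollector<String> counting = new CountingCollector<>(downstream, numRecordsOut);
        counting.collect("a");
        counting.collect("b");
        counting.close();

        System.out.println("numRecordsOut = " + numRecordsOut.getCount());   // 2
    }
}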

ChainedMapDriver

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java

@Override
public void collect(IT record) {
    try {
        this.numRecordsIn.inc();
        this.outputCollector.collect(this.mapper.map(record));
    } catch (Exception ex) {
        throw new ExceptionInChainedStubException(this.taskName, ex);
    }
}
  • This first calls the mapper's map method to run the user's map logic, then calls outputCollector.collect to pass the result downstream; because the map is chained, the user function runs synchronously inside the source's read loop, so the TimeUnit.SECONDS.sleep(5) in the opening example delays each record by five seconds (a toy illustration of this collect() chain follows the list)
  • The outputCollector here is another CountingCollector, and the collector it wraps is an org.apache.flink.runtime.operators.shipping.OutputCollector
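
The chain described above can be imitated with a toy example that is not Flink's runtime code but shows the same call shape: the source's collect() runs the user map synchronously and then hands the result downstream.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.util.Collector;

public class CollectChainSketch {
    public static void main(String[] args) {
        MapFunction<String, String> mapper = value -> "mapped(" + value + ")";

        // stands in for the OutputCollector at the end of the chain
        Collector<String> sink = new Collector<String>() {
            @Override public void collect(String record) { System.out.println("emit " + record); }
            @Override public void close() {}
        };

        // stands in for ChainedMapDriver: map runs in the same thread as the read loop
        Collector<String> chainedMap = new Collector<String>() {
            @Override public void collect(String record) {
                try {
                    sink.collect(mapper.map(record));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            @Override public void close() { sink.close(); }
        };

        chainedMap.collect("a,b,c");   // the source's read loop calls this once per record
        chainedMap.collect("d,e,f");
        chainedMap.close();
    }
}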

OutputCollector

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java

/**
 * Collects a record and emits it to all writers.
 */
@Override
public void collect(T record)  {
    if (record != null) {
        this.delegate.setInstance(record);
        try {
            for (RecordWriter<SerializationDelegate<T>> writer : writers) {
                writer.emit(this.delegate);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
        }
    }
    else {
        throw new NullPointerException("The system does not support records that are null."
                + "Null values are only supported as fields inside other objects.");
    }
}
  • This calls RecordWriter's emit method to send the record to all writers

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

public void emit(T record) throws IOException, InterruptedException {
    for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
        sendToTarget(record, targetChannel);
    }
}
  • channelSelector.selectChannels returns the target channels to emit to; the channelSelector here is an OutputEmitter

OutputEmitter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java

@Override
public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
    switch (strategy) {
    case FORWARD:
        return forward();
    case PARTITION_RANDOM:
    case PARTITION_FORCED_REBALANCE:
        return robin(numberOfChannels);
    case PARTITION_HASH:
        return hashPartitionDefault(record.getInstance(), numberOfChannels);
    case BROADCAST:
        return broadcast(numberOfChannels);
    case PARTITION_CUSTOM:
        return customPartition(record.getInstance(), numberOfChannels);
    case PARTITION_RANGE:
        return rangePartition(record.getInstance(), numberOfChannels);
    default:
        throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
    }
}

private int[] forward() {
    return this.channels;
}
  • The strategy here is FORWARD, so forward() simply returns the preset channels
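
For context, the other shipping strategies in the switch above correspond to explicit DataSet API calls, while the job analyzed here ends up with FORWARD. The sketch below continues the opening example and assumes the hypothetical RecordDto has a String field called country (Partitioner is org.apache.flink.api.common.functions.Partitioner).

// PARTITION_HASH
DataSet<RecordDto> hashed = csvInput.partitionByHash("country");

// PARTITION_FORCED_REBALANCE
DataSet<RecordDto> rebalanced = csvInput.rebalance();

// PARTITION_CUSTOM
DataSet<RecordDto> custom = csvInput.partitionCustom(new Partitioner<String>() {
    @Override
    public int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }
}, "country");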

Summary

  • The inputFormat created by CsvReader is a PojoCsvInputFormat; its main method is fillRecord, which fills the data via reflection. The actual reading happens in DelimitedInputFormat's readLine method, which calls fillBuffer; fillBuffer determines toRead from splitLength (the length of the FileInputSplit, see DelimitedInputFormat.getStatistics) and maxReadLength, and then reads that many bytes from the file into readBuffer starting at the given offset
  • In its invoke() method, DataSourceTask keeps calling format.nextRecord in a loop and passes each record to output.collect (a CountingCollector) until the task is canceled or format.reachedEnd() returns true
  • In output.collect, the CountingCollector delegates to a ChainedMapDriver; the ChainedMapDriver runs the user's map function on each record and hands the result to another CountingCollector, which wraps an org.apache.flink.runtime.operators.shipping.OutputCollector; the OutputCollector then uses RecordWriters to emit the data

doc

  • CsvReader