This article takes a look at Flink's CsvReader.
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");

csvInput.map(new MapFunction<RecordDto, RecordDto>() {
    @Override
    public RecordDto map(RecordDto value) throws Exception {
        LOGGER.info("execute map:{}", value);
        TimeUnit.SECONDS.sleep(5);
        return value;
    }
}).print();
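For completeness, here is a minimal sketch of what the RecordDto POJO behind this example could look like. The class itself is not shown in the original snippet, so the concrete field types are assumptions; what matters is that pojoType requires a POJO in Flink's sense, i.e. a public class with a public no-argument constructor whose fields are public or reachable via getters and setters.

// assumed shape of the POJO used above; the field types are guesses based on the column names
public class RecordDto {
    public String playerName;
    public String country;
    public int year;
    public String game;
    public int gold;
    public int silver;
    public int bronze;
    public int total;

    // Flink's POJO rules require a public no-argument constructor
    public RecordDto() {
    }

    @Override
    public String toString() {
        return playerName + "," + country + "," + year + "," + game + ","
                + gold + "," + silver + "," + bronze + "," + total;
    }
}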
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
/**
 * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
 * define parameters and field types and will eventually produce the DataSet that corresponds to
 * the read and parsed CSV input.
 *
 * @param filePath The path of the CSV file.
 * @return A CsvReader that can be used to configure the CSV input.
 */
public CsvReader readCsvFile(String filePath) {
    return new CsvReader(filePath, this);
}
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java
public CsvReader(String filePath, ExecutionEnvironment executionContext) {
    this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
}

public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
    Preconditions.checkNotNull(filePath, "The file path may not be null.");
    Preconditions.checkNotNull(executionContext, "The execution context may not be null.");

    this.path = filePath;
    this.executionContext = executionContext;
}

/**
 * Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
 * must be public or able to set value. The type information for the fields is obtained from the type class.
 *
 * @param pojoType The class of the target POJO.
 * @param pojoFields The fields of the POJO which are mapped to CSV fields.
 * @return The DataSet representing the parsed CSV data.
 */
public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
    Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
    Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");

    final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
    if (!(ti instanceof PojoTypeInfo)) {
        throw new IllegalArgumentException(
            "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
    }
    final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;

    CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);

    configureInputFormat(inputFormat);

    return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
}
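pojoType builds the PojoCsvInputFormat from whatever lineDelimiter, fieldDelimiter and includedMask were configured on the CsvReader beforehand. Below is a small sketch of how such settings are typically chained before calling pojoType; the file path and field names are the ones from the opening example and are only illustrative.

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .fieldDelimiter(",")          // default field delimiter is ","
        .lineDelimiter("\n")          // default line delimiter is "\n"
        .ignoreFirstLine()            // skip a header row if the file has one
        .includeFields("11111111")    // mask of CSV columns to parse, feeds includedMask
        .pojoType(RecordDto.class, "playerName", "country", "year", "game",
                "gold", "silver", "bronze", "total");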
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......
        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        // actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java
@Override
public void invoke() throws Exception {
    // --------------------------------------------------------------------
    // Initialize
    // --------------------------------------------------------------------
    initInputFormat();

    LOG.debug(getLogString("Start registering input and output"));

    try {
        initOutputs(getUserCodeClassLoader());
    } catch (Exception ex) {
        throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
            ex.getMessage(), ex);
    }

    LOG.debug(getLogString("Finished registering input and output"));

    // --------------------------------------------------------------------
    // Invoke
    // --------------------------------------------------------------------
    LOG.debug(getLogString("Starting data source operator"));

    RuntimeContext ctx = createRuntimeContext();

    final Counter numRecordsOut;
    {
        Counter tmpNumRecordsOut;
        try {
            OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
            ioMetricGroup.reuseInputMetricsForTask();
            if (this.config.getNumberOfChainedStubs() == 0) {
                ioMetricGroup.reuseOutputMetricsForTask();
            }
            tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            tmpNumRecordsOut = new SimpleCounter();
        }
        numRecordsOut = tmpNumRecordsOut;
    }

    Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");

    if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
        ((RichInputFormat) this.format).setRuntimeContext(ctx);
        LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
        ((RichInputFormat) this.format).openInputFormat();
        LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
    }

    ExecutionConfig executionConfig = getExecutionConfig();

    boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();

    LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");

    final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();

    try {
        // start all chained tasks
        BatchTask.openChainedTasks(this.chainedTasks, this);

        // get input splits to read
        final Iterator<InputSplit> splitIterator = getInputSplits();

        // for each assigned input split
        while (!this.taskCanceled && splitIterator.hasNext())
        {
            // get start and end
            final InputSplit split = splitIterator.next();

            LOG.debug(getLogString("Opening input split " + split.toString()));

            final InputFormat<OT, InputSplit> format = this.format;

            // open input format
            format.open(split);

            LOG.debug(getLogString("Starting to read input from split " + split.toString()));

            try {
                final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);

                if (objectReuseEnabled) {
                    OT reuse = serializer.createInstance();

                    // as long as there is data to read
                    while (!this.taskCanceled && !format.reachedEnd()) {

                        OT returned;
                        if ((returned = format.nextRecord(reuse)) != null) {
                            output.collect(returned);
                        }
                    }
                } else {
                    // as long as there is data to read
                    while (!this.taskCanceled && !format.reachedEnd()) {
                        OT returned;
                        if ((returned = format.nextRecord(serializer.createInstance())) != null) {
                            output.collect(returned);
                        }
                    }
                }

                if (LOG.isDebugEnabled() && !this.taskCanceled) {
                    LOG.debug(getLogString("Closing input split " + split.toString()));
                }
            } finally {
                // close. We close here such that a regular close throwing an exception marks a task as failed.
                format.close();
            }
            completedSplitsCounter.inc();
        } // end for all input splits

        // close the collector. if it is a chaining task collector, it will close its chained tasks
        this.output.close();

        // close all chained tasks letting them report failure
        BatchTask.closeChainedTasks(this.chainedTasks, this);

    }
    catch (Exception ex) {
        // close the input, but do not report any exceptions, since we already have another root cause
        try {
            this.format.close();
        } catch (Throwable ignored) {}

        BatchTask.cancelChainedTasks(this.chainedTasks);

        ex = ExceptionInChainedStubException.exceptionUnwrap(ex);

        if (ex instanceof CancelTaskException) {
            // forward canceling exception
            throw ex;
        }
        else if (!this.taskCanceled) {
            // drop exception, if the task was canceled
            BatchTask.logAndThrowException(ex, this);
        }
    } finally {
        BatchTask.clearWriters(eventualOutputs);
        // --------------------------------------------------------------------
        // Closing
        // --------------------------------------------------------------------
        if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
            ((RichInputFormat) this.format).closeInputFormat();
            LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
        }
    }

    if (!this.taskCanceled) {
        LOG.debug(getLogString("Finished data source operator"));
    }
    else {
        LOG.debug(getLogString("Data source operator cancelled"));
    }
}
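The objectReuseEnabled branch above is driven by the ExecutionConfig: with reuse enabled, DataSourceTask hands the same instance to format.nextRecord(reuse) on every iteration, otherwise it creates a fresh instance per record. A sketch of how the switch is flipped on the user side (only safe if downstream functions do not hold on to input objects):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// makes DataSourceTask.invoke() take the single serializer.createInstance() reuse path
env.getConfig().enableObjectReuse();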
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java
/**
 * The default read buffer size = 1MB.
 */
private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;

private transient byte[] readBuffer;

private int bufferSize = -1;

private void initBuffers() {
    this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;

    if (this.bufferSize <= this.delimiter.length) {
        throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
    }

    if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
        this.readBuffer = new byte[this.bufferSize];
    }
    if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
        this.wrapBuffer = new byte[256];
    }

    this.readPos = 0;
    this.limit = 0;
    this.overLimit = false;
    this.end = false;
}

/**
 * Checks whether the current split is at its end.
 *
 * @return True, if the split is at its end, false otherwise.
 */
@Override
public boolean reachedEnd() {
    return this.end;
}

@Override
public OT nextRecord(OT record) throws IOException {
    if (readLine()) {
        return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
    } else {
        this.end = true;
        return null;
    }
}

/**
 * Fills the read buffer with bytes read from the file starting from an offset.
 */
private boolean fillBuffer(int offset) throws IOException {
    int maxReadLength = this.readBuffer.length - offset;
    // special case for reading the whole split.
    if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
        int read = this.stream.read(this.readBuffer, offset, maxReadLength);
        if (read == -1) {
            this.stream.close();
            this.stream = null;
            return false;
        } else {
            this.readPos = offset;
            this.limit = read;
            return true;
        }
    }

    // else ..
    int toRead;
    if (this.splitLength > 0) {
        // if we have more data, read that
        toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
    }
    else {
        // if we have exhausted our split, we need to complete the current record, or read one
        // more across the next split.
        // the reason is that the next split will skip over the beginning until it finds the first
        // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
        // previous split.
        toRead = maxReadLength;
        this.overLimit = true;
    }

    int read = this.stream.read(this.readBuffer, offset, toRead);

    if (read == -1) {
        this.stream.close();
        this.stream = null;
        return false;
    } else {
        this.splitLength -= read;
        this.readPos = offset; // position from where to start reading
        this.limit = read + offset; // number of valid bytes in the read buffer
        return true;
    }
}
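initBuffers falls back to the 1MB default when no buffer size was set and rejects buffers that are not larger than the delimiter. If the defaults are not suitable, the input format can also be constructed and tuned directly and then handed to the environment. The sketch below assumes the PojoCsvInputFormat constructor that takes a Path, a PojoTypeInfo and the field names; the buffer size and field list are illustrative.

PojoTypeInfo<RecordDto> typeInfo =
        (PojoTypeInfo<RecordDto>) TypeExtractor.createTypeInfo(RecordDto.class);
PojoCsvInputFormat<RecordDto> format = new PojoCsvInputFormat<>(
        new Path(csvFilePath), typeInfo,
        new String[]{"playerName", "country", "year", "game", "gold", "silver", "bronze", "total"});
format.setBufferSize(4 * 1024 * 1024); // overrides DEFAULT_READ_BUFFER_SIZE (1MB)
format.setDelimiter("\n");             // line delimiter, checked against the buffer size in initBuffers
DataSet<RecordDto> csvInput = env.createInput(format);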
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java
@Override
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    /*
     * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
     */
    // Found window's end line, so find carriage return before the newline
    if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
        //reduce the number of bytes so that the Carriage return is not taken as data
        numBytes--;
    }

    if (commentPrefix != null && commentPrefix.length <= numBytes) {
        //check record for comments
        boolean isComment = true;
        for (int i = 0; i < commentPrefix.length; i++) {
            if (commentPrefix[i] != bytes[offset + i]) {
                isComment = false;
                break;
            }
        }
        if (isComment) {
            this.commentCount++;
            return null;
        }
    }

    if (parseRecord(parsedValues, bytes, offset, numBytes)) {
        return fillRecord(reuse, parsedValues);
    } else {
        this.invalidLineCount++;
        return null;
    }
}
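readRecord silently drops comment lines and counts them in commentCount, and, when lenient parsing is enabled, drops unparsable lines and counts them in invalidLineCount. On the CsvReader side these two behaviours correspond to ignoreComments and ignoreInvalidLines; a brief sketch reusing the example from the top of the article:

DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
        .ignoreComments("#")      // lines starting with "#" are skipped and counted in commentCount
        .ignoreInvalidLines()     // malformed lines are skipped (invalidLineCount) instead of failing the job
        .pojoType(RecordDto.class, "playerName", "country", "year", "game",
                "gold", "silver", "bronze", "total");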
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java
/**
 * Input format that reads csv into POJOs.
 * @param <OUT> resulting POJO type
 */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    //......

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);

        pojoFields = new Field[pojoFieldNames.length];

        Map<String, Field> allFields = new HashMap<String, Field>();

        findAllFields(pojoTypeClass, allFields);

        for (int i = 0; i < pojoFieldNames.length; i++) {
            pojoFields[i] = allFields.get(pojoFieldNames[i]);

            if (pojoFields[i] != null) {
                pojoFields[i].setAccessible(true);
            } else {
                throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
            }
        }
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                pojoFields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
            }
        }
        return reuse;
    }

    //......
}
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
public class CountingCollector<OUT> implements Collector<OUT> {
    private final Collector<OUT> collector;
    private final Counter numRecordsOut;

    public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
        this.collector = collector;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(OUT record) {
        this.numRecordsOut.inc();
        this.collector.collect(record);
    }

    @Override
    public void close() {
        this.collector.close();
    }
}
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@Override
public void collect(IT record) {
    try {
        this.numRecordsIn.inc();
        this.outputCollector.collect(this.mapper.map(record));
    } catch (Exception ex) {
        throw new ExceptionInChainedStubException(this.taskName, ex);
    }
}
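ChainedMapDriver shows that a chained map is executed synchronously inside the collect call of the data source, i.e. the MapFunction from the opening example (including its 5-second sleep) runs in the same task as the CSV source. One way to make this visible is to log the task name from a RichMapFunction; a rough sketch (the exact chained task name is runtime dependent):

csvInput.map(new RichMapFunction<RecordDto, RecordDto>() {
    @Override
    public RecordDto map(RecordDto value) throws Exception {
        // for a chained map this typically prints something like
        // "CHAIN DataSource (...) -> Map (...) (1/1)"
        LOGGER.info("running in task: {}", getRuntimeContext().getTaskNameWithSubtasks());
        return value;
    }
}).print();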
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java
/**
 * Collects a record and emits it to all writers.
 */
@Override
public void collect(T record) {
    if (record != null) {
        this.delegate.setInstance(record);
        try {
            for (RecordWriter<SerializationDelegate<T>> writer : writers) {
                writer.emit(this.delegate);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
        }
    }
    else {
        throw new NullPointerException("The system does not support records that are null."
            + "Null values are only supported as fields inside other objects.");
    }
}
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
public void emit(T record) throws IOException, InterruptedException {
    for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
        sendToTarget(record, targetChannel);
    }
}
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@Override
public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
    switch (strategy) {
    case FORWARD:
        return forward();
    case PARTITION_RANDOM:
    case PARTITION_FORCED_REBALANCE:
        return robin(numberOfChannels);
    case PARTITION_HASH:
        return hashPartitionDefault(record.getInstance(), numberOfChannels);
    case BROADCAST:
        return broadcast(numberOfChannels);
    case PARTITION_CUSTOM:
        return customPartition(record.getInstance(), numberOfChannels);
    case PARTITION_RANGE:
        return rangePartition(record.getInstance(), numberOfChannels);
    default:
        throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
    }
}

private int[] forward() {
    return this.channels;
}
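Which branch of selectChannels is taken depends on the ship strategy chosen for the edge: the plain source -> map -> print pipeline above simply forwards records, while an explicit repartitioning on the DataSet switches the emitter to another strategy. A rough sketch; the method names are from the DataSet API, and the mapping to the strategies below is the expected behaviour rather than verified output.

// FORWARD: records stay on the local channel, selectChannels() returns forward()
csvInput.print();

// PARTITION_FORCED_REBALANCE: records are spread round-robin, selectChannels() returns robin(...)
csvInput.rebalance().print();

// PARTITION_HASH: records are routed by the hash of the key field, selectChannels() returns hashPartitionDefault(...)
csvInput.partitionByHash("country").print();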