赞
踩
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.FSDataInputStream;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Objects;

- public class MyStreamFormat implements StreamFormat<String> {
- private static final long serialVersionUID = 1L;
-
- public static final String DEFAULT_CHARSET_NAME = "UTF-8";
-
- private final String charsetName;
-
- public MyStreamFormat() {
- this(DEFAULT_CHARSET_NAME);
- }
-
- public MyStreamFormat(String charsetName) {
- this.charsetName = charsetName;
- }
-
- @Override
- public Reader createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
- final BufferedReader reader =
- new BufferedReader(new InputStreamReader(stream, charsetName));
- return new MyStreamFormat.Reader(reader);
- }
-
- @Override
- public Reader restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
- stream.seek(restoredOffset);
- return createReader(config, stream,fileLen,splitEnd);
- }
-
- @Override
- public boolean isSplittable() {
- return true;
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return Types.STRING;
- }
-
- public static final class Reader implements StreamFormat.Reader<String> {
-
- private final BufferedReader reader;
-
- Reader(final BufferedReader reader) {
- this.reader = reader;
- }
-
- @Nullable
- @Override
- public String read() throws IOException {
- return reader.readLine();
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。