当前位置:   article > 正文

flink之addSource & fromSource 、addSink & SinkTo

addsink
一、addSource & fromSource 、addSink & SinkTo
       这两组算子区别在于:addSource和addSink需要自己实现SourceFunction或者是SinkFunction,其中读取数据的逻辑,容错等都需要自己实现;fromSource和SinkTo,是flink提供的简易的读取和输出的算子,建议优先使用fromSource和SinkTo,并结合flink官方文档;
二、filesystem source算子
1.readTextFile( filePath: String, charsetName: String ):底层调用的是 readFile(format , filePath , FileProcessingMode. PROCESS_ONCE , - 1 , typeInfo)
2.readFile( FileInputFormat< OUT > inputFormat , String filePath , FileProcessingMode watchType , long interval , FilePathFilter filter
①FileInputFormat<OUT> inputFormat:定义读取文件的类,具体可以看有哪些实现类,根据需要读取文件的类型定,也可以自定义;
②String filePath:文件路径
③FileProcessingMode watchType:定义读取文件的模式,读一次:FileProcessingMode. PROCESS_ONCE 读多次:FileProcessingMode . PROCESS_CONTINUOUSLY
④long interval:读取文件的的时间间隔,batch模式设置为-1,单位毫秒
⑤FilePathFilter filter:过滤文件,标记为 @Deprecated
以上两种source底层源码并行度都为1,而且都是调用的addSource传入的SourceFunction;说个题外话,在1.14以前flink Kafka都是使用的是addSource,实现的是ParalismSourceFunction以及一些容错的类,1.14发布以后采用的fromSource,使用的架构是 分片(Splits) 分片枚举器(SplitEnumerator)  以及  源阅读器(SourceReader)
3.FileSource
A ·batch模式
val fileSource: FileSource[String] = FileSource
. forBulkFileFormat ()
.build()
B.stream模式
依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>1.12.5</version>
</dependency>
代码:
val fileSource: FileSource[String] = FileSource
. forRecordStreamFormat (new TextLineFormat(), new Path(""))
.monitorContinuously(Duration. ofSeconds (1000))
.build()
这种方式可以并行读取文件,并且也做好了容错等,但是具体的逻辑请看flink官方代码, 需要注意的是如果使用的是 TextLineFormat,因为其父类 SimpleStreamFormat的isSplittable方法返回的是false所以即使fromSource是多并行度的是,但是仍然不可以并行读取文件;但是可以但是可以重写StreamFormat,下面是一个参照TextLineFormat简单的实现
  1. import org.apache.flink.api.common.typeinfo.TypeInformation;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.configuration.Configuration;
  4. import org.apache.flink.connector.file.src.reader.StreamFormat;
  5. import org.apache.flink.connector.file.src.reader.TextLineFormat;
  6. import org.apache.flink.core.fs.FSDataInputStream;
  7. import javax.annotation.Nullable;
  8. import java.io.BufferedReader;
  9. import java.io.IOException;
  10. import java.io.InputStreamReader;
  11. public class MyStreamFormat implements StreamFormat<String> {
  12. private static final long serialVersionUID = 1L;
  13. public static final String DEFAULT_CHARSET_NAME = "UTF-8";
  14. private final String charsetName;
  15. public MyStreamFormat() {
  16. this(DEFAULT_CHARSET_NAME);
  17. }
  18. public MyStreamFormat(String charsetName) {
  19. this.charsetName = charsetName;
  20. }
  21. @Override
  22. public Reader createReader(Configuration config, FSDataInputStream stream, long fileLen, long splitEnd) throws IOException {
  23. final BufferedReader reader =
  24. new BufferedReader(new InputStreamReader(stream, charsetName));
  25. return new MyStreamFormat.Reader(reader);
  26. }
  27. @Override
  28. public Reader restoreReader(Configuration config, FSDataInputStream stream, long restoredOffset, long fileLen, long splitEnd) throws IOException {
  29. stream.seek(restoredOffset);
  30. return createReader(config, stream,fileLen,splitEnd);
  31. }
  32. @Override
  33. public boolean isSplittable() {
  34. return true;
  35. }
  36. @Override
  37. public TypeInformation<String> getProducedType() {
  38. return Types.STRING;
  39. }
  40. public static final class Reader implements StreamFormat.Reader<String> {
  41. private final BufferedReader reader;
  42. Reader(final BufferedReader reader) {
  43. this.reader = reader;
  44. }
  45. @Nullable
  46. @Override
  47. public String read() throws IOException {
  48. return reader.readLine();
  49. }
  50. @Override
  51. public void close() throws IOException {
  52. reader.close();
  53. }
  54. }
  55. }

二、filesystem sink 算子
StreamFileSink,1.14版本之后是FileSink,这里以前者为例
两种写入模式:forRowFormat、forBulkFormat
1.forRowFormat、forBulkFormat的构造
①forRowFormat(final Path basePath, final Encoder<IN> encoder)
行模式下,自定义内容限定于文件内部,想对文件进行压缩等操作,则很难办到;
//这个类只有一个方法
public interface Encoder<IN> extends Serializable {
    void encode(IN element, OutputStream stream) throws IOException;
}
②forBulkFormat( final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory)
列模式下,不仅可以对文件内部操作,也可以轻松做到对文件压缩等操作;
public class Mybucket<T> implements BulkWriter<T>{
   
    FSDataOutputStream fsDataOutputStream=null;
    GzipCompressorOutputStream gzout=null;
    @Override
    //每条数据通过流写入文件
    public void addElement(T element) throws IOException {
         gzout.write(element.toString().getBytes());
    }
    //刷新流,如果考虑效率问题这里可以不刷,到flinish()方法刷也行
    @Override
    public void flush() throws IOException {
         gzout.flush;
    }
    //关闭流
    //注意:此方法不能关闭Factory传入的流,这是框架完成的事!!!
    @Override
    public void finish() throws IOException {
         gzout.close;
    }
    
    //创建一个writer
    //这里将类单独写也行,无非就是目前的外部类多一个构造方法
    class MyFactory implements Factory<T>{
            @Override
            public Mybucket<T> create(FSDataOutputStream out) throws IOException {
                fsDataOutputStream=out;
                GzipCompressorOutputStream gzipOutput = new GzipCompressorOutputStream(output);
                return Mybucket.this;
            }}}
2.withBucketAssigner():
①解释:指定分桶策略,所谓桶,即数据应该去往哪个文件夹; 行模式和列模式是通用的;
②参数:BucketAssigner,其实现类有三个,:
BasePathBucketAssigner:
//直接再给定的路径下生成文件,不会生成文件夹
public String getBucketId(T element, BucketAssigner.Context context) {return "";}
DateTimeBucketAssigner:
//日期格式( 即 桶大小)和时区都可以手动配置。(见其构造方法)
//生成文件夹:yyyy-MM-dd--HH,所以是按小时滚动桶的
public String getBucketId(IN element, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
    }
自定义BucketAssigner:这个需要根据自己的业务逻辑去看继承哪个,如果只是按数据分桶那可以直接继承BasePathBucketAssigner重写getBucketId,如果是需要自定义时间可以继承DateTimeBucketAssigner重写getBucketId,如果业务逻辑进一步复杂那就重写BucketAssigner,下面简单简绍BucketAssigner的两个方法:
//决定了每条数据应该去往哪个文件夹,没有这个文件夹会自动创建,最终数据写入路径是该方法返回值+给定的路径
//context可以获取一些时间
BucketID getBucketId(IN element, BucketAssigner.Context context);
//序列化/反序列化getBucketId返回值
SimpleVersionedSerializer<BucketID> getSerializer();
3. withRollingPolicy:
①解释: RollingPolicy  定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在  STREAMING  模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在  BATCH  模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。 其中列模式( forBulkFormat )只能使用 CheckpointRollingPolicy
②参数
RollingPolicy(所有RollingPolicy的父类)
public interface RollingPolicy<IN, BucketID> extends Serializable {
    //Determines if the in-progress part file for a bucket should roll on every checkpoint.(if true in-progress move to pending)
    //文件从pending状态到finished状态与此毫不相干
    boolean shouldRollOnCheckpoint(final PartFileInfo<BucketID> partFileState) throws IOException;
    //Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.(if true in-progress move to pending)
    boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element)
            throws IOException;
    // Determines if the in-progress part file for a bucket should roll based on a time condition.(if true in-progress move to pending)
    boolean shouldRollOnProcessingTime(
            final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException;
}
CheckpointRollingPolicy
列模式虽然只能使用抽象类CheckpointRollingPolicy(它是RollingPolicy的一个实现,重写了 shouldRollOnCheckpoint返回为true ), CheckpointRollingPolicy只有一个子类 OnCheckpointRollingPolicy(此类中 shouldRollOnEvent、 shouldRollOnProcessingTime返回为false );如果在列模式下,不想按照checkpoint进行滚动文件,那么可以试试继承 CheckpointRollingPolicy后 重写了 shouldRollOnCheckpoint返回为false;
DefaultRollingPolicy,采用builder架构
DefaultRollingPolicy.builder()
          //设置一个文件最大开启时间,超过时间则滚动
          .withRolloverInterval(Duration.ofMinutes(15))
          //设置一个文件无数据写入的时间,超过时间则滚动
          .withInactivityInterval(Duration.ofMinutes(5))
          //设置一个文件最大的容量,超过容量则滚动
          .withMaxPartSize(MemorySize.ofMebiBytes(1024))
          .build()
4. withOutputFileConfig:
①解释:为生成的文件添加前缀后缀; 行模式和列模式是通用的;
.withOutputFileConfig(OutputFileConfig
      .builder()
      .withPartPrefix("gouba-")
      .withPartSuffix(".gz")
      .build())
5. enableCompact:
①解释:合并生成的finished文件; 行模式和列模式是通用的;
②参数:FileCompactStrategy,FileCompactor
FileCompactStrategy
FileCompactStrategy.Builder.newBuilder()
                  //相隔几个checkpoint做一次合并,默认1
          .enableCompactionOnCheckpoint(3)
         //合并文件使用多少个线程,默认1
          .setNumCompactThreads(4)
          .build()
FileCompactor
※ IdenticalFileCompactor:直接复制一个文件的内容,到另一个文件,一次只能复制一个文件。
※ ConcatFileCompactor:可以自定义两个文件直接的分割符,由构造方法传入。
※ RecordWiseFileCompactor:自定义内容比较多
6. withBucketCheckInterval(mills)
解释:根据RollingPolicy检查是否应该关闭in-progress文件,以系统时间0秒时刻开始算的。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/615095
推荐阅读
相关标签
  

闽ICP备14008679号