当前位置:   article > 正文

SeaTunnel 2.1.2的源码解析(2)seatunnel-connectors-flink-file_seatunnel和flink

seatunnel和flink

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


本文已参与「开源摘星计划」,欢迎正在阅读的你加入。活动链接:https://github.com/weopenprojects/WeOpen-Star

随着业务的数据量越来越多,采用的数据库也多样,SeaTunnel【简称ST】开源项目也充当一个数据集成的角色工具。分析SeaTunnel源码的同时也更熟悉flink的使用,这篇文章讲解seatunnel-connectors-flink-file这小模块的代码。


提示:以下是本篇文章正文内容,下面源码分析可供参考,如有出错请指正!

1、总览

SeaTunnel2.1.2的源码分析

2、ST封装flink连接文件的源码分析

1.FileSink类的outputStream流输出函数

源码如下(根据滚动策略、输出文件配置来创建一个流式文件系统的连接器:StreamingFileSink):

@Override
    public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
        final DefaultRollingPolicy<Row, String> rollingPolicy = DefaultRollingPolicy.builder()
      			 //自定义配置或者默认文件大小已达到1GB就会滚动分区文件
                .withMaxPartSize(MB * TypesafeConfigUtils.getConfig(config, MAX_PART_SIZE, DEFAULT_MAX_PART_SIZE)) 
                //自定义配置或者默认至少包含60分钟的数据就会滚动分区文件
                .withRolloverInterval(
                        TimeUnit.MINUTES.toMillis(TypesafeConfigUtils.getConfig(config, ROLLOVER_INTERVAL, DEFAULT_ROLLOVER_INTERVAL))) 
                .build();
        OutputFileConfig outputFileConfig = OutputFileConfig.builder()
                .withPartPrefix(TypesafeConfigUtils.getConfig(config, PART_PREFIX, DEFAULT_PART_PREFIX))
                .withPartSuffix(TypesafeConfigUtils.getConfig(config, PART_SUFFIX, DEFAULT_PART_SUFFIX))
                .build();
        //流式文件系统的连接器:StreamingFileSink<Row>
        //行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
        final StreamingFileSink<Row> sink = StreamingFileSink
                .forRowFormat(filePath, new SimpleStringEncoder<Row>())
                .withRollingPolicy(rollingPolicy)
                .withOutputFileConfig(outputFileConfig)
                .build();
        dataStream.addSink(sink);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2.FileSink类的outputBatch批输出函数

源码如下():

@Override
    public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
        //设置文件输出的格式【json\csv\txt】
        FormatType format = FormatType.from(config.getString(FORMAT).trim().toLowerCase());
        switch (format) {
            case JSON:
                RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType();
                outputFormat = new JsonRowOutputFormat(filePath, rowTypeInfo);
                break;
            case CSV:
                outputFormat = new CsvRowOutputFormat(filePath);
                break;
            case TEXT:
                outputFormat = new TextOutputFormat<>(filePath);
                break;
            default:
                LOGGER.warn(" unknown file_format [{}],only support json,csv,text", format);
                break;

        }
        //设置写入模式【overwrite\no_overwrite】
        if (config.hasPath(WRITE_MODE)) {
            String mode = config.getString(WRITE_MODE);
            outputFormat.setWriteMode(FileSystem.WriteMode.valueOf(mode));
        }

        DataSink<Row> dataSink = dataSet.output(outputFormat);
        //设置并行度
        if (config.hasPath(PARALLELISM)) {
            int parallelism = config.getInt(PARALLELISM);
            dataSink.setParallelism(parallelism);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/903119
推荐阅读
相关标签
  

闽ICP备14008679号