赞
踩
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
随着业务的数据量越来越多,采用的数据库也多样,SeaTunnel【简称ST】开源项目也充当一个数据集成的角色工具。分析SeaTunnel源码的同时也更熟悉flink的使用,这篇文章讲解seatunnel-connectors-flink-file这小模块的代码。
提示:以下是本篇文章正文内容,下面源码分析可供参考,如有出错请指正!
源码如下(根据滚动策略、输出文件配置来创建一个流式文件系统的连接器:StreamingFileSink):
@Override public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) { final DefaultRollingPolicy<Row, String> rollingPolicy = DefaultRollingPolicy.builder() //自定义配置或者默认文件大小已达到1GB就会滚动分区文件 .withMaxPartSize(MB * TypesafeConfigUtils.getConfig(config, MAX_PART_SIZE, DEFAULT_MAX_PART_SIZE)) //自定义配置或者默认至少包含60分钟的数据就会滚动分区文件 .withRolloverInterval( TimeUnit.MINUTES.toMillis(TypesafeConfigUtils.getConfig(config, ROLLOVER_INTERVAL, DEFAULT_ROLLOVER_INTERVAL))) .build(); OutputFileConfig outputFileConfig = OutputFileConfig.builder() .withPartPrefix(TypesafeConfigUtils.getConfig(config, PART_PREFIX, DEFAULT_PART_PREFIX)) .withPartSuffix(TypesafeConfigUtils.getConfig(config, PART_SUFFIX, DEFAULT_PART_SUFFIX)) .build(); //流式文件系统的连接器:StreamingFileSink<Row> //行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder) final StreamingFileSink<Row> sink = StreamingFileSink .forRowFormat(filePath, new SimpleStringEncoder<Row>()) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig) .build(); dataStream.addSink(sink); }
源码如下():
@Override public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) { //设置文件输出的格式【json\csv\txt】 FormatType format = FormatType.from(config.getString(FORMAT).trim().toLowerCase()); switch (format) { case JSON: RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType(); outputFormat = new JsonRowOutputFormat(filePath, rowTypeInfo); break; case CSV: outputFormat = new CsvRowOutputFormat(filePath); break; case TEXT: outputFormat = new TextOutputFormat<>(filePath); break; default: LOGGER.warn(" unknown file_format [{}],only support json,csv,text", format); break; } //设置写入模式【overwrite\no_overwrite】 if (config.hasPath(WRITE_MODE)) { String mode = config.getString(WRITE_MODE); outputFormat.setWriteMode(FileSystem.WriteMode.valueOf(mode)); } DataSink<Row> dataSink = dataSet.output(outputFormat); //设置并行度 if (config.hasPath(PARALLELISM)) { int parallelism = config.getInt(PARALLELISM); dataSink.setParallelism(parallelism); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。