赞
踩
Flink做为数据处理引擎,要把最终处理好的数据写入外部存储,为外部系统或应用提供支持。与输入算子Source相对应的,输出算子为Sink。
前面一直在用的print就是一种Sink,用来将数据流写到控制台打印
Flink程序中所有对外的输出操作,利用Sink算子完成
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法
stream.addSink(new SinkFunction(…));
//重写SinkFunction接口的invoke方法,用来将指定的值写入到外部系统中
//invoke方法在每条数据记录到来时都会调用。
Flink1.12开始,Sink算子的创建是通过调用DataStream的.sinkTo()方法
stream.sinkTo(…)
Flink官网为我们提供了一部分的框架的Sink连接器:
source/sink即可读可写,能做为数据源连接,也能做为下游去输出。
先引入Flink流式文件系统的连接器FileSink的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder):
下面演示实现读往d盘下的tmp目录写数据(tmp目录不用提前创建,不存在会自动创建):
public class SinkFile { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每个目录中,都有 并行度个数的 文件是正在写入状态 env.setParallelism(1); // 必须开启checkpoint,否则文件一直都是 .inprogress状态,即正在写入 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); //生成器模拟一个数据源 DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>( new GeneratorFunction<Long, String>() { @Override public String map(Long value) throws Exception { return "Number:" + value; } }, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1000), //每秒生成1000条数据 Types.STRING ); DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator"); // 输出到文件系统 FileSink<String> fieSink = FileSink // 输出行式存储的文件,指定路径、指定编码 .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8")) // 输出文件的一些配置: 文件名的前缀、后缀,new也行,这里展示build方式创建配置对象 .withOutputFileConfig( OutputFileConfig.builder() .withPartPrefix("code9527") .withPartSuffix(".log") .build() ) // 按照目录分桶:如下,就是每个小时一个目录。ZoneId.systemDefault()即系统默认时区,也可是ZoneId类中的其他时区 .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault())) // 文件滚动策略: 1分钟 或 1m .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(Duration.ofMinutes(1)) .withMaxPartSize(new MemorySize(1024*1024)) .build() ) .build(); dataGen.sinkTo(fieSink); env.execute(); } }
运行,看下效果:inprocess,此时文件正在写入数据,不可读。一个这个inprocess文件,因为上面并行度设置的1
总结:重点还是FileSink对象的创建
输出行/批文件存储的文件,可指定文件路径、文件编码、文件前后缀
按目录分桶,传参的接口实现类对象自选,demo中是按照时间给文件夹命名
特别注意文件滚动策略,是达到指定时间或者文件到达指定大小,是或的关系
FileSink对象创建完后,直接流对象调用sinkTo即可完成写入到文件的动作
添加KafKa连接器的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
以下用socket模拟无界流,来演示数据输出到KafKa:
public class SinkKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 如果是精准一次,必须开启checkpoint,否则无法写入Kafka env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); SingleOutputStreamOperator<String> sensorDS = env .socketTextStream("node1", 9527); KafkaSink<String> kafkaSink = KafkaSink.<String>builder() // 指定 kafka 的地址和端口 .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 指定序列化器:指定Topic名称、具体的序列化 .setRecordSerializer( KafkaRecordSerializationSchema.<String>builder() .setTopic("topic1") .setValueSerializationSchema(new SimpleStringSchema()) .build() ) // 写到kafka的一致性级别: 精准一次、至少一次 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 如果是精准一次,必须设置 事务的前缀 .setTransactionalIdPrefix("test-") // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"") .build(); sensorDS.sinkTo(kafkaSink); env.execute(); } }
关于 Kafka Sink,如果要使用精准一次写入Kafka,需要满足以下条件,缺一不可
如果要指定写入kafka的key,可以自定义序列化器:
public class SinkKafkaWithKey { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); env.setRestartStrategy(RestartStrategies.noRestart()); SingleOutputStreamOperator<String> sensorDS = env .socketTextStream("node1", 9527); /** *指定写入kafka的key,可以自定义序列化器: */ KafkaSink<String> kafkaSink = KafkaSink.<String>builder() .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") .setRecordSerializer( new KafkaRecordSerializationSchema<String>() { @Nullable @Override public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) { String[] datas = element.split(","); //输入的测试数据格式为a,b,c,所以这里先分割一下 byte[] key = datas[0].getBytes(StandardCharsets.UTF_8); byte[] value = element.getBytes(StandardCharsets.UTF_8); return new ProducerRecord<>("topic1", key, value); } } ) .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setTransactionalIdPrefix("test-") .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "") .build(); sensorDS.sinkTo(kafkaSink); env.execute(); } }
添加MySQL驱动依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
再引入flink-jdbc连接器依赖:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
PS:
教学视频中提到了另一种情况,这里记录下。即:官方还未提供flink-connector-jdbc的某高版本的正式依赖,如1.17.0(当前时间已有),暂时从apache snapshot仓库下,因此引入依赖前,先在pom文件中指定仓库路径
<repositories>
<repository>
<id>apache-snapshots</id> <!--这个id后面setting.xml里有用-->
<name>apache-snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
</repositories>
再引入flink-jdbc连接器依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.17-SNAPSHOT</version>
</dependency>
如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加!apache-snapshots
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*,!apache-snapshots</mirrorOf> <!--即除了apache-snapshots,其余的都去阿里仓库下,!即排除,后面的名称是pom中定义的那个-->
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
根据你的数据类型,建立对应结构的表,这里根据要接收的自定义对象WaterSensor建表test:
mysql>
CREATE TABLE `ws` (
`id` varchar(100) NOT NULL,
`ts` bigint(20) DEFAULT NULL,
`vc` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
输出到MySQL的Demo代码:
public class SinkMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<WaterSensor> sensorDS = env .socketTextStream("node01", 9527) .map(new WaterSensorMapFunction()); //输入的信息映射转为自定义的WaterSensor实体类对象 SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink( "insert into ws values(?,?,?)", new JdbcStatementBuilder<WaterSensor>() { @Override public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException { //每收到一条WaterSensor,如何去填充占位符 preparedStatement.setString(1, waterSensor.getId()); preparedStatement.setLong(2, waterSensor.getTs()); preparedStatement.setInt(3, waterSensor.getVc()); } }, JdbcExecutionOptions.builder() .withMaxRetries(3) // 重试次数 .withBatchSize(100) // 批次的大小:条数 .withBatchIntervalMs(3000) // 批次的时间 .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://node01:3306/testDB?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8") .withUsername("root") .withPassword("admin123") .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间 .build() ); sensorDS.addSink(jdbcSink); env.execute(); } }
总结: 写入mysql时注意只能用老的sink写法: addsink,此外JdbcSink的4个参数:
运行,输入数据,查看MySQL:
现有的Flink连接器不能满足需求时,需要自定义连接器进行输出。与Source类似,Flink提供了通用的SinkFunction
接口和对应的RichSinkDunction抽象类,实现这个接口,就可通过DataStream的.addSink()方法自定义写入任何的外部存储。
public class MySinkFunction implements SinkFunction<String>{
@Override
public void invoke(String value, Context context) throws Exception{
//输出逻辑
//value即流中的数据,来一条数据,invoke方法就被调用一次(所以不要在这里创建连接对象)
//如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象
}
}
stream.addSink(new MySinkFunction<String>());
来一条数据,invoke方法就被调用一次,如果你的外部存储必须先创建连接对象,那就用富函数的生命周期方法去创建连接对象:
public class MySinkFunction implements RichSinkFunction<String>{ Connection connection = null; @Overrdie public void open(Configuration parameters) throws Exception{ connection = new xxConnection(xx); } @Override public void close() throws Exception{ super.close(); } @Override public void invoke(String value, Context context) throws Exception{ //输出逻辑 connection.executeXXX(xxx); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。