赞
踩
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
官方网址:https://flink.apache.org/
学习资料:https://flink-learning.org.cn/
package batch.sink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author lwh * @date 2023/5/5 * @description 到本地集合 **/ public class SinkToLocalCollectionDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6); // 打印 source.print(); // 带前缀打印 source.print("我是前缀>>>"); // 打印到stderr中 source.printToErr(); // 打印到stderr并带前缀 source.printToErr("我是stderr前缀>>>"); env.execute(); } }
通过writeAsText将数据写出。
支持本地文件和HDFS
package batch.sink; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author lwh * @date 2023/5/5 * @description 基于文件的sink **/ public class SinkToLocalFileAndHDFSDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple3<Integer, String, Double>> source = env.fromElements( Tuple3.of(19, "潇潇", 170.50), Tuple3.of(11, "甜甜", 168.8), Tuple3.of(16, "刚刚", 178.8), Tuple3.of(19, "蛋蛋", 179.99) ); /* 写文件可以设置为并行度为1, 避免产生出来多个文件 */ // 写出到本地文件 source.writeAsText("data/output/SinkToLocalFileAndHDFSDemo.txt", FileSystem.WriteMode.OVERWRITE) .setParallelism(1); // 写出到hdfs source.writeAsText("hdfs://node1:8020/output/SinkToLocalFileAndHDFSDemo.txt", FileSystem.WriteMode.OVERWRITE) .setParallelism(1); // 写出为csv source.writeAsCsv("data/output/SinkToLocalFileAndHDFSDemo.csv", FileSystem.WriteMode.OVERWRITE, "\n", ",").setParallelism(1); env.execute(); } }
通过上面的代码可以看出,writeAsText已经废弃,推荐使用StreamingFileSink
参见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/streamfile_sink.html
这个连接器提供了一个 Sink 来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。
Streaming File Sink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。
桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含一个部分文件(part file)。额外的部分文件(part file)将根据滚动策略创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。
使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
由于我们用的是流任务,那么任务会一直持续进行,数据也会持续不断的写出,由于数据是源源不断的产生,那么就需要给数据设立边界,让其完成某个文件数据的写出。不然某个文件会一直处于写入状态中。
那么StreamingFileSink就是一个写出流数据的类
它会将数据分桶(分part)写出到文件中,按照指定规则(时间、文件大小等),完成某一part的写入过程。
比如:每隔1小时或者每当文件大小达到比如1GB的时候,就完成当前文件的写入,将状态标记为Finished,然后开启一个新文件继续写流数据。
数据在写出之前,在Flink内部会按照各个子任务(并行)划分数据桶,每个桶可以包含多个part文件
文件在写的过程中有3个状态:
StreamingFileSink 支持行编码格式和批量编码格式,比如 Apache Parquet 。这两种变体随附了各自的构建器,可以使用以下静态方法创建:
• Row-encoded sink: StreamingFileSink.forRowFormat(basePath, rowEncoder)
一次写入一行数据
• Bulk-encoded sink: StreamingFileSink.forBulkFormat(basePath, bulkWriterFactory)
一次写入一批数据, 如parquet、avro
简单理解:如何划分桶
桶分配逻辑定义了如何将数据结构化为基本输出目录中的子目录
行格式和批量格式都使用 DateTimeBucketAssigner 作为默认的分配器。 默认情况下,DateTimeBucketAssigner 基于系统默认时区每小时创建一个桶,格式如下: yyyy-MM-dd–HH 。日期格式(即桶的大小)和时区都可以手动配置。
我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner 。
Flink 有两个内置的 BucketAssigners :
• DateTimeBucketAssigner :默认基于时间的分配器
• BasePathBucketAssigner :将所有部分文件(part file)存储在基本路径中的分配器(单个全局桶)
内置的不满足需求可以自定义实现BucketAssigner
简单理解:啥时候(按时间、按大小等)算完成1个文件的写入。
滚动策略 RollingPolicy 定义了指定的文件在何时关闭(closed)并将其变为 Pending 状态,随后变为 Finished 状态。处于 Pending 状态的文件会在下一次 Checkpoint 时变为 Finished 状态,通过设置 Checkpoint 间隔时间,可以控制部分文件(part file)对下游读取者可用的速度、大小和数量。
Flink 有两个内置的滚动策略:
• DefaultRollingPolicy
核心策略:
• OnCheckpointRollingPolicy
核心策略:
当进行一次CheckPoint活动的时候,完成当前文件的写入(跟随检查点的节奏走)
内置的不满足需求可以自定义实现RollingPolicy
package batch.sink; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; /** * @author lwh * @date 2023/5/5 * @description StreamingFileSink **/ public class StreamingFileSinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE); env.setStateBackend(new FsStateBackend( "file:///D:\\checkpoint")); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // Socket Source DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999); // 泛型指的是处理的数据类型是什么 StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path("data/output/sink3"), // 文件写出的路径 // 文件写出的序列化器和编码 new SimpleStringEncoder<String>("UTF-8")) // 桶分配策略 .withBucketAssigner(new BasePathBucketAssigner<String>()) // 文件滚动(完成一次文件写出)的策略 .withRollingPolicy( OnCheckpointRollingPolicy.build() ).build(); socketTextStream.addSink(sink); env.execute(); } }
示例
数据写出到Kafka中
使用:FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema()); 来定义一个kafka的sink对象
开发步骤
package batch.sink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; import org.codehaus.commons.nullanalysis.Nullable; import java.util.Properties; /** * @author lwh * @date 2023/5/5 * @description sink kafka **/ public class KafkaSinkDemo { public static void main(String[] args) throws Exception { // Env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.fromElements( "Sink to Kafka Test 1", "Sink to Kafka Test 2", "Sink to Kafka Test 3", "Sink to Kafka Test 4", "Sink to Kafka Test 5", "Sink to Kafka Test 6" ); // 使用FlinkKafkaProducer来构建一个Kafka的生产者 String brokerList = "node1:9092,node2:9092,node3:9092"; String topic = "kafkatopic"; Properties properties = new Properties(); properties.setProperty("bootstrap.servers", brokerList); // 废弃 // FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema()); FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>( topic, // topic new MyKafkaSerializationSchema(), // 自定义实现kafka的序列化器 properties, // producer的config FlinkKafkaProducer.Semantic.EXACTLY_ONCE // kafka的一致性选择 ); // 构建kafka的sink source.addSink(kafkaSink); env.execute(); } // 自定义构建kafka序列化 public static class MyKafkaSerializationSchema implements KafkaSerializationSchema<String> { private String topic = "kafkatopic"; @Override public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) { return new ProducerRecord<byte[], byte[]>( topic, element.getBytes() ); } /* ProducerRecord有很多构造, 我们使用的是最基础的, 给定topic并且将数据byte化即可, 即ProducerRecord(String topic, V value) 别的构造还会要求传入如: - partition号,int类型, 传入啥写哪个分区, 如果不给定的话, 按照key的hash来计算, 如果没有key的话就按照轮询的方式写入kafka各个分区 - key, 数据的key, 用以计算key的hash来计算数据落入哪个分区 - timestamp, 给数据一个指定的时间戳, 如果不设置, 默认以当前系统时间 上面3个都有默认值, 所以我们不需要设置, 有需要设置可以用其它的重载的构造函数, 如: ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) */ } }
示例
加载下列本地集合,导入MySql中
UserInfo(9, "xiaoxiao", "123456", "潇潇")
开发步骤
package batch.sink; import entity.UserInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * @author lwh * @date 2023/5/5 * @description sink mysql **/ public class MysqlSinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserInfo> source = env.fromElements(new UserInfo(9, "xiaoxiao", "123456", "潇潇")); source.addSink(new MyMySQLSink()); env.execute(); } public static class MyMySQLSink extends RichSinkFunction<UserInfo> { private Connection connection = null; // 数据库连接对象 private PreparedStatement ps = null; // ps对象 /* invoke就是执行的方法, 类似自定义Source中的run */ @Override public void invoke(UserInfo value, Context context) throws Exception { this.ps.setInt(1, value.getId()); this.ps.setString(2, value.getUsername()); this.ps.setString(3, value.getPassword()); this.ps.setString(4, value.getName()); this.ps.execute(); } // 实例化的时候执行一次, 适合用来做连接的创建 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); String url = "jdbc:mysql://localhost:3306/flink?useUnicode=true&characterEncoding=utf-8&useSSL=false"; Class.forName("com.mysql.jdbc.Driver"); this.connection = DriverManager.getConnection(url, "root", "123456"); this.ps = connection.prepareStatement("INSERT INTO user VALUES(?,?,?,?);"); } // 销毁实例的时候执行一次, 适合用来释放使用的资源 @Override public void close() throws Exception { super.close(); // 关闭资源 if (this.ps != null) this.ps.close(); if (this.connection != null) this.connection.close(); } } }
通过flink操作redis其实我们可以通过传统的redis连接池Jpoools进行redis的相关操作,但是flink提供了专门操作redis的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink如何使用。
Redis Sink 提供用于向Redis发送数据的接口的类。接收器可以使用三种不同的方法与不同类型的Redis环境进行通信:
注意:本文主要介绍如何创建与单个redis服务器通信的接收器,其他模式请参考flink官网。https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
Redis Sink 核心类是RedisMapper的一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:
使用RedisCommand设置数据结构类型时和redis结构对应关系。
DataType | Redis Command[Sink] |
---|---|
HASH | HSET |
LIST | RPUSH, LPUSH |
SET | SADD |
PUBSUB | PUBLISH |
STRING | SET |
HYPER_LOG_LOG | PFADD |
SORTED_SET | ZADD |
SORTED_SET | ZREM |
代码
package batch.sink; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; /** * @author lwh * @date 2023/5/5 * @description **/ public class RedisSinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Source DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999); // 将数据转换为Tuple2, 注意, socket 传入的数据需要是kv字符串以空格分隔 SingleOutputStreamOperator<Tuple2<String, String>> map = socketTextStream.map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String value) throws Exception { String[] strings = value.split(" "); return Tuple2.of(strings[0], strings[1]); } }); // 构建Redis conf FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("node1").setPort(6379).build(); // 基于自定义的RedisMapper实现来构建RedisSink map.addSink(new RedisSink<Tuple2<String, String>>(redisConf, new MyRedisMapper())); env.execute("Redis Sink Demo"); } /** * 自定义实现RedisMapper接口 */ public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> { /* 描述要写出的数据类型 */ @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.LPUSH); } // 设置key @Override public String getKeyFromData(Tuple2<String, String> data) { return data.f0; } // 设置value @Override public String getValueFromData(Tuple2<String, String> data) { return data.f1; } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。