Flink supports reading data from and writing data to files, sockets, collections, and so on. It also ships with many built-in connectors, such as Kafka, Hadoop, and Redis.
A source is the operator Flink uses to ingest data from external systems. By the way the data is obtained, sources fall into five kinds:
collection-based sources
socket-based sources
file-based sources
third-party connector sources
custom sources
From the perspective of parallelism, sources can further be divided into non-parallel and parallel sources:
Non-parallel source: its parallelism can only be 1, i.e. there is a single runtime instance. Reading large volumes of data this way is inefficient, so it is usually reserved for experiments or tests, e.g. the socket source.
Parallel source: its parallelism can be 1 or more. Given sufficient compute resources, the higher the parallelism, the higher the throughput, e.g. the Kafka source.
package cn.yyds.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _01_ElementsSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1); // default parallelism

        /**
         * Build a data stream from a collection of elements
         */
        DataStreamSource<Integer> fromElements = env.fromElements(1, 2, 3, 4, 5);
        fromElements.map(d -> d * 10).print();

        env.execute();
    }
}
package cn.yyds.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _02_SocketSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1); // default parallelism

        /**
         * Build a data stream from a socket port.
         * The source operator produced by socketTextStream runs with a parallelism of 1.
         */
        DataStreamSource<String> socketSource = env.socketTextStream("centos01", 9999);
        socketSource.print();

        env.execute();
    }
}
package cn.yyds.source;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class _03_TextFileSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1); // default parallelism

        /**
         * Build a data stream from a file
         */
        DataStreamSource<String> fileSource = env.readTextFile("files/data/wc.txt", "utf-8");
        fileSource.map(String::toUpperCase)/*.print()*/;

        // FileProcessingMode.PROCESS_ONCE: read and process the file once, then the job exits
        // FileProcessingMode.PROCESS_CONTINUOUSLY: keep watching the file; whenever it changes,
        // the whole file is re-read and re-processed
        DataStreamSource<String> fileSource2 = env.readFile(new TextInputFormat(null), "files/data/wc.txt",
                FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);
        fileSource2.map(String::toUpperCase).print();

        env.execute();
    }
}
In real production environments, to let Flink read from its data source efficiently, it is usually paired with a distributed message middleware such as Apache Kafka. Kafka is distributed, replicated, highly available, offers high throughput, and records consumer offsets. Integrating Flink with Kafka lets data be read efficiently while guaranteeing exactly-once semantics.
<!-- Kafka connector dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * A utility class that creates Kafka consumer sources for reading data from Kafka.
 */
public class FlinkUtils {

    // the stream execution environment is shared and never reassigned, hence static final
    public static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * @param parameterTool the parameter tool carrying the configuration
     * @param schema        the deserialization schema class for the record values
     * @param <T>           the record type
     */
    public static <T> DataStream<T> createKafkaStream(ParameterTool parameterTool,
                                                      Class<? extends DeserializationSchema<T>> schema) throws Exception {
        // checkpoint interval from the configuration, defaulting to 30 seconds
        long interval = parameterTool.getLong("checkpoint.interval", 30000L);
        env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
        // retain externalized checkpoints so that checkpointed data is not lost when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // the topics to subscribe to
        String[] strings = parameterTool.get("kafka.input.topics").split(",");
        List<String> topics = Arrays.asList(strings);

        Properties properties = parameterTool.getProperties();
        FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<T>(
                topics,
                schema.newInstance(),
                properties
        );
        // do not commit offsets back to Kafka's offsets topic on checkpoint
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        // start consuming from the latest offsets
        flinkKafkaConsumer.setStartFromLatest();

        // integrate the Kafka consumer as a Flink source
        DataStreamSource<T> dataStreamSource = env.addSource(flinkKafkaConsumer);
        return dataStreamSource;
    }

    /**
     * An overload that takes a KafkaDeserializationSchema<T>, whose
     * T deserialize(ConsumerRecord<byte[], byte[]> record) method exposes the full ConsumerRecord,
     * so the topic, partition and offset are available during deserialization.
     *
     * public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
     * T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
     *
     * @param parameterTool the parameter tool carrying the configuration
     * @param schema        the deserialization schema class
     * @param <T>           the record type
     */
    public static <T> DataStream<T> createKafkaStreamWithId(ParameterTool parameterTool,
                                                            Class<? extends KafkaDeserializationSchema<T>> schema) throws Exception {
        // checkpoint interval
        long interval = parameterTool.getLong("checkpoint.interval", 30000L);
        env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);

        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        // rocksDBStateBackend.setDbStoragePath("");
        env.setStateBackend(rocksDBStateBackend);
        // retain the externalized checkpoint data (e.g. on HDFS) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // create the Kafka consumer
        String[] split = parameterTool.get("kafka.input.topics").split(",");
        List<String> topics = Arrays.asList(split);
        Properties properties = parameterTool.getProperties();
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(
                topics,
                schema.newInstance(),
                properties
        );
        // do not commit offsets back to Kafka's offsets topic on checkpoint
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        DataStreamSource<T> dataStreamSource = env.addSource(kafkaConsumer);
        return dataStreamSource;
    }
}
package cn.yyds.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class _04_kafkaSourceSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1); // default parallelism

        /**
         * Requires the flink-connector-kafka dependency.
         * Build a data stream by reading from Kafka.
         */
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // the topic(s) to subscribe to
                .setTopics("tp01")
                // the consumer group id
                .setGroupId("gp01")
                // the Kafka broker address(es)
                .setBootstrapServers("centos01:9092")
                // starting offsets:
                //   OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST): start from the previously committed offsets (reset to LATEST if none exist)
                //   OffsetsInitializer.earliest(): start from the earliest offsets
                //   OffsetsInitializer.latest(): start from the latest offsets
                //   OffsetsInitializer.offsets(Map<TopicPartition, Long>): start from the given offset of each partition
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // deserializer for the record values
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // enable the underlying Kafka consumer's automatic offset commit,
                // which commits the latest offsets to Kafka's consumer offsets topic.
                // Even with auto-commit enabled, the KafkaSource does not rely on it:
                // after a crash and restart it prefers the offsets stored in Flink's own state (more reliable)
                .setProperty("auto.offset.commit", "true")
                // make this source BOUNDED: it reads up to the given position, then stops and the job finishes;
                // typically used for backfilling or reprocessing a fixed slice of history
                // .setBounded(OffsetsInitializer.committedOffsets())
                // make this source UNBOUNDED but still stop reading at the given position without finishing the job;
                // typical use case: read a fixed slice of Kafka data and join it with a truly unbounded stream
                // .setUnbounded(OffsetsInitializer.latest())
                .build();

        // env.addSource() takes an implementation of the SourceFunction interface;
        // env.fromSource() takes an implementation of the (new) Source interface
        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");
        streamSource.print();

        env.execute();
    }
}
With the new-API KafkaSource, Flink records the consumer's offsets in operator state. The offsets are therefore covered by checkpoints and become fault-tolerant, which is what enables end-to-end exactly-once semantics.
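The offsets only become fault-tolerant once checkpointing is actually enabled, which the example above leaves out. Below is a minimal sketch of combining the KafkaSource with checkpointing; the class name, interval and connection values are illustrative and not taken from the original. After a failure, the source resumes from the offsets restored from Flink's own state rather than from the offsets committed to Kafka.

package cn.yyds.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class _04_KafkaSourceCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // with checkpointing enabled, the KafkaSource snapshots its current offsets into operator state
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("centos01:9092")   // placeholder broker address
                .setTopics("tp01")                      // placeholder topic
                .setGroupId("gp01")                     // placeholder group id
                // committed offsets are only the fallback for a fresh start;
                // after a failure the offsets restored from Flink's own state take precedence
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source").print();
        env.execute();
    }
}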
Custom sources
A custom source can implement SourceFunction or RichSourceFunction; both are non-parallel source operators. It can also implement ParallelSourceFunction or RichParallelSourceFunction; both are parallelizable.
For these source operators:
the Rich variants additionally provide open(), close() and getRuntimeContext();
the Parallel variants can run as multiple parallel instances.
package cn.yyds.source;

import lombok.*;

import java.util.Map;

@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public class EventLog {
    private long guid;
    private String sessionId;
    private String eventId;
    private long timeStamp;
    private Map<String, String> eventInfo;
}
package cn.yyds.source;

import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _05_SourceFunctionSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());
        dataStreamSource.map(JSON::toJSONString).print();

        env.execute();
    }
}
package cn.yyds.source;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;

/**
 * A non-parallel custom source that keeps emitting randomly generated EventLog records.
 */
public class MySourceFunction implements SourceFunction<EventLog> {

    volatile boolean flag = true;

    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events = {"appLaunch", "pageLoad", "adShow", "adClick", "itemShare", "itemCollect", "putBack", "wakeUp", "appClose"};
        HashMap<String, String> eventInfoMap = new HashMap<>();

        while (flag) {
            // fill the reused EventLog bean with random values
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
            eventLog.setTimeStamp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
            eventLog.setEventInfo(eventInfoMap);

            ctx.collect(eventLog);

            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(200, 1500));
        }
    }

    @Override
    public void cancel() {
        // stop the generating loop when the job is cancelled
        flag = false;
    }
}
As the Web UI shows, this source runs with a parallelism of only 1.
package cn.yyds.source;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.HashMap;

public class _06_RichParallelSource {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyRichParallelSourceFunction());
        SingleOutputStreamOperator<String> resStream = dataStreamSource.map(JSON::toJSONString).disableChaining();
        resStream.print();

        env.execute();
    }
}

class MyRichParallelSourceFunction extends RichParallelSourceFunction<EventLog> {

    volatile boolean flag = true;

    /**
     * Source initialization
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        // the runtime context exposes the name of the task this operator belongs to
        String taskName = runtimeContext.getTaskName();
        // ... as well as the index of this subtask
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
    }

    /**
     * Data-generating loop (the core working logic of the source)
     */
    @Override
    public void run(SourceContext<EventLog> ctx) throws Exception {
        EventLog eventLog = new EventLog();
        String[] events = {"appLaunch", "pageLoad", "adShow", "adClick", "itemShare", "itemCollect", "putBack", "wakeUp", "appClose"};
        HashMap<String, String> eventInfoMap = new HashMap<>();

        while (flag) {
            eventLog.setGuid(RandomUtils.nextLong(1, 1000));
            eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());
            eventLog.setTimeStamp(System.currentTimeMillis());
            eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);
            eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));
            eventLog.setEventInfo(eventInfoMap);

            ctx.collect(eventLog);

            eventInfoMap.clear();
            Thread.sleep(RandomUtils.nextInt(500, 1500));
        }
    }

    /**
     * Called when the job is cancelled
     */
    @Override
    public void cancel() {
        flag = false;
    }

    /**
     * Called when the source is closed
     */
    @Override
    public void close() throws Exception {
        System.out.println("source closed.....");
    }
}
This time the source runs with 12 parallel subtasks, i.e. the local environment's default parallelism (the number of CPU cores on the machine).
A sink operator writes the final results of the computation out of Flink. Different sink operators can emit data to different targets, such as files, a network port, message middleware, an external file system, or simply the console.
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

public class _01_FileSinkOperator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");
        env.setParallelism(2);

        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // write to files
        streamSource.map(bean -> Tuple5.of(bean.getEventId(), bean.getGuid(), bean.getEventInfo(), bean.getSessionId(), bean.getTimeStamp()))
                .returns(new TypeHint<Tuple5<String, Long, Map<String, String>, String, Long>>() {
                })
                /*.writeAsCsv("d:/sink_test2", FileSystem.WriteMode.OVERWRITE)*/;

        streamSource.writeAsText("d:/flink/sink_test", FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}
StreamFileSink can not only write data to all kinds of file systems, it also integrates with the checkpoint mechanism to guarantee exactly-once semantics, supports bucketed storage, and can write in columnar formats, which makes it considerably more powerful. Each output file of the sink goes through three states during its life cycle: in-progress, pending, and finished.
<!-- Dependencies required for the StreamFileSink / FileSink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class _02_StreamSinkRow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * Use the FileSink operator to write the data to a file system, in row format.
         */
        // build a FileSink object
        FileSink<String> rowSink = FileSink
                .forRowFormat(new Path("d:/flink/filesink/"), new SimpleStringEncoder<String>("utf-8"))
                // rolling policy: switch to a new file every 10 s, or once the current file reaches 5 MB
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(10000)
                        .withMaxPartSize(5 * 1024 * 1024)
                        .build())
                // bucket assigner (the strategy for creating sub-directories)
                .withBucketAssigner(new DateTimeBucketAssigner<String>())
                .withBucketCheckInterval(5)
                // configuration of the output file names
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("yyds").withPartSuffix(".txt").build())
                .build();

        // attach the sink to the stream
        streamSource.map(JSON::toJSONString)
                // .addSink() takes a SinkFunction implementation
                .sinkTo(rowSink); // sinkTo() takes a Sink implementation

        env.execute();
    }
}
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class _02_StreamSinkDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * Variant 1 - core logic:
         *   - build an Avro schema
         *   - use the schema to build a ParquetWriterFactory
         *   - use the ParquetWriterFactory to build a FileSink operator
         *   - convert the original stream into a GenericRecord stream and send it to the FileSink
         */
        // 1. define the data model of the GenericRecord
        Schema schema = SchemaBuilder.builder()
                .record("DataRecord")
                .namespace("cn.yyds.sink.avro.schema")
                .doc("schema of the user behavior event data")
                .fields()
                .requiredInt("gid")
                .requiredLong("ts")
                .requiredString("eventId")
                .requiredString("sessionId")
                .name("eventInfo")
                .type()
                .map()
                .values()
                .type("string")
                .noDefault()
                .endRecord();

        // 2. build a parquet writer factory from the schema
        ParquetWriterFactory<GenericRecord> writerFactory = ParquetAvroWriters.forGenericRecord(schema);

        // 3. use the writer factory to build a sink operator that writes columnar parquet files
        FileSink<GenericRecord> sink1 = FileSink.forBulkFormat(new Path("d:/flink/datasink/"), writerFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("yyds").withPartSuffix(".parquet").build())
                .build();

        // 4. convert the EventLog stream into the GenericRecord stream that the parquet writer expects
        SingleOutputStreamOperator<GenericRecord> recordStream = streamSource
                .map((MapFunction<EventLog, GenericRecord>) eventLog -> {
                    // build a Record object
                    GenericData.Record record = new GenericData.Record(schema);
                    // copy the data into the record
                    record.put("gid", (int) eventLog.getGuid());
                    record.put("eventId", eventLog.getEventId());
                    record.put("ts", eventLog.getTimeStamp());
                    record.put("sessionId", eventLog.getSessionId());
                    record.put("eventInfo", eventLog.getEventInfo());
                    return record;
                }).returns(new GenericRecordAvroTypeInfo(schema));
        // Avro classes/objects need Avro's serializer, so the AvroTypeInfo must be given explicitly to provide the AvroSerializer

        // 5. write out the data
        recordStream.sinkTo(sink1);

        env.execute();
    }
}
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class _02_StreamSinkDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * Variant 2 - core logic:
         *   - write an avsc text file (JSON) that describes the data model
         *   - add the Maven code-generator plugin to generate an Avro-specific JavaBean class from the avsc file
         *   - use the generated JavaBean to build a ParquetWriterFactory
         *   - use the ParquetWriterFactory to build a FileSink operator
         *   - convert the original stream into a stream of the generated JavaBean and send it to the FileSink
         */
        // 1. put the avsc file into the resources folder and let the Maven plugin generate the JavaBean: AvroEventLog
        //    a JavaBean generated from an avsc file already carries its own Schema object
        // AvroEventLog avroEventLog = new AvroEventLog();
        // Schema schema = avroEventLog.getSchema();

        // 2. build a parquet writer factory from the generated AvroEventLog class
        ParquetWriterFactory<AvroEventLog> parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(AvroEventLog.class);

        // 3. use the writer factory to build a sink operator that writes columnar parquet files
        FileSink<AvroEventLog> bulkSink = FileSink.forBulkFormat(new Path("d:/flink/datasink2/"), parquetWriterFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<AvroEventLog>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("yyds").withPartSuffix(".parquet").build())
                .build();

        // 4. convert the EventLog stream into the AvroEventLog stream that the parquet writer expects
        SingleOutputStreamOperator<AvroEventLog> avroEventLogStream = streamSource.map(new MapFunction<EventLog, AvroEventLog>() {
            @Override
            public AvroEventLog map(EventLog eventLog) throws Exception {
                HashMap<CharSequence, CharSequence> eventInfo1 = new HashMap<>();
                // copy the event info into a HashMap<CharSequence, CharSequence>
                Map<String, String> eventInfo2 = eventLog.getEventInfo();
                Set<Map.Entry<String, String>> entries = eventInfo2.entrySet();
                for (Map.Entry<String, String> entry : entries) {
                    eventInfo1.put(entry.getKey(), entry.getValue());
                }
                return new AvroEventLog(eventLog.getGuid(), eventLog.getSessionId(), eventLog.getEventId(), eventLog.getTimeStamp(), eventInfo1);
            }
        });

        // 5. write out the data
        avroEventLogStream.sinkTo(bulkSink);

        env.execute();
    }
}
The avsc file looks as follows:
{"namespace": "cn.yyds.flink.avro.schema",
"type": "record",
"name": "AvroEventLog",
"fields": [
{"name": "guid", "type": "long"},
{"name": "sessionId", "type": "string"},
{"name": "eventId", "type": "string"},
{"name": "timeStamp", "type": "long"},
{"name": "eventInfo", "type": { "type":"map","values": "string"} }
]
}
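Step 2 of the variant above relies on a Maven code-generator plugin to turn this avsc file into the AvroEventLog class. One possible configuration is the avro-maven-plugin; the version and the source/output directories below are assumptions and have to be adapted to the actual project layout:

<!-- Sketch: generate AvroEventLog from the .avsc file with the avro-maven-plugin
     (version and directories are assumptions; adjust them to the project) -->
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>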
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class _02_StreamSinkDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * Variant 3 - core logic:
         *   - use your own JavaBean class to build a ParquetWriterFactory (via reflection)
         *   - use the ParquetWriterFactory to build a FileSink operator
         *   - send the original stream to the FileSink
         */
        // 2. build a parquet writer factory from our own JavaBean class
        ParquetWriterFactory<EventLog> parquetWriterFactory = ParquetAvroWriters.forReflectRecord(EventLog.class);

        // 3. use the writer factory to build a sink operator that writes columnar parquet files
        FileSink<EventLog> bulkSink = FileSink.forBulkFormat(new Path("d:/flink/datasink3/"), parquetWriterFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<EventLog>("yyyy-MM-dd--HH"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("yyds").withPartSuffix(".parquet").build())
                .build();

        // 5. write out the data
        streamSource.sinkTo(bulkSink);

        env.execute();
    }
}
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class _03_KafkaSinkOld {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
        SingleOutputStreamOperator<String> mapStream = streamSource.map(JSON::toJSONString);

        // the target Kafka topic
        String topic = "test";

        // Kafka producer properties
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "centos01:9092,centos02:9092,centos03:9092");

        // create the Kafka producer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic,                                    // target topic
                new KafkaSerializationSchema<String>() {  // serialization schema used when writing to Kafka
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long aLong) {
                        return new ProducerRecord<byte[], byte[]>(topic, element.getBytes());
                    }
                },
                prop,                                     // Kafka producer properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE  // exactly-once semantics
        );

        // add the Kafka sink
        mapStream.addSink(kafkaProducer);

        env.execute();
    }
}
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class _04_KafkaSinkNew {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // write the data to Kafka
        // 1. build a Kafka sink operator
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("centos01:9092,centos02:9092,centos03:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("event-log")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix("yyds-")
                .build();

        // 2. attach the sink operator to the stream
        streamSource
                .map(JSON::toJSONString).disableChaining()
                .sinkTo(kafkaSink);

        env.execute();
    }
}
KafkaSink can work together with Flink's checkpoint mechanism to support end-to-end exactly-once semantics; under the hood it relies on the Kafka producer's transaction mechanism.
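The example above uses AT_LEAST_ONCE. To actually get exactly-once delivery, the sink has to be switched to DeliveryGuarantee.EXACTLY_ONCE, and the producer-side transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms (15 minutes by default). A minimal sketch, reusing the broker list and topic from the example and assuming an illustrative timeout value:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaEosSinkSketch {

    // Builds an exactly-once KafkaSink; a drop-in replacement for the sink built in _04_KafkaSinkNew above.
    public static KafkaSink<String> buildEosKafkaSink() {
        Properties producerProps = new Properties();
        // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default); value here is illustrative
        producerProps.setProperty("transaction.timeout.ms", "600000");

        return KafkaSink.<String>builder()
                .setBootstrapServers("centos01:9092,centos02:9092,centos03:9092")
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("event-log")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // transactions are only committed when the corresponding checkpoint completes
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // each job must use a distinct transactional id prefix
                .setTransactionalIdPrefix("yyds-eos-")
                .build();
    }
}

Downstream consumers then need to read with isolation.level=read_committed; otherwise they will also see records from transactions that were aborted after a failure.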
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class _05_JdbcSinkOperatorNoEOS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * 1. A JDBC sink that does NOT guarantee exactly-once semantics
         */
        SinkFunction<EventLog> jdbcSink = JdbcSink.sink(
                "insert into event_log values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
                new JdbcStatementBuilder<EventLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                        preparedStatement.setLong(1, eventLog.getGuid());
                        preparedStatement.setString(2, eventLog.getSessionId());
                        preparedStatement.setString(3, eventLog.getEventId());
                        preparedStatement.setLong(4, eventLog.getTimeStamp());
                        preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
                        preparedStatement.setLong(6, eventLog.getGuid());
                        preparedStatement.setString(7, eventLog.getSessionId());
                        preparedStatement.setString(8, eventLog.getEventId());
                        preparedStatement.setLong(9, eventLog.getTimeStamp());
                        preparedStatement.setString(10, JSON.toJSONString(eventLog.getEventInfo()));
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3)
                        .withBatchSize(1)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );

        // write out the data
        streamSource.addSink(jdbcSink);

        env.execute();
    }
}
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;

import javax.sql.XADataSource;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class _05_JdbcSinkOperatorEOS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        /**
         * 2. A JDBC sink that CAN provide exactly-once semantics
         */
        SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink(
                "insert into event_log values (?,?,?,?,?) on duplicate key update guid=?,sessionId=?,eventId=?,ts=?,eventInfo=? ",
                new JdbcStatementBuilder<EventLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {
                        preparedStatement.setLong(1, eventLog.getGuid());
                        preparedStatement.setString(2, eventLog.getSessionId());
                        preparedStatement.setString(3, eventLog.getEventId());
                        preparedStatement.setLong(4, eventLog.getTimeStamp());
                        preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));
                        preparedStatement.setLong(6, eventLog.getGuid());
                        preparedStatement.setString(7, eventLog.getSessionId());
                        preparedStatement.setString(8, eventLog.getEventId());
                        preparedStatement.setLong(9, eventLog.getTimeStamp());
                        preparedStatement.setString(10, JSON.toJSONString(eventLog.getEventInfo()));
                    }
                },
                JdbcExecutionOptions.builder()
                        // the exactly-once JDBC sink requires maxRetries = 0 (retries are handled by the XA transaction replay)
                        .withMaxRetries(0)
                        .withBatchSize(1)
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        // MySQL does not support multiple concurrent transactions on one connection,
                        // so this option must be set to true
                        .withTransactionPerConnection(true)
                        .build(),
                new SerializableSupplier<XADataSource>() {
                    @Override
                    public XADataSource get() {
                        // an XADataSource is a JDBC connection that supports distributed (XA) transactions;
                        // how it is constructed differs from database to database
                        MysqlXADataSource xaDataSource = new MysqlXADataSource();
                        xaDataSource.setUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");
                        xaDataSource.setUser("root");
                        xaDataSource.setPassword("root");
                        return xaDataSource;
                    }
                }
        );

        // write out the data
        streamSource.addSink(exactlyOnceSink);

        env.execute();
    }
}
<!-- Redis sink dependency -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
package cn.yyds.sink;

import cn.yyds.source.EventLog;
import cn.yyds.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class _06_RedisSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/flink/ckpt");

        // build a data stream
        DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());

        // insert the EventLog data into Redis
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("centos01").build();
        RedisSink<EventLog> redisSink = new RedisSink<>(config, new StringInsertMapper());
        streamSource.addSink(redisSink);

        env.execute();
    }

    static class StringInsertMapper implements RedisMapper<EventLog> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        /**
         * For Redis data structures without an inner key, this method returns the (outer) key.
         * For structures with an inner key (e.g. HSET), it returns the inner field key,
         * while the value passed to the RedisCommandDescription above serves as the outer key.
         */
        @Override
        public String getKeyFromData(EventLog data) {
            return data.getGuid() + "-" + data.getSessionId() + "-" + data.getTimeStamp(); // the key of the string record
        }

        @Override
        public String getValueFromData(EventLog data) {
            return JSON.toJSONString(data); // the value of the string record
        }
    }
}