Flink: Kafka source & sink

Preface

This demo uses Kafka as Flink's data source.

References:

Choosing the Kafka connector version

Connector JAR

What is the Flink Kafka connector used for?

Kafka's partition mechanism works together with Flink's parallelism mechanism to support data recovery.
Kafka can act as both a Flink source and a Flink sink.
When a job fails, the application can be recovered by setting the Kafka offset.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>

Custom serialization

If you want to write a custom serialization class, note the following points and add the flink-avro dependency shown below:

  1. The SimpleStringSchema deserializer shipped with Flink only outputs the message value; the topic information is lost.
  2. JSONKeyValueDeserializationSchema also provides the topic information, but requires the message body to be JSON.
  3. When neither of these is sufficient, Flink provides the serialization and deserialization interfaces KeyedDeserializationSchema and KeyedSerializationSchema, which can be implemented yourself. When consuming messages from Kafka, implementing KeyedDeserializationSchema is usually enough (a sketch follows the dependency below).
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.12.0</version>
</dependency>
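
As a hedged sketch of the idea (the class name TopicAwareStringSchema and the topic-prefix output format are illustrative assumptions; recent Flink versions express this through the KafkaDeserializationSchema interface, the successor of KeyedDeserializationSchema):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical example: expose the topic name together with the message value.
public class TopicAwareStringSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // the stream never ends
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = record.value() == null ? "" : new String(record.value());
        // Prefix the value with its topic so downstream operators know where it came from.
        return record.topic() + "|" + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}

Such a schema would then be passed to the consumer in place of SimpleStringSchema, for example new FlinkKafkaConsumer<>("topic-name-cui", new TopicAwareStringSchema(), getProperties()).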

Producer: writing to Kafka (a Flink sink)

package cui.yao.nan.flink.string;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.springframework.boot.CommandLineRunner;

import com.google.gson.Gson;

import cui.yao.nan.pojo.Person;

public class KafkaProducer implements CommandLineRunner {

    public static Gson gson = new Gson();

    public static void main(String[] args) {
        new KafkaProducer().run(null);
    }

    @Override
    public void run(String... args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FlinkKafkaProducer<String>(String topicId, SerializationSchema<String> serializationSchema, Properties producerConfig)
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                "topic-name-cui", new SimpleStringSchema(), getProperties());
        // Write the record timestamp into Kafka; available since Kafka 0.10.
        kafkaProducer.setWriteTimestampToKafka(true);

        DataStream<String> dataStream = env.fromElements(
                gson.toJson(new Person("cui", 1)),
                gson.toJson(new Person("yao", 2)),
                gson.toJson(new Person("nan", 3)));

        // The stream could be transformed here before it is written to Kafka.
        // print() would write each element (via toString()) to the task manager's log,
        // or to the IDE console when the job runs inside an IDE.
        dataStream.addSink(kafkaProducer);

        try {
            env.execute("start kafkaproducer");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.1.80.190:9092");
        properties.setProperty("zookeeper.connect", "10.1.80.190:2181");
        properties.setProperty("group.id", "15");
        // properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
}
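
To verify that the producer actually wrote the three JSON strings, a Kafka console consumer can be attached to the topic (the script location depends on the Kafka installation; the broker address is the one configured in getProperties()):

bin/kafka-console-consumer.sh --bootstrap-server 10.1.80.190:9092 --topic topic-name-cui --from-beginning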

Controlling where the source starts consuming
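
FlinkKafkaConsumer exposes setStartFrom* methods for this. A minimal sketch (choose exactly one of the calls; the timestamp is an illustrative value and getProperties() is the helper shown in the examples):

FlinkKafkaConsumer<String> kafkaConsumer =
        new FlinkKafkaConsumer<>("topic-name-cui", new SimpleStringSchema(), getProperties());

kafkaConsumer.setStartFromGroupOffsets();   // default: resume from the group's committed offsets in Kafka
kafkaConsumer.setStartFromEarliest();       // replay the topic from the beginning
kafkaConsumer.setStartFromLatest();         // only read records that arrive from now on
kafkaConsumer.setStartFromTimestamp(1614847688000L); // illustrative: start from an epoch-millisecond timestamp

env.addSource(kafkaConsumer);

Note that when the job is restored from a checkpoint or savepoint, the offsets stored in that state take precedence over these start-position settings.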

Managing consumption failures

Flink's Kafka source handles failures through checkpointing: on recovery, consumption resumes from the offsets stored in the last completed checkpoint, so no data is lost or skipped.
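
A minimal sketch of the usual pattern, enabling checkpointing and committing offsets back to Kafka on checkpoints (the 5-second interval is an illustrative assumption, not from the original post):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 5 seconds; on failure the job restarts from the last completed checkpoint.
env.enableCheckpointing(5000);

FlinkKafkaConsumer<String> kafkaConsumer =
        new FlinkKafkaConsumer<>("topic-name-cui", new SimpleStringSchema(), getProperties());
// Also commit the checkpointed offsets back to Kafka so external tools can see the group's progress.
// Recovery itself always uses the offsets stored in Flink's checkpoint, not the ones in Kafka.
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(kafkaConsumer);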

Setting up the watermark strategy

This must be set, otherwise no event-time window will ever fire.

There are several possible ways to write this; see the "Generating Watermarks" page of the Apache Flink documentation.
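
For example, if event timestamps were known to be strictly ascending, the bounded-out-of-orderness strategy used in the reference code below could be replaced by the simpler built-in variant (reusing the inputStream and timestampAssigner defined there):

inputStream.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(timestampAssigner));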

Reference code:

package demo;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Author: Chenghui Bai
 * Date: 2021/3/4 17:48
 * ProjectName: frauddetection
 * PackageName: demo
 * ClassName: WatermarkWCDemo
 */
public class WatermarkWCDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Input lines look like "<epoch-seconds>,<word>".
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);

        // Extract the event timestamp (in milliseconds) from the first field of each record.
        SerializableTimestampAssigner<String> timestampAssigner = new SerializableTimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]) * 1000L;
            }
        };

        // Allow events to arrive up to 10 seconds out of order.
        SingleOutputStreamOperator<String> watermarkStream = inputStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner(timestampAssigner));

        watermarkStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = value.split(",");
                out.collect(new Tuple2<>(fields[1], 1));
            }
        }).keyBy(data -> data.f0)
          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
          .sum(1)
          .print();

        env.execute("run watermark wc");
    }
}
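
To drive the demo, open the socket first (for example with nc -lk 7777) and type comma-separated lines whose first field is an epoch timestamp in seconds and whose second field is the word to count, such as 1614847688,hello. A 10-second tumbling event-time window only fires once the watermark, i.e. the largest timestamp seen minus the 10 seconds of allowed out-of-orderness, passes the end of the window.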

Consumer: reading from Kafka (a Flink source)

package cui.yao.nan.flink.string;

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.boot.CommandLineRunner;

public class KafkaConsumer implements CommandLineRunner {

    public static void main(String[] args) {
        new KafkaConsumer().run(null);
    }

    @Override
    public void run(String... args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FlinkKafkaConsumer<String>(String topic, DeserializationSchema<String> valueDeserializer, Properties props)
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "topic-name-cui", new SimpleStringSchema(), getProperties());

        DataStream<String> dataStream = env.addSource(kafkaConsumer);

        // print() is itself a sink: it writes each element to the task manager's log / IDE console.
        dataStream.print();

        try {
            env.execute("start kafkaconsume");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.1.80.190:9092");
        properties.setProperty("zookeeper.connect", "10.1.80.190:2181");
        properties.setProperty("group.id", "422");
        // properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
}

The consumed messages are printed by the sink; the "2>" prefix in front of each line indicates which sub-task (i.e. which thread) produced it.
