当前位置:   article > 正文

Flink 系例 之 Connectors 连接 Kafka_flink connector kafka

flink connector kafka

通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作;

示例环境

  1. java.version: 1.8.x
  2. flink.version: 1.11.1
  3. kafka:2.11

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与示例模块

数据流输入

DataStreamSource.java

  1. package com.flink.examples.kafka;
  2. import com.flink.examples.TUser;
  3. import com.google.gson.Gson;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.apache.flink.api.common.functions.FilterFunction;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  8. import org.apache.flink.streaming.api.CheckpointingMode;
  9. import org.apache.flink.streaming.api.datastream.DataStream;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  12. import org.apache.kafka.clients.consumer.ConsumerConfig;
  13. import java.util.Properties;
  14. /**
  15. * @Description 从Kafka中消费数据
  16. */
  17. public class DataStreamSource {
  18. /**
  19. * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
  20. */
  21. public static void main(String[] args) throws Exception {
  22. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23. //设置并行度(使用几个CPU核心)
  24. env.setParallelism(1);
  25. //每隔2000ms进行启动一个检查点
  26. env.enableCheckpointing(2000);
  27. //设置模式为exactly-once
  28. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  29. // 确保检查点之间有进行500 ms的进度
  30. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  31. //1.消费者客户端连接到kafka
  32. Properties props = new Properties();
  33. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
  34. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
  35. props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45");
  36. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  37. FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
  38. //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息
  39. //consumer.setStartFromEarliest();
  40. //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略
  41. //consumer.setStartFromTimestamp(1559801580000L);
  42. //Flink从topic中最新的数据开始消费
  43. //consumer.setStartFromLatest();
  44. //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
  45. //consumer.setStartFromGroupOffsets();
  46. //2.在算子中进行处理
  47. DataStream<TUser> sourceStream = env.addSource(consumer)
  48. .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
  49. .map((MapFunction<String, TUser>) value -> {
  50. System.out.println("print:" + value);
  51. //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时,
  52. //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。
  53. //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。
  54. Gson gson = new Gson();
  55. try {
  56. TUser user = gson.fromJson(value, TUser.class);
  57. return user;
  58. }catch(Exception e){
  59. System.out.println("error:" + e.getMessage());
  60. }
  61. return new TUser();
  62. })
  63. .returns(TUser.class);
  64. sourceStream.print();
  65. //3.执行
  66. env.execute("flink kafka source");
  67. }
  68. }

数据流输出

DataStreamSink.java

  1. package com.flink.examples.kafka;
  2. import com.flink.examples.TUser;
  3. import com.google.gson.Gson;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  6. import org.apache.flink.streaming.api.CheckpointingMode;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  10. import org.apache.kafka.clients.producer.ProducerConfig;
  11. import java.util.Properties;
  12. /**
  13. * @Description 将生产者数据写入到kafka
  14. */
  15. public class DataStreamSink {
  16. /**
  17. * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
  18. */
  19. public static void main(String[] args) throws Exception {
  20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21. //必需设置setParallelism并行度,否则不会输出
  22. env.setParallelism(1);
  23. //每隔2000ms进行启动一个检查点
  24. env.enableCheckpointing(2000);
  25. //设置模式为exactly-once
  26. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  27. // 确保检查点之间有进行500 ms的进度
  28. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  29. // 检查点必须在一分钟内完成,或者被丢弃
  30. env.getCheckpointConfig().setCheckpointTimeout(60000);
  31. // 同一时间只允许进行一个检查点
  32. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  33. //1.连接kafka
  34. Properties props = new Properties();
  35. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
  36. FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props);
  37. //2.创建数据,并写入数据到流中
  38. TUser user = new TUser();
  39. user.setId(8);
  40. user.setName("liu3");
  41. user.setAge(22);
  42. user.setSex(1);
  43. user.setAddress("CN");
  44. user.setCreateTimeSeries(1598889600000L);
  45. DataStream<String> sourceStream = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
  46. //3.将数据流输入到kafka
  47. sourceStream.addSink(producer);
  48. sourceStream.print();
  49. env.execute("flink kafka sink");
  50. }
  51. }
  1. 在 kafka 上创建名称为 test 的 topic
  2. 先启动 DataStreamSource.java 获取输出流,在启动 DataStreamSink.java 输入流

数据展示

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/500690
推荐阅读
相关标签
  

闽ICP备14008679号