当前位置:   article > 正文

Flink 消费 Kafka 数据的几种方式

flink消费kafka数据

本文总结三种常见的 Flink 消费 Kafka 数据的方式:简单模式、指定消费位置、多 topic 订阅。

  1. /**
  2. * @author daqu
  3. * @date 2022/6/13 15:22
  4. * @descriable:
  5. */
  6. @Slf4j
  7. public class KafkaConsumerUtils {
  8. //简单模式
  9. public static FlinkKafkaConsumer createKafkaConsumer(StreamExecutionEnvironment env, String topic, String groupID) throws Exception {
  10. Properties prop =new Properties();
  11. prop.setProperty("bootstrap.servers", "192.168.1.178:9092,192.168.1.179:9092,192.168.1.180:9092");
  12. prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  13. prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  14. prop.setProperty("auto.offset.reset", "latest");
  15. SimpleStringSchema schema = new SimpleStringSchema();
  16. FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, schema, prop);
  17. //设置自动提交offset
  18. consumer.setCommitOffsetsOnCheckpoints(true);
  19. return consumer;
  20. }
  21. //设置消费位置
  22. public static FlinkKafkaConsumer flinkKafkaConsumerSerDe (StreamExecutionEnvironment env , String topic , String groupID , KafkaDeserializationSchema schema, StartupMode sm) throws IOException {
  23. Properties prop =new Properties();
  24. prop.setProperty("bootstrap.servers", "192.168.1.178:9092,192.168.1.179:9092,192.168.1.180:9092");
  25. prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  26. prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  27. prop.setProperty("auto.offset.reset", "latest");
  28. FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(topic, schema, prop);
  29. //设置kafka 的消费位置
  30. if(sm.equals(StartupMode.EARLIEST)){
  31. consumer.setStartFromEarliest();
  32. }else if (sm.equals(StartupMode.LATEST)){
  33. consumer.setStartFromLatest();
  34. }
  35. //设置自动提交offset
  36. consumer.setCommitOffsetsOnCheckpoints(true);
  37. return consumer;
  38. }
  39. //多个topic
  40. public static FlinkKafkaConsumer mutiTopickafkaConsumer(StreamExecutionEnvironment env,String groupID) throws IOException {
  41. Properties prop =new Properties();
  42. prop.setProperty("bootstrap.servers", "192.168.1.178:9092,192.168.1.179:9092,192.168.1.180:9092");
  43. prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  44. prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  45. prop.setProperty("auto.offset.reset", "latest");
  46. LinkedList<String> topics = new LinkedList<>();
  47. topics.add("test01");
  48. topics.add("test02");
  49. FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), prop);
  50. return consumer;
  51. }
  52. public static void main(String[] args) throws Exception {
  53. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  54. env.setParallelism(1);
  55. String topic ="test01";
  56. String groupId="kafka-group";
  57. FlinkKafkaConsumer consumer = KafkaConsumerUtils.createKafkaConsumer(env, topic, groupId);
  58. env.addSource(consumer).print();
  59. KafkaDeserializationSchemaWrapper schemaWrapper = new KafkaDeserializationSchemaWrapper(new SimpleStringSchema());
  60. FlinkKafkaConsumer consumer1 = KafkaConsumerUtils.flinkKafkaConsumerSerDe(env, topic, groupId, schemaWrapper, StartupMode.LATEST);
  61. env.addSource(consumer1).print();
  62. FlinkKafkaConsumer consumer2 = KafkaConsumerUtils.mutiTopickafkaConsumer(env, groupId);
  63. env.addSource(consumer2).print();
  64. env.execute();
  65. }
  66. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/381416
推荐阅读
相关标签
  

闽ICP备14008679号