当前位置:   article > 正文

java消费kafka的数据

java消费kafka
  1. <!--kafka依赖-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. </dependency>
  1. @Test
  2. public void KafkaTests(){
  3. //1.配置属性值
  4. Properties properties = new Properties();
  5. //kafka是服务器地址
  6. properties.put("bootstrap.servers", "192.168.239.200:9092");
  7. //定义消费者组
  8. properties.put("group.id", "test");
  9. //自动提交(offset)
  10. properties.put("enable.auto.commit", "true");
  11. //自动处理的间隔时间1
  12. properties.put("auto.commit.interval.ms", "1000");
  13. //keyvalues的持久化设置
  14. properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  15. properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  16. //2.创建消费者
  17. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  18. //3.订阅消费topic(可以有多个topic)
  19. kafkaConsumer.subscribe(Arrays.asList("cctv1"));
  20. //4.执行消费的操作
  21. while (true) {
  22. //100ms消费一次
  23. //kafkaConsumer.poll(100)读出来,读到records
  24. ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
  25. for (ConsumerRecord<String, String> record : records) {
  26. System.out.println("-----------------");
  27. //打印偏移量,keyvalue
  28. System.out.printf("offset = %d, value = %s", record.offset(), record.key(),record.value());
  29. System.out.println();
  30. }
  31. }
  32. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/781326
推荐阅读
相关标签
  

闽ICP备14008679号