当前位置:   article > 正文

Kafka(消费者)示例_kafka消费者代码

kafka消费者代码
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import java.time.Duration;
  7. import java.util.Collections;
  8. import java.util.Properties;
  9. public class KafkaConsumerExample {
  10. private static final String TOPIC_NAME = "bltest3";
  11. private static final String BOOTSTRAP_SERVERS = "hadoop01:9092";
  12. private static final String GROUP_ID = "console-consumer-88604";
  13. public static void main(String[] args) {
  14. // 配置Kafka消费者属性
  15. Properties props = new Properties();
  16. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  17. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  18. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  19. props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
  20. // 创建Kafka消费者实例
  21. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  22. // 订阅Kafka主题
  23. consumer.subscribe(Collections.singletonList(TOPIC_NAME));
  24. try {
  25. // 消费消息
  26. while (true) {
  27. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  28. for (ConsumerRecord<String, String> record : records) {
  29. System.out.println("Received message: " + record.value());
  30. }
  31. }
  32. } catch (Exception e) {
  33. e.printStackTrace();
  34. } finally {
  35. // 关闭Kafka消费者
  36. consumer.close();
  37. }
  38. }
  39. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/638915
推荐阅读
相关标签
  

闽ICP备14008679号