实现Java监听Kafka

一、流程概述

在这个任务中,我们将教你如何在Java中实现监听Kafka。下面是整个过程的步骤:

整体流程 30% 20% 40% 10% 整体流程 创建KafkaConsumer对象 订阅主题 处理消息 关闭KafkaConsumer

二、具体步骤及代码

1. 创建KafkaConsumer对象

首先,我们需要创建一个KafkaConsumer对象,用于连接到Kafka集群

  1. // 创建KafkaConsumer实例
  2. Properties props = new Properties();
  3. props.put("bootstrap.servers", "localhost:9092");
  4. props.put("group.id", "test-group");
  5. props.put("enable.auto.commit", "true");
  6. props.put("auto.commit.interval.ms", "1000");
  7. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  8. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  9. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
2. 订阅主题

接下来,我们需要订阅一个或多个主题,以接收消息。

  1. // 订阅主题
  2. consumer.subscribe(Arrays.asList("topic1", "topic2"));
  • 1.
  • 2.
3. 处理消息

然后,我们需要处理接收到的消息。

  1. // 处理消息
  2. try {
  3. while (true) {
  4. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  5. for (ConsumerRecord<String, String> record : records) {
  6. System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  7. }
  8. }
  9. } catch (WakeupException e) {
  10. // Ignore for shutdown
  11. } finally {
  12. consumer.close();
  13. }
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
4. 关闭KafkaConsumer

最后,记得在程序结束时关闭KafkaConsumer。

  1. // 关闭KafkaConsumer
  2. consumer.close();
  • 1.
  • 2.

三、类图

下面是Kafka相关类的类图:

KafkaConsumer +subscribe() +poll() +close() properties -bootstrap.servers -group.id -enable.auto.commit -auto.commit.interval.ms -key.deserializer -value.deserializer ConsumerRecords -offset -key -value ConsumerRecord -offset -key -value

通过以上步骤和代码,你可以成功实现Java监听Kafka的功能。希望这篇文章对你有所帮助!

结尾

希望通过这篇文章,你对如何在Java中监听Kafka有了更好的理解。记得在实践中多加练习,加深对Kafka的理解,不断提升自己的技术水平!祝你学习进步!