当前位置:   article > 正文

Kafka 实战 - 消费者消费消息的基本实现_kafka消费消息的java代码

kafka消费消息的java代码

在Apache Kafka中,消费者用于从Kafka集群订阅并消费指定主题的消息。以下是一个使用Java实现的消费者基本功能示例,展示了如何创建消费者、订阅主题、消费消息以及关闭消费者:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SimpleKafkaConsumer {

    public static void main(String[] args) {
        // 1. 配置消费者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka broker列表
        props.put("group.id", "my-consumer-group"); // 消费者组ID
        props.put("key.deserializer", StringDeserializer.class.getName()); // 键反序列化器
        props.put("value.deserializer", StringDeserializer.class.getName()); // 值反序列化器
        props.put("auto.offset.reset", "earliest"); // 如果没有已提交的偏移量,从头开始消费

        // 2. 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. 订阅主题
        consumer.subscribe(Arrays.asList("my-topic")); // 订阅一个或多个主题

        // 4. 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 轮询拉取消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Consumed record from topic '%s' partition [%d] @ offset %d: key=%s, value=%s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }

        // 5. 关闭消费者(在实际应用中,应放在适当的退出条件处)
        consumer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

代码解析:

  1. 配置消费者属性

    • bootstrap.servers:设置Kafka集群的Broker地址列表,这里使用localhost:9092作为示例。在实际环境中,应替换为实际的Broker地址。
    • group.id:指定消费者所属的消费者组。同一组内的消费者共享订阅的主题,并通过负载均衡机制分配分区。
    • key.deserializervalue.deserializer:指定键和值的反序列化器,这里使用StringDeserializer,表示接收的消息键和值都是字符串类型。根据实际业务需求,可以使用其他反序列化器。
    • auto.offset.reset:设置消费者在没有已提交的偏移量时如何开始消费。这里设置为earliest,表示从头开始消费。其他可选值包括latest(从最新消息开始消费)和none(如果无已提交偏移量,抛出异常)。
  2. 创建消费者实例

    • 使用配置好的属性创建KafkaConsumer实例。消费者实例不是线程安全的,如果要在多线程环境中使用,需要为每个线程创建单独的消费者实例。
  3. 订阅主题

    • 调用consumer.subscribe()方法订阅一个或多个主题。当主题中有新消息到达时,消费者会自动拉取消息。
  4. 消费消息

    • 使用无限循环调用consumer.poll()方法,该方法会阻塞一段时间(由参数决定),直到收到新的消息或达到超时时间。收到消息后,返回一个ConsumerRecords对象,包含本次拉取的所有消息。
    • 遍历ConsumerRecords,处理每条ConsumerRecord,打印其主题、分区、偏移量、键和值。
  5. 关闭消费者

    • 在程序结束前,调用consumer.close()方法关闭消费者,释放资源。确保在不再需要消费者时关闭它,以避免资源泄露。

以上代码实现了Java中Kafka消费者的基本功能。在实际项目中,可能还需要考虑消息消费的并行处理、消费位移管理、消费重试、事务性消费等高级特性,这些可以通过进一步配置和使用KafkaConsumer的相应API来实现。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/638928
推荐阅读
相关标签
  

闽ICP备14008679号