赞
踩
在Apache Kafka中,消费者用于从Kafka集群订阅并消费指定主题的消息。以下是一个使用Java实现的消费者基本功能示例,展示了如何创建消费者、订阅主题、消费消息以及关闭消费者:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class SimpleKafkaConsumer { public static void main(String[] args) { // 1. 配置消费者属性 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // Kafka broker列表 props.put("group.id", "my-consumer-group"); // 消费者组ID props.put("key.deserializer", StringDeserializer.class.getName()); // 键反序列化器 props.put("value.deserializer", StringDeserializer.class.getName()); // 值反序列化器 props.put("auto.offset.reset", "earliest"); // 如果没有已提交的偏移量,从头开始消费 // 2. 创建消费者实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 3. 订阅主题 consumer.subscribe(Arrays.asList("my-topic")); // 订阅一个或多个主题 // 4. 消费消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 轮询拉取消息 for (ConsumerRecord<String, String> record : records) { System.out.printf("Consumed record from topic '%s' partition [%d] @ offset %d: key=%s, value=%s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } // 5. 关闭消费者(在实际应用中,应放在适当的退出条件处) consumer.close(); } }
代码解析:
配置消费者属性:
bootstrap.servers
:设置Kafka集群的Broker地址列表,这里使用localhost:9092
作为示例。在实际环境中,应替换为实际的Broker地址。group.id
:指定消费者所属的消费者组。同一组内的消费者共享订阅的主题,并通过负载均衡机制分配分区。key.deserializer
和 value.deserializer
:指定键和值的反序列化器,这里使用StringDeserializer
,表示接收的消息键和值都是字符串类型。根据实际业务需求,可以使用其他反序列化器。auto.offset.reset
:设置消费者在没有已提交的偏移量时如何开始消费。这里设置为earliest
,表示从头开始消费。其他可选值包括latest
(从最新消息开始消费)和none
(如果无已提交偏移量,抛出异常)。创建消费者实例:
KafkaConsumer
实例。消费者实例不是线程安全的,如果要在多线程环境中使用,需要为每个线程创建单独的消费者实例。订阅主题:
consumer.subscribe()
方法订阅一个或多个主题。当主题中有新消息到达时,消费者会自动拉取消息。消费消息:
consumer.poll()
方法,该方法会阻塞一段时间(由参数决定),直到收到新的消息或达到超时时间。收到消息后,返回一个ConsumerRecords
对象,包含本次拉取的所有消息。ConsumerRecords
,处理每条ConsumerRecord
,打印其主题、分区、偏移量、键和值。关闭消费者:
consumer.close()
方法关闭消费者,释放资源。确保在不再需要消费者时关闭它,以避免资源泄露。以上代码实现了Java中Kafka消费者的基本功能。在实际项目中,可能还需要考虑消息消费的并行处理、消费位移管理、消费重试、事务性消费等高级特性,这些可以通过进一步配置和使用KafkaConsumer
的相应API来实现。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。