赞
踩
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
-
- public class KafkaConsumerExample {
- private static final String TOPIC_NAME = "bltest3";
- private static final String BOOTSTRAP_SERVERS = "hadoop01:9092";
- private static final String GROUP_ID = "console-consumer-88604";
-
- public static void main(String[] args) {
- // 配置Kafka消费者属性
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
-
- // 创建Kafka消费者实例
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-
- // 订阅Kafka主题
- consumer.subscribe(Collections.singletonList(TOPIC_NAME));
-
- try {
- // 消费消息
- while (true) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- System.out.println("Received message: " + record.value());
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // 关闭Kafka消费者
- consumer.close();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。