赞
踩
目录
消费topic为first的消息。
- public class ConsumerTest{
- public void main(string[] args){
- // 0 配置
- Properties properties = new Properties();
- //连接bootstrap . servers
- properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092, hadoop103:9092");
- //反序列化
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
- //配置消费者组id,必须配置,没有也要配置,不然会抛出异常
- properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );
-
- // 1 创建一个消费者" ", "hello"
- KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
-
- // 2 订阅主题first,可以订阅多个topic。
- ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
- kafkaConsumer.subscribe(topics);
-
- // 3 消费数据
- while (true){
- //每一秒拉取一次数据。
- ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
- System.out.println(consumerRecord);
- }
- }
- }
- }
-
应用场景:当生产者将所有消息发往特定的某个主题分区。
消费first主题0号分区代码:
- public class customConsumerPartition {
- public static void main(String[ ] args) {
- // 0 配置
- Properties properties = new Properties();
- //连接
- properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
- //反序列化
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_ CONFI6,StringDeserializer.class.getName());
- //组id
- properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
- // 1 创建一个消费者
- KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
- r<>(properties);
- // 2 订阅主题对应的分区
- ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);
-
- // 3 消费数据
- while (true){
- ConsumerRecords<String,String> consumerReconds = kafkaConsumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
- system.out.println(consumerRecord);
- }
- }
- }
- }
测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。
创建三个消费者对某一分区进行消费 。
将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。