当前位置:   article > 正文

kafka-consumer-消费者代码实例_kafka消费代码

kafka消费代码

目录

1 消费一个主题

2 消费一个分区

3 消费者组案例


1 消费一个主题

消费topic为first的消息。

  1. public class ConsumerTest{
  2. public void main(string[] args){
  3. // 0 配置
  4. Properties properties = new Properties();
  5. //连接bootstrap . servers
  6. properties.put(ConsumerConfig.BO0TSTRAP_SERVERS_CONFIG , " hadoop102:9092, hadoop103:9092");
  7. //反序列化
  8. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  9. //配置消费者组id,必须配置,没有也要配置,不然会抛出异常
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test" );
  11. // 1 创建一个消费者" ", "hello"
  12. KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
  13. // 2 订阅主题first,可以订阅多个topic。
  14. ArrayList<String> topics = new ArrayList<>();topics.add( "first" );
  15. kafkaConsumer.subscribe(topics);
  16. // 3 消费数据
  17. while (true){
  18. //每一秒拉取一次数据。
  19. ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  20. for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
  21. System.out.println(consumerRecord);
  22. }
  23. }
  24. }
  25. }

2 消费一个分区

应用场景:当生产者将所有消息发往特定的某个主题分区。

消费first主题0号分区代码:

  1. public class customConsumerPartition {
  2. public static void main(String[ ] args) {
  3. // 0 配置
  4. Properties properties = new Properties();
  5. //连接
  6. properties.put(ConsumerConfig.B00TSTRAP_SERVERS_CONFIG , " hadoop102:9092 , hadoop103:9092");
  7. //反序列化
  8. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_ CONFI6,StringDeserializer.class.getName());
  9. //组id
  10. properties.put(ConsumerConfig.GROUP_ID_CONFIG , "test");l
  11. // 1 创建一个消费者
  12. KafkaConsumer<String,String> kafkaConsumer = new KafkaConsume
  13. r<>(properties);
  14. // 2 订阅主题对应的分区
  15. ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition( topic: "first", partition: 0))kafkaconsumer.assign(topicPartitions);
  16. // 3 消费数据
  17. while (true){
  18. ConsumerRecords<String,String> consumerReconds = kafkaConsumer.poll(Duration.ofSeconds(1));
  19. for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
  20. system.out.println(consumerRecord);
  21. }
  22. }
  23. }
  24. }

3 消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费者进行消费。

创建三个消费者对某一分区进行消费 。

将消费主题中的代码复制三份,由于group id是一样的,所以这三个消费者为同一消费者组。

  1. 生产者发送消息(方便阅读)

    此时消息分布在0,1,2三个分区中。
  2. 消费结果发现,三个消费者每个消费者消费一个分区的数据。


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/638926
推荐阅读
相关标签
  

闽ICP备14008679号