当前位置:   article > 正文

Kafka消费者api编写教程

Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

  1. //创建属性
  2.         Properties properties = new Properties();
  3.        //连接集群
  4.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
  5.         //反序列化
  6.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  7.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  8.         //指定消费者组id
  9.         properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

  1. //创建消费者
  2. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

  1. //创建一个数组列表变量接收topics值
  2. ArrayList<String> topics = new ArrayList<>();
  3. //指定要订阅的主题
  4. topics.add("customers");
  5. //订阅主题
  6. kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

  1. //消费数据
  2. while (true){
  3. //if (flag == true) flag 标志位置
  4. //break;
  5. //}生产中退出循环的位置;
  6. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  7. //将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历
  8. for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
  9. System.out.println(consumerRecord);
  10. }
  11. }

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

  1. package com.ljr.kafka.replay;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.apache.kafka.common.TopicPartition;
  7. import org.apache.kafka.common.serialization.StringDeserializer;
  8. import java.time.Duration;
  9. import java.util.ArrayList;
  10. import java.util.Properties;
  11. public class MyConsumer {
  12. public static void main(String[] args) {
  13. //创建属性
  14. Properties properties = new Properties();
  15. //连接集群
  16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
  17. //反序列化
  18. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  19. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
  20. //指定消费者组id
  21. properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");
  22. //创建消费者
  23. KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
  24. /*//订阅主题
  25. //创建一个数组列表变量接收topics值
  26. ArrayList<String> topics = new ArrayList<>();
  27. //指定要订阅的主题
  28. topics.add("customers");
  29. //订阅主题
  30. kafkaConsumer.subscribe(topics);*/
  31. //订阅分区
  32. //创建一个数组列表变量接收主题分区值
  33. ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
  34. //指定要订阅的分区
  35. topicPartitions.add(new TopicPartition("customers",2));
  36. //订阅分区
  37. kafkaConsumer.assign(topicPartitions);
  38. //消费数据
  39. while (true){
  40. //if (flag == true) flag 标志位置
  41. //break;
  42. //}生产中退出循环的位置;
  43. ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
  44. //将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历
  45. for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
  46. System.out.println(consumerRecord);
  47. }
  48. }
  49. }
  50. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/680489
推荐阅读
相关标签
  

闽ICP备14008679号