Two Ways to Implement a Kafka Consumer in Java

There are two ways to listen for messages on a given Kafka topic in a Spring Boot project:

One is to use the @KafkaListener annotation provided by Spring Kafka.

The other is to use Kafka's native Java client (kafka-clients) and have the consumer pull messages with a scheduled task or a while(true){...} loop; this approach avoids version conflicts with the Spring Boot parent.

Contents

1. The @KafkaListener annotation

2. Pulling messages with .poll() in a while(true){...} loop


1. The @KafkaListener annotation

Import the dependency

<!-- spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>VERSION</version>
</dependency>

Configuration (application.properties)

# Kafka broker address
spring.kafka.bootstrap-servers=127.0.0.1:9092
# Consumer group ID
spring.kafka.consumer.group-id=1
# Key deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Value deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Commit offsets automatically
spring.kafka.consumer.enable-auto-commit=true
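
Spring Boot binds these spring.kafka.* properties to a consumer factory automatically. If you prefer to configure the consumer in Java rather than in application.properties, a minimal sketch of the equivalent bean definitions could look like the following (the class name and bean names are illustrative; the values mirror the properties above):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // Same values as in application.properties above
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Container factory used by @KafkaListener methods
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}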

Listener class

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaConsumer {

    // Specify the topic(s) to listen to here; multiple topics are allowed
    @KafkaListener(topics = {"test", "dev"})
    public void listenToMessage(String message) {
        System.out.println("Message received via @KafkaListener: " + message);
    }
}
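
If the listener also needs message metadata such as the topic, partition, or offset, a @KafkaListener method can accept the full ConsumerRecord instead of just the payload. A minimal sketch (the class and method names are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaRecordConsumer {

    // Receiving the full ConsumerRecord exposes topic, partition, offset and key
    @KafkaListener(topics = {"test", "dev"})
    public void listenToRecord(ConsumerRecord<String, String> record) {
        System.out.println("topic=" + record.topic()
                + ", partition=" + record.partition()
                + ", offset=" + record.offset()
                + ", value=" + record.value());
    }
}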

Result

2. Pulling messages with .poll() in a while(true){...} loop using Kafka's native Java client

Import the dependency

<!-- kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>VERSION</version>
</dependency>

Configuration

A .properties file is used as an example here.

# host/port list used to establish the connection to the Kafka cluster
# (for a managed service, obtain it from the console)
bootstrap.servers=127.0.0.1:9092
# unique string identifying the consumer group this process belongs to;
# processes with the same group.id are members of the same consumer group
group.id=1
# key deserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value deserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# commit offsets automatically
enable.auto.commit=true
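
If you would rather not ship a separate properties file, the same settings can also be built directly in code using the ConsumerConfig constants from kafka-clients. A minimal sketch (the class name is illustrative; broker address and group id are the same placeholder values as above):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerPropsDemo {

    public static KafkaConsumer<String, String> buildConsumer() {
        // Same values as the properties file above, expressed via ConsumerConfig constants
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        return new KafkaConsumer<String, String>(props);
    }
}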

Write the consumer wrapper class

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

public class MqsConsumer {

    // configuration file on the classpath
    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    // Load the configuration from an explicit file path
    MqsConsumer(String path) {
        Properties props = new Properties();
        try {
            InputStream in = new BufferedInputStream(new FileInputStream(path));
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    // Load the configuration from the classpath
    public MqsConsumer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * Get the classloader from the thread context; if none is found,
     * return the classloader that loaded this class.
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration from the classpath.
     *
     * @param configFileName configuration file name
     * @return configuration properties
     * @throws IOException
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }
        for (URL url : properties) {
            InputStream is = null;
            try {
                is = url.openStream();
                config.load(is);
            } finally {
                if (is != null) {
                    is.close();
                    is = null;
                }
            }
        }
        return config;
    }
}

Pull messages in a while(true){...} loop

import com.rococo.mqs.consumer.MqsConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Arrays;

@Component
public class KafkaController {

    @PostConstruct
    public void testConsumer() throws Exception {
        MqsConsumer consumer = new MqsConsumer();
        consumer.consume(Arrays.asList("test")); // topic(s) to subscribe to
        try {
            while (true) {
                // timeout is the maximum time the consumer waits when no messages are available;
                // as soon as a message arrives, poll() returns immediately and the timer restarts.
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                System.out.println("number of records polled: " + records.count());
                for (ConsumerRecord<Object, Object> record : records) {
                    Object value = record.value();
                    System.out.println(value);
                }
            }
        } catch (Exception e) {
            // exception handling
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

Result
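
A closing note on the loop above: an endless while(true) inside a @PostConstruct method blocks the startup thread, and there is no clean way to break out of it. A common pattern, sketched below against the plain KafkaConsumer API and assuming kafka-clients 2.x (where poll(Duration) is available), is to run the loop on its own thread or in a plain main method and call wakeup() from a shutdown hook, so that the blocked poll() throws WakeupException and the consumer can be closed cleanly. The class name and settings here are illustrative placeholders:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulPollLoop {

    public static void main(String[] args) {
        // Placeholder settings, matching the properties file shown earlier
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test"));

        // wakeup() is safe to call from another thread; it makes a blocked poll()
        // throw WakeupException so the loop can exit and close() can run.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.close();
        }
    }
}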
