
Implementing a Multi-Threaded Kafka Consumer with Spring Boot

1. Add the Maven dependency

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

2. Kafka configuration file

    spring:
      kafka:
        consumer:
          enable-auto-commit: true
          group-id: applog
          auto-offset-reset: latest
          bootstrap-servers: 127.0.0.1:9092  # for a cluster, separate brokers with commas

3. Create the Kafka utility class ConsumerHandler

    package com.netintech.kafka.kafkajob;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.lang.reflect.Constructor;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.*;

    public class ConsumerHandler {

        // This example uses a single consumer to feed records into a worker pool; you could
        // also run several consumer instances that feed the same pool according to some rule.
        private KafkaConsumer<String, String> consumer;
        private ExecutorService executors;
        private Properties props;

        public ConsumerHandler(String servers, String commit, String intervalms, String timeoutms, String groupId, String topic) {
            props = new Properties();
            props.put("bootstrap.servers", servers);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", commit);
            props.put("auto.commit.interval.ms", intervalms);
            props.put("session.timeout.ms", timeoutms);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum, Class<?> cls, String topic) throws Exception {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            Constructor<?> constructor = cls.getConstructor(ConsumerRecord.class);
            while (true) {
                if (consumer != null) {
                    ConsumerRecords<String, String> records = consumer.poll(200);
                    for (final ConsumerRecord<String, String> record : records) {
                        // Wrap each record in a Runnable worker and hand it to the pool.
                        Runnable worker = (Runnable) constructor.newInstance(record);
                        executors.submit(worker);
                    }
                } else {
                    // The consumer is null: recreate it and re-subscribe.
                    consumer = new KafkaConsumer<>(props);
                    consumer.subscribe(Arrays.asList(topic));
                }
            }
        }

        public void stop() {
            if (consumer != null) {
                consumer.wakeup();
            }
        }

        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
                try {
                    if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("Timeout.... Ignore for this case");
                    }
                } catch (InterruptedException ignored) {
                    System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
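One detail worth noting: `stop()` calls `consumer.wakeup()`, which makes the blocked (or next) `poll()` throw a `WakeupException`. The `while (true)` loop above does not catch it, so the exception simply propagates out of `execute()`. The method below is a hypothetical variant of `execute()` (not part of the original class) sketching how that exception can be turned into a clean shutdown path:

    // Hypothetical variant of execute(): break out of the poll loop when stop()
    // triggers a WakeupException, then release the consumer and the thread pool.
    public void executeWithGracefulStop(int workerNum, Class<?> cls) throws Exception {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        Constructor<?> constructor = cls.getConstructor(ConsumerRecord.class);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    executors.submit((Runnable) constructor.newInstance(record));
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // stop() was called from another thread; fall through to cleanup.
        } finally {
            consumer.close();
            executors.shutdown();
        }
    }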

4. Create the multi-threaded consumer task class OneWork

    package com.netintech.kafka.impl;

    import com.alibaba.fastjson.JSONObject;
    import com.netintech.kafka.bean.Test;
    import com.netintech.kafka.service.TestService;
    import com.netintech.kafka.task.SendVehicleInfo;
    import com.netintech.kafka.utils.SpringUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.transaction.annotation.Transactional;

    /**
     * Worker task for multi-threaded Kafka consumption.
     */
    public class OneWork implements Runnable {

        private static final Logger LOG = LoggerFactory.getLogger(OneWork.class);

        private ConsumerRecord<String, String> consumerRecord;

        public OneWork(ConsumerRecord record) {
            this.consumerRecord = record;
        }

        @Override
        public void run() {
            try {
                // Process the consumed message; consumerRecord.value() is the payload.
                SendVehicleInfo.jsonToWebService(consumerRecord.value());
            } catch (Exception e) {
                LOG.info("Exception while processing record: " + e.getMessage());
            }
        }
    }
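`ConsumerHandler.execute()` only relies on two things from the worker class: it implements `Runnable`, and it exposes a public constructor taking a single `ConsumerRecord`. Any class meeting that contract can be plugged in by reflection. The class below is a hypothetical minimal worker (name and behavior invented for illustration, not from the original article) that just prints the record:

    package com.netintech.kafka.impl;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical minimal worker: same constructor contract as OneWork,
    // but it only prints the record instead of calling business code.
    public class LoggingWork implements Runnable {

        private final ConsumerRecord<String, String> record;

        public LoggingWork(ConsumerRecord record) {
            this.record = record;
        }

        @Override
        public void run() {
            System.out.println("partition=" + record.partition()
                    + " offset=" + record.offset()
                    + " value=" + record.value());
        }
    }

To use it, pass `Class.forName("com.netintech.kafka.impl.LoggingWork")` to `execute()` instead of `OneWork`.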

5. Start consuming

    // Values injected from the configuration file
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String commit;
    // Note: auto.commit.interval.ms and session.timeout.ms are not part of the
    // YAML shown in step 2 and must be defined in the configuration as well.
    @Value("${auto.commit.interval.ms}")
    private String intervalms;
    @Value("${session.timeout.ms}")
    private String timeoutms;

    // Starting consumption (groupId = consumer group, topic = topic to consume)
    ConsumerHandler consumers = new ConsumerHandler(servers, commit, intervalms, timeoutms, groupId, topic);
    // Load the multi-threaded worker class by reflection
    Class<?> cl = Class.forName("com.netintech.kafka.impl.OneWork");
    // Start consuming: num = number of worker threads, topic = topic to consume
    consumers.execute(num, cl, topic);
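The snippet above is meant to live inside a Spring-managed bean, and since `execute()` loops forever it should run on its own thread rather than on the thread that bootstraps the application. Below is a minimal wiring sketch; the class name, topic name, property defaults, and thread handling are assumptions for illustration and are not part of the original article. (`javax.annotation.PostConstruct` assumes Spring Boot 2.x; on Boot 3.x the import is `jakarta.annotation.PostConstruct`.)

    package com.netintech.kafka;

    import com.netintech.kafka.kafkajob.ConsumerHandler;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;

    @Component
    public class KafkaConsumerStarter {

        @Value("${spring.kafka.consumer.bootstrap-servers}")
        private String servers;
        @Value("${spring.kafka.consumer.enable-auto-commit}")
        private String commit;
        // Defaults here are assumptions; define auto.commit.interval.ms and
        // session.timeout.ms explicitly in your configuration if needed.
        @Value("${auto.commit.interval.ms:1000}")
        private String intervalms;
        @Value("${session.timeout.ms:30000}")
        private String timeoutms;
        @Value("${spring.kafka.consumer.group-id}")
        private String groupId;

        @PostConstruct
        public void start() {
            String topic = "applog-topic";  // assumed topic name
            new Thread(() -> {
                try {
                    ConsumerHandler consumers =
                            new ConsumerHandler(servers, commit, intervalms, timeoutms, groupId, topic);
                    Class<?> cl = Class.forName("com.netintech.kafka.impl.OneWork");
                    // 4 worker threads; tune to the workload
                    consumers.execute(4, cl, topic);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "kafka-consumer-loop").start();
        }
    }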

 
