当前位置:   article > 正文

springboot做多消费者,实现能者多劳(公平竞争,公平分发)分发策略_springboot 多个消费者优化

springboot 多个消费者优化

  • 前言:此文章是基于springboot通过配置实现公平竞争、能者多劳分发策略,不关心消息怎么处理,消息怎么入队

  • 背景:我们目前为了让消息处理的更快,在不同的服务器中部署了两个一模一样的springboot项目,从而让消息更快的被消费
  • 问题:业务问题不方便透露,但最近运维的同事说,这个服务是不是挂了,消息咋还没处理,我就上了一台服务器看看了,发现通过ps -ef | grep jar,服务是正常的,接着监听日志发现,项目像卡了一样并没有往下执行代码,经排查,因为代码刚好执行到了事务注解方法的最后一行就不动了,看了看业务,就在消费者服务中有业务需要去对数据库update操作,接着看日志,好家伙,3w条数据的update,执行了一个小时
  • 原因:这时我就好奇,一个消费者干了一个小时,那另一个消费者应该会吧队列中的消息给消费掉才对吧,毕竟两个项目监听的是一个队列,但并不然,这时候两个springboot项目仅仅是监听了队列,这时候的消息分发模式其实是这样的,比如一下子陆续进来6条消息,那么两个消费者a和b,a会被分到消息1、3、5,而b会被分到2、4、6,这时a将消息处理完了,a不会帮b处理消息
  • 优化

  • 配置文件(application.yml)

  • 配置类(用于绑定消息)

  1. @Component
  2. @Slf4j
  3. public class RabbitMQConfig {
  4. private String queue;
  5. private String exchange;
  6. private String routingKey;
  7. @Value("${rabbitmq.config.queue}")
  8. public void setQueue(String queue) {
  9. log.info("队列名称="+queue);
  10. this.queue = queue;
  11. }
  12. @Value("${rabbitmq.config.exchange}")
  13. public void setExchange(String exchange) {
  14. log.info("交换机名称="+exchange);
  15. this.exchange = exchange;
  16. }
  17. @Value("${rabbitmq.config.routingKey}")
  18. public void setRoutingKey(String routingKey) {
  19. log.info("路由键="+routingKey);
  20. this.routingKey = routingKey;
  21. }
  22. @Bean
  23. public Queue myQueue() {
  24. return new Queue(this.queue, true);
  25. }
  26. @Bean
  27. public DirectExchange myExchange() {
  28. return new DirectExchange(this.exchange);
  29. }
  30. //将队列和交换机进行绑定,并指定路由键
  31. @Bean
  32. public Binding binding(Queue myQueue, DirectExchange myExchange) {
  33. return BindingBuilder.bind(myQueue).to(myExchange).with(this.routingKey);
  34. }
  35. }
  • 监听器

  1. @RabbitListener(queues = "你定义的队列名称")
  2. public void exportExcel(String msg, Message message, Channel channel) throws Exception{
  3. channel.basicQos(1);
  4. //如果消息为字符串1,那么这个消息模拟需要处理很长时间
  5. if ("1".equals(msg)){
  6. Thread.sleep(10000);
  7. }
  8. System.out.println(msg);
  9. //无论如何都要将消息确认
  10. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  11. }
  • 效果

我在本地跑了这两个消费者,并往队列中丢了10条消息,0~9
消费者1效果:

消费者2效果:

 消费者2读到了"1",并且堵塞了当前线程,消费者2就会消费完一条消息后一直从队列中拿消息进行消费,从而两个消费者不会出现干等的情况!
 

最后最后,如果此文章对你有帮助,请关注点赞!!

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/789226
推荐阅读
相关标签
  

闽ICP备14008679号