当前位置:   article > 正文

springboot整合rabbitMQ系列9 消费者Consumer限流_springboot rabbitmq 消费者最小数量

springboot rabbitmq 消费者最小数量

场景:请求瞬间增多,每秒5000个请求,防止A系统挂掉

å¨è¿éæå¥å¾çæè¿°
注意:一定要开启手动ack确认

1 application.yml配置文件

  1. server:
  2. port: 8021
  3. spring:
  4. #给项目来个名字
  5. application:
  6. name: rabbitmq-test
  7. #配置rabbitMq 服务器
  8. rabbitmq:
  9. host: 127.0.0.1
  10. port: 5672
  11. username: need
  12. password: 123456
  13. #虚拟host 可以不设置,使用server默认host
  14. virtual-host: /testhost
  15. listener:
  16. simple:
  17. acknowledge-mode: manual #手动ACK
  18. max-concurrency: 10 #消费之最大数量
  19. concurrency: 1 #消费者最小数量
  20. prefetch: 2 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
  21. direct:
  22. acknowledge-mode: manual #手动ACK

2 消费者代码

        a:消息序列化转换

  1. package org.example.service_b.config;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  4. import org.springframework.amqp.support.converter.MessageConverter;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. /**
  8. * 此代码添加 在消费者的项目中
  9. */
  10. @Configuration
  11. public class RabbitmqGlobalConfig {
  12. /**
  13. * 当发送的消息为pojo时,为报转换异常
  14. * 解决方法:添加这个类进行序列化解析
  15. * 会自动识别
  16. * @param objectMapper json序列化实现类
  17. * @return mq 消息序列化工具
  18. */
  19. @Bean
  20. public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
  21. return new Jackson2JsonMessageConverter(objectMapper);
  22. }
  23. }

        b:接收消息 

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.io.IOException;
  6. @Component
  7. public class DirectReceiver_1 {
  8. @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
  9. /**
  10. * 注意:后4个参数,需要生产者发送消息时加上,否则为报错,注解里添加 required=false 或 删除参数
  11. */
  12. public void process(Message message,
  13. Channel channel,
  14. @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
  15. @Header(AmqpHeaders.MESSAGE_ID) String messageId,
  16. @Header(AmqpHeaders.CONSUMER_TAG) String consumerTag,
  17. CorrelationData correlationData) throws IOException, InterruptedException {
  18. //long deliveryTag = message.getMessageProperties().getDeliveryTag();
  19. //
  20. //@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag
  21. //设置3秒的睡眠好看变化情况
  22. Thread.sleep(1000);
  23. try {
  24. String msgbody = new String(message.getBody());
  25. //1.接收转换消息
  26. System.out.println("DirectReceiver消费者 1 收到消息 : " + msgbody + " 编号: " + deliveryTag);
  27. //3. 手动签收
  28. channel.basicAck(deliveryTag, true);
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }

演示效果,注意看Unacked,并且查看idea控制台,消息是慢慢的读的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/774533
推荐阅读
相关标签
  

闽ICP备14008679号