当前位置:   article > 正文

RabbitMQ在SpringBoot中的高级应用(1)_springboot rabbitmq 高级配置

springboot rabbitmq 高级配置

启动RabbitMQ

        1. 在虚拟机中启动RabbitMQ,要先切换到root用户下: su root

        2.关闭防火墙: systemctl stop firewalld  

        3.rabbitmq-server start # 启用服务

        4.rabbitmq-server -detached # 后台启动

1.消息确认机制

  有两种确认的方式:
            自动ACK:RabbitMQ将消息发送给消费者后就会直接将消息删除,前提是消费者程序没有出现异常,有异常会重新发送,直至到达了最大重试次数后抛出异常后不在重试
            手动ACK:通过代码控制决定是否返回确认消息

        1)开启消息确认机制,在核心配置文件中添加以下配置

  1. # 发送者开启simple确认机制
  2. spring.rabbitmq.publisher-confirm-type=simple
  3. # 发送者开启return确认机制
  4. spring.rabbitmq.publisher-returns=true

     

  2)编写配置类

  1. import org.springframework.amqp.core.ReturnedMessage;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.context.annotation.Configuration;
  6. import javax.annotation.PostConstruct;
  7. //消息确认机制的接口 RabbitTemplate
  8. @Configuration
  9. public class PublishConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
  10. @Autowired
  11. private RabbitTemplate rt;
  12. @PostConstruct//加载回调方法
  13. public void initMethod(){
  14. rt.setConfirmCallback(this);
  15. rt.setReturnsCallback(this);
  16. }
  17. @Override//RabbitTemplate.ConfirmCallback如果消息被正常发送到交换机,则会调用该方法(自动回调)
  18. /*
  19. * CorrelationData相关数据,有一个id属性,表示消息的唯一标识
  20. * boolean表示当前消息投放到交换机中的状态,trur表示投放成功
  21. * String表示投送失败的原因
  22. * */
  23. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  24. if(ack){
  25. System.out.println("消息发送给交换机成功");
  26. }else {
  27. System.err.println("消息发送给交换机失败");
  28. }
  29. }
  30. /**
  31. * 消息往队列发送时成功,不会调用该方法 失败了会调用
  32. * @param returnedMessage 返回消息的内容
  33. * message发送的内容
  34. * replyCode响应码
  35. * replyText回应的内容
  36. * exchange交换机
  37. * reotingKey路由键
  38. * */
  39. @Override//RabbitTemplate.ReturnsCallback如果消息被正常从交换机发送到队列,则回调该方法(自动回调)
  40. public void returnedMessage(ReturnedMessage returnedMessage) {
  41. System.out.println("消息发送到交换机却没有到队列");
  42. System.out.println("消息内容"+returnedMessage.getMessage());
  43. System.out.println("响应码"+returnedMessage.getReplyCode());
  44. System.out.println("回应的内容"+returnedMessage.getReplyText());
  45. System.out.println("交换机"+returnedMessage.getExchange());
  46. System.out.println("路由键"+returnedMessage.getRoutingKey());
  47. }
  48. }

         这是没有消息确认配置类时的运行数据

         这是添加了消息确认配置类时的运行数据 ,可以看到我们是否将数据成功发送到交换机或队列

 持久化

        1.队列持久化         没有消费者连接该队列的时候,会被RabbitMQ自动删除        autoDelete = "true"     默认为false,不会被自动删除

         

  1. @RabbitListener(
  2. bindings = @QueueBinding(
  3. value = @Queue(value = "log_queue_error",autoDelete = "true"),
  4. exchange = @Exchange(value = "durable_exchange",type = "direct"),
  5. key = "error_log_key"
  6. )
  7. )
  8. public void druggConsumerError(String message){
  9. System.out.println("接收error级别的日志"+message);
  10. }
  11. @RabbitListener(
  12. bindings = @QueueBinding(
  13. value = @Queue(value = "log_queue_all",autoDelete = "false"),
  14. exchange = @Exchange(value = "durable_exchange",type = "direct"),
  15. key = {"error_log_key","info_log_key","debug_log_key","waring_log_key"}
  16. )
  17. )
  18. public void drugConsumerAll(String message){
  19. System.out.println("接收all级别的日志"+message);
  20. }

      ​​

 

  2.交换机持久化

  1. @RabbitListener(
  2. bindings = @QueueBinding(
  3. value = @Queue(value = "email_queue",autoDelete = "true"),
  4. exchange = @Exchange(value = "temp_exce",type = "topic",autoDelete="true"),
  5. key="em_key"
  6. )
  7. )
  8. public void emailConsumer(Object o){
  9. System.out.println("接收到邮件:"+o);
  10. }

         如果当前的交换机没有被任何的队列所映射,会被RabbitMQ自动删除,关闭项目就相当于没有映射

消费端限流

        控制消费端的消费速度,方式数据过大造成服务端宕机,通过编写配置文件,控制一次推送的消息数量,来减少一次数据太大冲垮服务器

        1.编写核心配置文件application.properties

  1. # 消费端限流实现
  2. #开启手动签收(手动ACK)
  3. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  4. #一次接收3条消息(在单个请求中处理的消息个数)
  5. spring.rabbitmq.listener.simple.prefetch=3
  6. #消费者最小数量
  7. spring.rabbitmq.listener.simple.concurrency=1
  8. #消费者最大数量
  9. spring.rabbitmq.listener.simple.max-concurrency=10

        2.创建交换机和队列

  1. // channel标识信道,封装了RabbitMQ通过的相关配置信息,如果当前的消息被成功消费,通过信道进行标记,
  2. //获取到相应ACK的确认信息
  3. public void currentLimitingConsumer(Message message, Channel channel) throws InterruptedException, IOException {
  4. Thread.sleep(3000);//睡眠3秒钟
  5. long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息的标记,消息的唯一标识
  6. //手动确认消息是否接收,通过消息的id来指定该条消息被成功处理
  7. channel.basicAck(deliveryTag,true);//true表示对应这条消息被消费了
  8. String s = new String(message.getBody());
  9. System.out.println("消费者1::接收到的消息"+s);
  10. }
  11. @RabbitListener(
  12. bindings = @QueueBinding(
  13. value = @Queue("current_limiting_queue"),
  14. exchange = @Exchange(value = "cle",type = "topic"),
  15. key = "current.limiting.#"
  16. )
  17. )
  18. public void currentLimitingConsumer2(Message message, Channel channel) throws InterruptedException, IOException {
  19. Thread.sleep(3000);//睡眠3秒钟
  20. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  21. channel.basicAck(deliveryTag,true);
  22. String s = new String(message.getBody());
  23. System.out.println("消费者2::接收到的消息"+s);
  24. }

        3.创建消息发送者,我们将100条消息发送给cle交换机

  1. @Test//消息限流
  2. public void currentLimitingPushLisher(){
  3. for (int i = 1; i <101 ; i++) {
  4. re.convertAndSend("cle","current.limiting.xm","消息限流:"+i);
  5. }
  6. }

         正如我们的配置配置文件所写,一次性只能接收三条消息

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/1005680
推荐阅读
相关标签
  

闽ICP备14008679号