当前位置:   article > 正文

RabbitMQ重试机制_rabbitmq 重试

rabbitmq 重试

1、RabbitMQ重试机制的简介

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。如下图:

注意事项:

如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ 支持消息确认-ACK。

如果忘记了消息确认,那么后果很严重。当 Consumer 退出时候,Message 会一直重新分发。然后 RabbitMQ 会占用越来越多的内容,由于 RabbitMQ 会长时间运行,因此这个"内存泄漏"是致命的。

RabbitMQ 重试机制核心配置:

  1. spring:
  2. # 项目名称
  3. application:
  4. name: rabbitmq-consumer
  5. # RabbitMQ服务配置
  6. rabbitmq:
  7. host: 127.0.0.1
  8. port: 5672
  9. username: guest
  10. password: guest
  11. listener:
  12. simple:
  13. # 重试机制
  14. retry:
  15. enabled: true #是否开启消费者重试
  16. max-attempts: 5 #最大重试次数
  17. initial-interval: 5000ms #重试间隔时间(单位毫秒)
  18. max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
  19. multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

 

2、RabbitMQ重试机制的实现

下面将通过示例来讲解 RabbitMQ 重试机制的实现。首先需要创建两个 SpringBoot 项目并整合 RabbitMQ 客户端。

2.1 实现消息发送端

(1)创建第一个 SpringBoot 项目( rabbitmq-provider 消息发送项目)。

在pom.xml配置信息文件中,添加相关依赖文件:

  1. <!-- AMQP客户端 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务:

  1. spring:
  2. # 项目名称
  3. application:
  4. name: rabbitmq-provider
  5. # RabbitMQ服务配置
  6. rabbitmq:
  7. host: 127.0.0.1
  8. port: 5672
  9. username: guest
  10. password: guest

(2)配置队列

在 rabbitmq-provider(消息发送项目)中,配置队列名称,并将队列交由 IoC 管理,代码如下:

  1. package com.pjb.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. /**
  6. * RabbitMQ配置类
  7. * @author pan_junbiao
  8. **/
  9. @Configuration
  10. public class RabbitMqConfig
  11. {
  12. public static final String QUEUE_NAME = "queue_name"; //队列名称
  13. public static final String EXCHANGE_NAME = "exchange_name"; //交换器名称
  14. public static final String ROUTING_KEY = "routing_key"; //路由键
  15. /**
  16. * 队列
  17. */
  18. @Bean
  19. public Queue queue()
  20. {
  21. /**
  22. * 创建队列,参数说明:
  23. * String name:队列名称。
  24. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  25. * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
  26. * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
  27. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  28. * 当没有生产者或者消费者使用此队列,该队列会自动删除。
  29. * Map<String, Object> arguments:设置队列的其他一些参数。
  30. */
  31. return new Queue(QUEUE_NAME, true, false, false, null);
  32. }
  33. /**
  34. * Direct交换器
  35. */
  36. @Bean
  37. public DirectExchange exchange()
  38. {
  39. /**
  40. * 创建交换器,参数说明:
  41. * String name:交换器名称
  42. * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  43. * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  44. * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  45. */
  46. return new DirectExchange(EXCHANGE_NAME, true, false);
  47. }
  48. /**
  49. * 绑定
  50. */
  51. @Bean
  52. Binding binding(DirectExchange exchange, Queue queue)
  53. {
  54. //将队列和交换机绑定, 并设置用于匹配键:routingKey
  55. return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  56. }
  57. }

(3)创建发送者

在 rabbitmq-provider(消息发送项目)中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息,代码如下:

  1. package com.pjb;
  2. import com.pjb.config.RabbitMqConfig;
  3. import org.junit.jupiter.api.Test;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.boot.test.context.SpringBootTest;
  7. /**
  8. * RabbitMq测试类
  9. * @author pan_junbiao
  10. **/
  11. @SpringBootTest
  12. public class RabbitMqTest
  13. {
  14. @Autowired
  15. RabbitTemplate rabbitTemplate;
  16. @Test
  17. public void sendMessage()
  18. {
  19. String message = "您好,欢迎访问 pan_junbiao的博客";
  20. rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message);
  21. System.out.println("消息发送成功!");
  22. }
  23. }

2.2 实现消息接收端

(1)创建第二个 SpringBoot 项目( rabbitmq-consumer 消息接收项目)。

在pom.xml配置信息文件中,添加相关依赖文件:

  1. <!-- AMQP客户端 -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务,这里需要配置 RabbitMQ 重试机制:

  1. spring:
  2. # 项目名称
  3. application:
  4. name: rabbitmq-consumer
  5. # RabbitMQ服务配置
  6. rabbitmq:
  7. host: 127.0.0.1
  8. port: 5672
  9. username: guest
  10. password: guest
  11. listener:
  12. simple:
  13. # 重试机制
  14. retry:
  15. enabled: true #是否开启消费者重试
  16. max-attempts: 5 #最大重试次数
  17. initial-interval: 5000ms #重试间隔时间(单位毫秒)
  18. max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
  19. multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

(2)创建接收者

在 rabbitmq-consumer(消息接收项目)中,创建创建接收者,注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。

接收者接收到消息后,打印输出消息,然后程序抛出运行时异常,观察现象。代码如下:

  1. package com.pjb.receiver;
  2. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.text.SimpleDateFormat;
  6. import java.util.Date;
  7. /**
  8. * 接收者
  9. * @author pan_junbiao
  10. **/
  11. @Component
  12. @RabbitListener(queues="queue_name")
  13. public class Receiver
  14. {
  15. @RabbitHandler
  16. public void process(String message)
  17. {
  18. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  19. System.out.println("接收消息: " + message + " 接收时间:" + sdf.format(new Date()));
  20. throw new RuntimeException();
  21. }
  22. }

特别注意:

如果在消息接收端的 application.yml 配置文件中没有添加 RabbitMQ 重试机制的相关配置,当接收端收到消息后程序抛出异常,那么发送端将得不到消息确认(ACK),此时发送端将会循环的发送消息,最终导致内存溢出。

执行结果:

从上述执行结果来看,当接收端重试5次后,将消息确认(ACK)。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/603944
推荐阅读
相关标签
  

闽ICP备14008679号