当前位置:   article > 正文

硬卷消息中间件系列(八):RabbitMQ 重试机制详解

rabbitmq客户端重连机制

点击下方名片,设为星标

回复“1024”获取2TB学习资源!

前面介绍了 RabbitMQ 基础配置管理界面介绍常用 API消息收发与交换机消息确认机制 等相关的知识点,今天我将详细的为大家介绍 RabbitMQ 重试机制相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!

RabbitMQ重试机制的简介

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。如下图:aee7853d5ff7f97a43f94e8e8d46f668.png

注意事项
  • 如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ 支持消息确认-ACK。

  • 如果忘记了消息确认,那么后果很严重。当 Consumer 退出时候,Message 会一直重新分发。然后 RabbitMQ 会占用越来越多的内容,由于 RabbitMQ 会长时间运行,因此这个"内存泄漏"是致命的。

RabbitMQ的自动确认

自动确认分四种情况(第一就是正常消费,其他三种为异常情况)

  • 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。

  • 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。

  • 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)

  • 抛出其他的异常,消息会被拒绝,且requeue = true

我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环。消息未被确认时如下图所示:7e6a219ff43103e37ccef7edf8fd530d.png

重试机制有2种情况
  • 消息是自动确认时,如果抛出了异常导致多次重试都失败,消息被自动确认,消息就丢失了

  • 消息是手动确认时,如果抛出了异常导致多次重试都失败,消息没被确认,也无法nack,就一直是unacked状态,导致消息积压。

  • 更多关于消息中间件 RabbitMQ 系列的学习文章,请参阅:消息中间件 RabbitMQ ,本系列持续更新中。

核心配置
  1. spring:
  2.   # 项目名称
  3.   application:
  4.     name: rabbitmq-consumer
  5.   # RabbitMQ服务配置
  6.   rabbitmq:
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     username: guest
  10.     password: guest
  11.     listener:
  12.       simple:
  13.         # 重试机制
  14.         retry:
  15.           enabled: true #是否开启消费者重试
  16.           max-attempts: 5 #最大重试次数
  17.           initial-interval: 5000ms #重试间隔时间(单位毫秒)
  18.           max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
  19.           multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

RabbitMQ重试机制的实现

下面将通过示例来讲解 RabbitMQ 重试机制的实现。首先需要创建两个 SpringBoot 项目并整合 RabbitMQ 客户端。

实现消息发送端
创建第一个 SpringBoot 项目( rabbitmq-provider 消息发送项目)

在pom.xml配置信息文件中,添加相关依赖文件:

  1. <!-- AMQP客户端 -->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5.     <version>2.4.1</version>
  6. </dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务:

  1. spring:
  2.   # 项目名称
  3.   application:
  4.     name: rabbitmq-provider
  5.   # RabbitMQ服务配置
  6.   rabbitmq:
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     username: guest
  10.     password: guest
配置队列

在 rabbitmq-provider(消息发送项目)中,配置队列名称,并将队列交由 IoC 管理,代码如下:

  1. package com.pjb.config;
  2.  
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6.  
  7. /**
  8.  * RabbitMQ配置类
  9.  * @author pan_junbiao
  10.  **/
  11. @Configuration
  12. public class RabbitMqConfig
  13. {
  14.     public static final String QUEUE_NAME = "queue_name"//队列名称
  15.     public static final String EXCHANGE_NAME = "exchange_name"//交换器名称
  16.     public static final String ROUTING_KEY = "routing_key"//路由键
  17.  
  18.     /**
  19.      * 队列
  20.      */
  21.     @Bean
  22.     public Queue queue()
  23.     {
  24.         /**
  25.          * 创建队列,参数说明:
  26.          * String name:队列名称。
  27.          * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  28.          * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
  29.          * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
  30.          * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  31.          * 当没有生产者或者消费者使用此队列,该队列会自动删除。
  32.          * Map<String, Object> arguments:设置队列的其他一些参数。
  33.          */
  34.         return new Queue(QUEUE_NAME, truefalsefalse, null);
  35.     }
  36.  
  37.     /**
  38.      * Direct交换器
  39.      */
  40.     @Bean
  41.     public DirectExchange exchange()
  42.     {
  43.         /**
  44.          * 创建交换器,参数说明:
  45.          * String name:交换器名称
  46.          * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
  47.          * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  48.          * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
  49.          */
  50.         return new DirectExchange(EXCHANGE_NAME, truefalse);
  51.     }
  52.  
  53.     /**
  54.      * 绑定
  55.      */
  56.     @Bean
  57.     Binding binding(DirectExchange exchange, Queue queue)
  58.     {
  59.         //将队列和交换机绑定, 并设置用于匹配键:routingKey
  60.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  61.     }
  62. }

更多关于消息中间件 RabbitMQ 系列的学习文章,请参阅:消息中间件 RabbitMQ ,本系列持续更新中。

创建发送者

在 rabbitmq-provider(消息发送项目)中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息,代码如下:

  1. package com.pjb;
  2.  
  3. import com.pjb.config.RabbitMqConfig;
  4. import org.junit.jupiter.api.Test;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8.  
  9. /**
  10.  * RabbitMq测试类
  11.  * @author pan_junbiao
  12.  **/
  13. @SpringBootTest
  14. public class RabbitMqTest
  15. {
  16.     @Autowired
  17.     RabbitTemplate rabbitTemplate;
  18.  
  19.     @Test
  20.     public void sendMessage()
  21.     {
  22.         String message = "您好,欢迎访问 pan_junbiao的博客";
  23.         rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME, RabbitMqConfig.ROUTING_KEY, message);
  24.         System.out.println("消息发送成功!");
  25.     }
  26. }
实现消息接收端
创建第二个 SpringBoot 项目( rabbitmq-consumer 消息接收项目)

在pom.xml配置信息文件中,添加相关依赖文件:

  1. <!-- AMQP客户端 -->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5.     <version>2.4.1</version>
  6. </dependency>

在 application.yml 配置文件中配置 RabbitMQ 服务,这里需要配置 RabbitMQ 重试机制:

  1. spring:
  2.   # 项目名称
  3.   application:
  4.     name: rabbitmq-consumer
  5.   # RabbitMQ服务配置
  6.   rabbitmq:
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     username: guest
  10.     password: guest
  11.     listener:
  12.       simple:
  13.         # 重试机制
  14.         retry:
  15.           enabled: true #是否开启消费者重试
  16.           max-attempts: 5 #最大重试次数
  17.           initial-interval: 5000ms #重试间隔时间(单位毫秒)
  18.           max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
  19.           multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
创建接收者

在 rabbitmq-consumer(消息接收项目)中,创建创建接收者,注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。

接收者接收到消息后,打印输出消息,然后程序抛出运行时异常,观察现象。代码如下:

  1. package com.pjb.receiver;
  2.  
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6.  
  7. import java.text.SimpleDateFormat;
  8. import java.util.Date;
  9.  
  10. /**
  11.  * 接收者
  12.  * @author pan_junbiao
  13.  **/
  14. @Component
  15. @RabbitListener(queues="queue_name")
  16. public class Receiver
  17. {
  18.     @RabbitHandler
  19.     public void process(String message)
  20.     {
  21.         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  22.         System.out.println("接收消息: " + message + " 接收时间:" + sdf.format(new Date()));
  23.         throw new RuntimeException();
  24.     }
  25. }

特别注意:

  • 如果在消息接收端的 application.yml 配置文件中没有添加 RabbitMQ 重试机制的相关配置,当接收端收到消息后程序抛出异常,那么发送端将得不到消息确认(ACK),此时发送端将会循环的发送消息,最终导致内存溢出。

执行结果

7801faa1e2137224d7b715d32a6bb9f0.png从上述执行结果来看,当接收端重试5次后,将消息确认(ACK)。更多关于消息中间件 RabbitMQ 系列的学习文章,请参阅:消息中间件 RabbitMQ ,本系列持续更新中。

参考文章:https://blog.csdn.net/pan_junbiao/article/details

/113038421 https://blog.csdn.net/feiying0canglang/article/

details/127368401

读者专属技术群

构建高质量的技术交流社群,欢迎从事后端开发、运维技术进群(备注岗位,已在技术交流群的请勿重复添加)。主要以技术交流、内推、行业探讨为主,请文明发言。广告人士勿入,切勿轻信私聊,防止被骗。

扫码加我好友,拉你进群

c662d4f891a209c79b28e8609c4f5d1d.jpeg

推荐阅读 点击标题可跳转

扔掉 K8S Dashboard,来试试这几款UI管理工具!

面试官:运行 100 万个并发任务需要多少内存?

上班第一件事!立马卸载这个软件

雷军的代码再登热搜!ChatGPT都称赞他实力惊人

再见 MySQL 5.7 !

一款比Elasticsearch更牛逼的轻量级搜索引擎

f050a9b3d29bb2107b0d19e6fa7562db.png

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下在看,加个星标,这样每次新文章推送才会第一时间出现在你的订阅列表里。点在看支持我们吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/911097
推荐阅读
相关标签
  

闽ICP备14008679号