赞
踩
rabbitMQ为自带了消息重试机制:当消费者消费消息失败时,可以选择将消息重新“推送”给消费者,直至消息消费成功为止。
开启自带的重试机制,需要如下几个配置:
1 开启消费者手动应答机制,对应的springboot配置项:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
2 消费异常时,设置消息重新入列
- boolean multiple = false; // 单条确认
- boolean requeue = true; // 重新进入队列,谨慎设置!!!很容易导致死循环,cpu 100%
- channel.basicNack(tag, multiple, requeue);
以下是运行例子:
消费者代码如下:
- package com.fmi110.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.atomic.AtomicInteger;
-
- /**
- * @author fmi110
- * @description 消息消费者
- * @date 2021/7/1 16:08
- */
- @Component
- @Slf4j
- public class RabbitConsumer {
-
- AtomicInteger count = new AtomicInteger();
-
- @RabbitListener(queues="my-queue")
- public void consumer1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
-
- log.info(">>>> consumer1 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
-
- try {
- Thread.currentThread().sleep(1000);
- int i = 1/0;
- channel.basicAck(tag,true); // 确认消息消费成功
- } catch (Exception e) {
- log.error(">>>>消费异常,消息重新进入队列并消费");
- boolean multiple = false; // 单条确认
- boolean requeue = true; // 重新进入队列,谨慎设置!!!
- channel.basicNack(tag, multiple, requeue);
- }
- }
-
- @RabbitListener(queues="my-queue")
- public void consumer2(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
- log.info(">>>> consumer2 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
-
- try {
- Thread.currentThread().sleep(1000);
- int i = 1/0;
- channel.basicAck(tag,true); // 确认消息消费成功
- } catch (Exception e) {
- log.error(">>>>消费异常,消息重新进入队列并消费");
- boolean multiple = false; // 单条确认
- boolean requeue = true;
- channel.basicNack(tag, multiple, requeue);
- }
- }
- }
-
这里模拟了两个消费者 consumer1
、consumer2
,并在逻辑中人为设置异常 int 1/0
, 在异常捕获中通过
channel.basicNack(tag, false, true);
设置消息重新进入队列,最终推给消费者再次消费。运行结果如下:
日志里包含了几个信息点:
消费者每次只消费一条消息,因为我设置了
spring.rabbitmq.listener.simple.prefetch=1
消息推送使用的 round-robin 算法
rabbitMQ的消费方式有推和拉两种方式,springboot创建的消费者模式使用的推的方式消费
this.channel.basicConsume()
如运行日志所示,重进进入队列的消息,会在队列头部,直接再次推送给消费者消费,如果是因为代码逻辑问题,将会导致消息一直消费失败,导致死循环!!!
比较合理的做法是,重试一定次数消费后,如果仍然失败,则终止重试,将消费异常的消息保存,并上报异常,由人工介入处理。
一个比较合理的重试机制如下:
消息消费出现异常时,借助springboot提供的重试机制进行重试
因为使用的spring-retry,所以方法中必须抛出异常,否则spring-retry不会被触发!!!
重试仍然失败时,消息转发到死信队列,死信队列的消费者记录并上报异常信息
要实现消息消费失败自动转发到死信队列,则rabbitmq在创建消息队列时,需要指定与之绑定的死信队列
完整的实例代码如下:
这里注释掉了 spring.rabbitmq.listener.simple.acknowledge-mode=manual
,这样在消息消费失败时,会自动转到死信队列,如果开启手动确认机制,必须调用 chanel.basicNack(tag,false,false)
消息才会进入死信队列!!!
- # 应用名称
- spring.application.name=rabbitmq
- server.port=8080
- server.servlet.context-path=/
-
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- # 指定连接的虚拟主机,可以在rabbitMQ控制台查看对应的虚拟主机的名字
- spring.rabbitmq.virtual-host=my_vhost
- spring.rabbitmq.username=admin
- spring.rabbitmq.password=admin
-
- spring.rabbitmq.listener.simple.prefetch=1
-
- # 开启 publish-comfirm 机制和消息路由匹配失败退回机制
- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.publisher-confirm-type=correlated
- # 开启消费者应答 ack 机制
- # spring.rabbitmq.listener.simple.acknowledge-mode=manual
- # 开启spring提供的retry
- spring.rabbitmq.listener.simple.retry.enabled=true
- spring.rabbitmq.listener.simple.retry.max-attempts=3
- spring.rabbitmq.listener.simple.retry.initial-interval=3000
主要在程序启动时,做如下设置:
创建死信队列和死信交换器,并将死信队列绑定到死信交换器。
创建普通队列和普通交换器,并将普通队列绑定到普通交换器,同时将死信队列与普通队列关联,这样当消息消费失败时,消息会进入死信队列(使用了自动 ack模式)。
- package com.fmi110.rabbitmq.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
-
-
- /**
- * @author fmi110
- * @description rabbitMQ 配置类
- * @date 2021/7/1 15:08
- */
- @Configuration
- @Slf4j
- public class RabbitConfig {
-
- String dlQueueName = "my-queue-dl"; // 普通队列名称
- String dlExchangeName = "my-exchange-dl"; // 死信交换器名称
- String dlRoutingKey = "rabbit.test";
-
- String queueName = "retry-queue";
- String exchangeName = "my-exchange"; // 普通交换器名称
-
- /**
- * 创建死信队列
- *
- * @return
- */
- @Bean
- public Queue queueDL() {
-
- return QueueBuilder
- .durable(dlQueueName) // 持久化队列
- .build();
- }
-
- /**
- * 创建死信交换机
- *
- * @return
- */
- @Bean
- public TopicExchange exchangeDL() {
- return new TopicExchange(dlExchangeName, true, false);
- }
-
- /**
- * 绑定操作
- */
- @Bean
- public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
- log.info(">>>> 队列与交换器绑定");
- return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
- }
-
- /**
- * 创建持久化队列,同时绑定死信交换器
- *
- * @return
- */
- @Bean
- public Queue queue() {
- log.info(">>>> 创建队列 retry-queue");
- HashMap<String, Object> params = new HashMap<>();
- params.put("x-dead-letter-exchange", dlExchangeName);
- params.put("x-dead-letter-routing-key", dlRoutingKey);
-
- return QueueBuilder
- .durable(queueName) // 持久化队列
- .withArguments(params) // 关联死信交换器
- .build();
- }
-
-
- /**
- * 创建交换机
- *
- * @return
- */
- @Bean
- public TopicExchange exchange() {
- log.info(">>>> 创建交换器 my-exchange");
- boolean durable = true; // 持久化
- boolean autoDelete = false; // 消费者全部解绑时不自动删除
- return new TopicExchange(exchangeName, durable, autoDelete);
- }
-
- /**
- * 绑定队列到交换机
- *
- * @param queue
- * @param exchange
- * @return
- */
- @Bean
- public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
- log.info(">>>> 队列与交换器绑定");
- return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
- }
-
- // /**
- // * spring-retry重试机制:当重试次数达到最大,消息仍然消费失败时回调。
- // * 如果开启这个类,则死信队列失效,消息消费失败,即使配置了死信队列,消息也不会进入死信队列。
- // * 重试失败回调和死信队列只能二选一!!!spring 提供回调实现类有如下几个:
- // * RejectAndDontRequeueRecoverer :消费失败,并且消息不再入列,spring默认使用。
- // * ImmediateRequeueMessageRecoverer :将消息重新入列
- // * RepublishMessageRecoverer:转发消息到指定的队列,
- // * @return
- // */
- // @Bean
- // public MessageRecoverer messageRecoverer(){
- // return new MessageRecoverer() {
- // @Override
- // public void recover(Message message, Throwable cause) {
- // log.info(message.toString());
- // log.info("spring-retry重试次数达到最大,消息仍然失败的回调");
- // // TODO: 记录错误信息并上报
- // }
- // };
- // }
- }
这里为了保证消息能确保消息发送,配置了 confirm 确认机制
- package com.fmi110.rabbitmq;
-
- import com.rabbitmq.client.AMQP;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
-
- /**
- * @author fmi110
- * @description 消息生产者
- * @date 2021/7/1 15:08
- */
- @Component
- @Slf4j
- public class RabbitProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- /**
- * 1 设置 confirm 回调,消息发送到 exchange 时回调
- * 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调
- *
- * correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用
- */
- @PostConstruct
- public void enableConfirmCallback(){
- // #1
- /**
- * 连接不上 exchange或exchange不存在时回调
- */
- rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
- if (!ack) {
- log.error("消息发送失败");
- // TODO 记录日志,发送通知等逻辑
- }
- });
-
- // #2
- /**
- * 消息投递到队列失败时,才会回调该方法
- * message:发送的消息
- * exchange:消息发往的交换器的名称
- * routingKey:消息携带的路由关键字信息
- */
- rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->{
- log.error("消息路由失败");
- // TODO 路由失败后续处理逻辑
- });
- }
-
- public void send(String msg){
- String exchangeName = "my-exchange";
- // String routingKey = "aaa.xxx";
- String routingKey = "rabbit.test";
- rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
- }
- }
- package com.fmi110.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- import java.util.concurrent.atomic.AtomicInteger;
-
-
- /**
- * @author fmi110
- * @description 消息消费者
- * @date 2021/7/1 16:08
- */
- @Component
- @Slf4j
- public class RabbitConsumer {
-
- AtomicInteger count = new AtomicInteger();
-
- /**
- * 普通队列消费者
- * @param data
- * @param channel
- * @param tag
- * @throws Exception
- */
- @RabbitListener(queues="retry-queue")
- public void consumer(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
-
- log.info(">>>> consumer 消费 tag = {},次数count={},消息内容 : {}",tag, count.incrementAndGet(),data);
- // TODO 消息处理逻辑
- throw new RuntimeException("抛出异常,模拟消费失败,触发spring-retry");
- }
-
- /**
- * 死信队列消费者
- * @param data
- * @param channel
- * @param tag
- * @throws Exception
- */
- @RabbitListener(queues="my-queue-dl")
- public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
- log.info(">>>> 死信队列消费 tag = {},消息内容 : {}",tag,data);
- // channel.basicNack(tag, false, false);
- }
- }
用于触发发送消息
- package com.fmi110.rabbitmq.controller;
-
-
- import com.fmi110.rabbitmq.RabbitProducer;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.ResponseBody;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.HashMap;
-
- @RestController
- public class TestController {
- @Autowired
- RabbitProducer rabbitProducer;
-
- @GetMapping("/test")
- public Object test() {
-
- rabbitProducer.send("this is a message");
-
- HashMap<String, Object> result = new HashMap<>();
- result.put("code", 0);
- result.put("msg", "success");
- return result;
- }
- }
运行日志如下:
: >>>> consumer 消费 tag = 1,次数count=1,消息内容 : this is a message : >>>> consumer 消费 tag = 1,次数count=2,消息内容 : this is a message : >>>> consumer 消费 tag = 1,次数count=3,消息内容 : this is a message o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'this is a message' MessageProperties [headers={spring_listener_return_correlation=2840e95b-8544-4ed8-b3ed-8ba02aee2729}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=my-exchange, receivedRoutingKey=rabbit.test, deliveryTag=1, consumerTag=amq.ctag-a5AZEb9AYpOzL6mQJQIvaQ, consumerQueue=retry-queue]) ... Caused by: java.lang.RuntimeException: 抛出异常,模拟消费失败,触发spring-retry at com.fmi110.rabbitmq.RabbitConsumer.consumer(RabbitConsumer.java:36) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_181] .... : >>>> 死信队列消费 tag = 1,消息内容 : this is a message
从日志可看出,普通队列的消费者一共消费了三次仍然失败,最后回调spring提供 RejectAndDontRequeueRecoverer
,然后消息进入死信队列被消费。
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <scope>compile</scope>
- </dependency>
-
- </dependencies>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。