赞
踩
是RabbitMq确认消息是否成功被消费的一种机制。
1.none代表不确认:该模式下,只要队列获取到了消息,就默认已成功消费。该模式下,容易造成消息丢失的情况。
listener:
simple:
acknowledge-mode: none
2.manual手动确认: 该模式下需要在代码中进行手动确认消息。若出现异常,会触发消息的重试机制(默认重试三次),若重试结束后仍没有被确认,则消息状态会变成Unacked,如下图示:
2.1 配置方式
listener:
simple:
acknowledge-mode: manual
3.auto自动确认(默认模式):自动应答,该模式下若消费出现异常则会触发MQ的重试机制,而重试机制若没处理好则容易导致死循环。如下图示:
3.1 配置方式
listener:
simple:
acknowledge-mode: auto
3.2 消息接收确认重试机制的处理配置
避免重试机制导致的队列消费死循环的方法就是限制重试次数,或者使用手动应答等方式处理。
注意,在自动应答模式下,消息的最大重试次数容易造成消息的丢失。
listener:
simple:
acknowledge-mode: auto
retry:
enabled: true #开启重试
max-attempts: 3 #最大重试次数,默认3次,达到次数后,会进行消息移除。若绑定了死信队列,则会放入死信队列中
initial-interval: 2000ms #重试间隔时间
注意!,若想使用重试机制配置来限制重试的行为,那么在对应消费队列代码中,进行Nack的操作时,最后一个入参不能传入true(是否将消息重新发回队列中),否则照样会导致死循环。代码如下:
@RabbitListener(queues = RabbitMqConstants.FANOUT_EMAIL_QUEUE)
public void smsConsumerListener(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
try {
log.info("获取到队列消息:{}",msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
此处最后一个入参为true时,会导致死循环
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
针对异常消息,可以将其转移到死信队列中,这样对当前队列既不会造成消息的阻塞堆积,
也不会影响当前队列继续运行接收新的消息。
2.1 消费已确认方法,会确认唯一标识(deliveryTag)对应的消息
void basicAck(long deliveryTag, boolean multiple) throws IOException;
入参说明:
long deliveryTag:消息唯一标识,RabbitMQ自动生成的对应消息的唯一ID,
可从message.getMessageProperties().getDeliveryTag()方法中获得。
boolean multiple:是否批量退回,不开启就使用false,
开启批量退回需要增加自己的业务判断逻辑
(比如:攒够几条再批量回退,或者设置等待间隔等等)
2.2 消费不确认方法,不确认唯一标识(deliveryTag)对应的消息
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
入参说明:
long deliveryTag:同Ack
boolean multiple:同Ack
boolean requeue:是否重新退回到原消息队列,退回就使用true,不退回的话就使用false。
若是false,在没有绑定死信队列的情况下,则直接会将消息给丢弃掉。有死信队列则会将该消息转移到死信中去。
2.3完整yml配置
# 本服务端口 server: port: xxxxx # 本服务应用名称 spring: application: name: xxxx-xxxx # Nacos配置地址 cloud: nacos: discovery: server-addr: xxxxx #RabbitMq配置 rabbitmq: username: admin password: admin virtual-host: / host: 此处写你的RabbitMq服务地址 port: 此处写你的RabbitMq端口 listener: simple: acknowledge-mode: manual #开启手动确认,none代表不确认,manual才是手动确认,auto自动确认 retry: enabled: true #开启重试 max-attempts: 3 #最大重试次数,默认3次,达到次数后,会进行消息移除。若绑定了死信队列,则会放入死信队列中 initial-interval: 2000ms #重试间隔时间
具体代码:
2.6 常量类
package constants; /** * MQ常量类 */ public class RabbitMqConstants { /************************************FANOUT模式***************************************************/ /** * 发布订阅模式交换机 */ public static final String FANOUT_EXCHANGE = "fanout-exchange"; /** * 发布订阅模式死信推送队列 */ public static final String FANOUT_EMAIL_QUEUE_TO_DLX = "fanout.email.queue.to.dlx"; /************************************DeadLetter死信队列***************************************************/ /** * 死信队列交换机 */ public static final String DEAD_LETTER_EXCHANGE_DLX = "dead-letter-exchange"; /** * 死信队列 */ public static final String DEAD_LETTER_EMAIL_QUEUE_DLQ = "dead.letter.email.queue"; /** * 死信队列路由键 */ public static final String DEAD_LETTER_ROUTING_KEY_DLK = "dead.letter.email.routing.key"; }
2.5 Fanout队列绑定配置类
package com.rabbitmq.nacos.config; import constants.RabbitMqConstants; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutEmailToDlxConfig { /** * Fanout声明交换机 */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitMqConstants.FANOUT_EXCHANGE, true, false); } /** * Fanout声明队列 */ @Bean public Queue fanoutEmailToDlxQueue() { return QueueBuilder.durable(RabbitMqConstants.FANOUT_EMAIL_QUEUE_TO_DLX) .withArgument("x-dead-letter-exchange", RabbitMqConstants.DEAD_LETTER_EXCHANGE_DLX) .withArgument("x-dead-letter-routing-key", RabbitMqConstants.DEAD_LETTER_ROUTING_KEY_DLK) .build(); } /** * 绑定Fanout交换机与队列 */ @Bean public Binding fanoutEmailToDlxBinding() { return BindingBuilder.bind(fanoutEmailToDlxQueue()).to(fanoutExchange()); } }
2.6 死信队列绑定配置类
package com.rabbitmq.nacos.config; import constants.RabbitMqConstants; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Configuration public class DeadLetterBindConfig { @Resource private AmqpAdmin amqpAdmin; @PostConstruct public void initDeclared() { amqpAdmin.initialize(); } /** * DLX,全称为Dead-Letter-Exchange */ @Bean public TopicExchange deadLetterExchange() { return new TopicExchange(RabbitMqConstants.DEAD_LETTER_EXCHANGE_DLX); } /** * DLQ,全称为Dead-Letter-Queue */ @Bean public Queue deadLetterQueue() { return new Queue(RabbitMqConstants.DEAD_LETTER_EMAIL_QUEUE_DLQ, true); } /** * DLK,全称为Dead-Letter-Routing-Key */ @Bean public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(RabbitMqConstants.DEAD_LETTER_ROUTING_KEY_DLK); } }
2.7队列监听类
package com.rabbitmq.nacos.consumer.fanout; import com.rabbitmq.client.Channel; import constants.RabbitMqConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component public class FanoutEmailToDlxListener { @RabbitListener(queues = RabbitMqConstants.FANOUT_EMAIL_QUEUE_TO_DLX) public void fanoutEmailToDlxListener(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); try { log.info("获取到队列消息:{}", msg); int a = 1 / 0; log.info(""); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。