赞
踩
死信可以理解成没有被正常消费的消息,在RabbitMQ中以下几种情况会被认定为死信:
这些消息会被发送到死信交换机并路由到死信队列中(在RabbitMQ中死信交换机和死信队列就是普通的交换机和队列)。其流转过程如下图
注意事项:基于死信队列实现的延迟消费不适合时间过于复杂的场景。比如,一个队列中第一条消息TTL为10s,第二条消息TTL为5s,由于RabbitMQ只会监听第一条消息,所以本应第二条消息先达到TTL会在第一条消息的TTL之后。对于该现象有两种解决方案:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
# rabbitmq配置信息 RabbitProperties类
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
# 开启confirm机制
publisher-confirm-type: correlated
# 开启return机制
publisher-returns: true
#全局配置,局部配置存在就以局部为准
listener:
simple:
acknowledge-mode: manual # 手动ACK
@Configuration
public class RabbitMQConfig {
/**
* 正常队列
*/
public static final String EXCHANGE = "boot-exchange";
public static final String QUEUE = "boot-queue";
public static final String ROUTING_KEY = "boot-rout";
/**
* 死信队列
*/
public static final String DEAD_EXCHANGE = "dead-exchange";
public static final String DEAD_QUEUE = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead-rout";
/**
* 声明死信交换机
*
* @return
*/
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).build();
}
/**
* 声明死信队列
*
* @return
*/
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
/**
* 绑定死信的队列和交换机
*
* @param deadExchange
* @param deadQueue
* @return
*/
@Bean
public Binding deadBind(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
/**
* 声明交换机,同channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
*
* @return
*/
@Bean
public Exchange bootExchange() {
return ExchangeBuilder.directExchange(EXCHANGE).build();
}
/**
* 声明队列,同channel.queueDeclare(QUEUE, true, false, false, null);
* 绑定死信交换机及路由key
*
* @return
*/
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//声明队列属性有更改时需要删除队列
//给队列设置消息时长
//.ttl(10000)
//队列最大长度
.maxLength(1)
.build();
}
/**
* 绑定队列和交换机,同 channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
*
* @param bootExchange
* @param bootQueue
* @return
*/
@Bean
public Binding bootBind(Exchange bootExchange, Queue bootQueue) {
return BindingBuilder.bind(bootQueue).to(bootExchange).with(ROUTING_KEY).noargs();
}
}
@RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE)
public void listener_dead(String msg, Channel channel, Message message) throws IOException {
System.out.println("死信接收到消息" + msg);
System.out.println("唯一标识:" + message.getMessageProperties().getCorrelationId());
System.out.println("messageID:" + message.getMessageProperties().getMessageId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void listener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息" + msg);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false)
}
@RabbitListener(queues = RabbitMQConfig.QUEUE)
public void listener(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息" + msg);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
@SpringBootTest
public class Publisher {
@Autowired
private RabbitTemplate template;
/**
* 5秒未被消费会路由到死信队列
*/
@Test
public void publish_expir() {
template.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTING_KEY, "hello expir dead", message -> {
message.getMessageProperties().setExpiration("5000");
return message;
});
}
}
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//声明队列属性有更改时需要删除队列
//给队列设置消息时长
.ttl(10000)
.build();
}
设置队列长度限制,当队列长度超过设置的阈值,消息便会路由到死信队列。
@Bean
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey(DEAD_ROUTING_KEY)
//声明队列属性有更改时需要删除队列
.maxLength(1)
.build();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。