赞
踩
1.什么是死信队列
1)“死信”是RabbitMQ中的一种消息机制。
2)消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
3)死信队列
当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX(Dead-Letter-Exchange ) ,绑定 DLX 的队列就称之为死信队列。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
2.配置死信队列
\1. 配置业务队列,绑定到业务交换机上
\2. 为业务队列配置死信交换机和路由key
\3. 为死信交换机配置死信队列
**注意:**并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key
1. 什么是延时队列
延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:
1.创建业务队列,创建死信交换机、创建死信队列、绑定死信交换机和死信队列,绑定死信交换机和业务队列,绑定路由key;
代码采用的是spring-boot-starter-amqp,初始条件可以看rabbitmq安装、Java连接基础连接部分第二种方式;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //定义延时队列 @Bean("delayQueue") public Queue delayQueue(){ //设置死信交换机以及路由key return QueueBuilder.durable("delayQueue") //如果消息过时,则会投递给对应交换机my-dlx-exchange .withArgument("x-dead-letter-exchange","my-dlx-exchange") //如果消息过时,my-dlx-exchange会根据routing-key-delay投递到对应队列 .withArgument("x-dead-letter-routing-key","routing-key-delay").build(); } //定义死信队列 @Bean("dlxQueue") public Queue dlxQueue(){ return QueueBuilder.durable("dlxQueue").build(); } //定义死信交换机 @Bean("dlxExchange") public Exchange dlxExchange(){ return ExchangeBuilder.directExchange("my-dlx-exchange").build(); } //绑定死信交换机与队列 @Bean("dlxBinding") public Binding dlxBinding(@Qualifier("dlxQueue") Queue dlxQueue,@Qualifier("dlxExchange") Exchange dlxExchange){ return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("routing-key-delay").noargs(); } }
2.可以利用实现CommandLineRunner中的run方法在服务启动自动启动代码,当然也可以写一个业务层用http请求去访问,往队列里面塞数据
@Service @Slf4j public class TextNetworkService implements CommandLineRunner { @Resource private RabbitTemplate rabbitTemplate; private final static String QUEUE_NAME = "delayQueue"; @Override public void run(String... args) throws Exception { Map<String, String> map = new HashMap<>(); map.put("msg", "hello word"); rabbitTemplate.convertAndSend(QUEUE_NAME, map, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置消息过期时间 10S message.getMessageProperties().setExpiration("10000"); return message; } }); System.out.println("发送消息时间:" + new Date()); } }
3.接收消费死信队列消息
@Component
@RabbitListener(queues = "dlxQueue")
public class MessageListener {
@RabbitHandler
public void receive(Map<String,String> map){
System.out.println("接收到时间------"+new Date());
System.out.println("接收到信息=======" + map);
}
}
本文已结束,后续是rabbitmq的五种消息模型,不足之处,望海涵
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。