赞
踩
实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。
下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。
1. 添加 RabbitMQ 依赖:
在你的pom.xml
中加入Spring Boot对RabbitMQ的Starter:
xml
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
2. 配置队列、交换器、绑定和容器
我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:
java
- @Configuration
- public class RabbitMQConfig {
-
- /* 正常队列配置 */
- @Bean
- public Queue orderQueue() {
- Map<String, Object> args = new HashMap<>();
- args.put("x-dead-letter-exchange", "dead_exchange"); // 队列消息过期后发送的交换器
- args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键
- return new Queue("order_queue", true, false, false, args);
- }
-
- @Bean
- public DirectExchange orderExchange() {
- return new DirectExchange("order_exchange");
- }
-
- @Bean
- public Binding orderBinding() {
- return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");
- }
-
- /* 死信队列配置 */
- @Bean
- public Queue deadQueue() {
- return new Queue("dead_queue");
- }
-
- @Bean
- public DirectExchange deadExchange() {
- return new DirectExchange("dead_exchange");
- }
-
- @Bean
- public Binding deadBinding() {
- return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
- }
- }

3. 发送延时消息
创建订单时,我们发送一个延时消息到队列:
java
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- public void createOrder(String orderId) {
- //订单创建业务...
- rabbitTemplate.convertAndSend(
- "order_exchange",
- "order",
- orderId,
- message -> {
- message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟
- return message;
- }
- );
- }
4. 消费死信队列中的消息
然后,我们需要消费死信队列中的消息,进行订单取消的操作:
java
- @RabbitListener(queues = "dead_queue")
- public void processDeadLetter(String orderId) {
- // 订单取消业务...
- }
java
- Map<String, Object> args = new HashMap<>();
- args.put("x-message-ttl", 60000); // 设置60s过期时间
- Channel channel = ...;
- channel.queueDeclare("my_queue", false, false, false, args);
java
- byte[] messageBodyBytes = "Hello, world!".getBytes();
- AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
- Channel channel = ...;
- channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);
5、RabbitMQ中的死信队列如何工作?
“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。
当以下情况发生时,消息会被投递到死信队列:
在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。
6、如何确保消费者在消费消息时不会发生重复消费的情况?
确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:
7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority
参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicProperties
的priority
字段来指定消息的优先级。如下:
java
- // 声明队列时设置最大优先级
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-max-priority", 10);
- channel.queueDeclare("my_queue", false, false, false, args);
-
- // 发布消息时设置优先级
- AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
- channel.basicPublish("", "my_queue", props, "Hello world".getBytes());
注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。
8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck
参数为true
(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck
)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。