赞
踩
rabbitmq_delayed_message_exchange
假设我们正在设计一个线上售卖电影票的系统,用户购票后有 15 分钟时间进行付款,如果用户在 15 分钟内未付款,订单将自动取消并释放电影票库存。这里,我们可以利用 RabbitMQ 的延迟队列机制,在用户购票时发送一条延迟消息到 RabbitMQ,并设定延迟时间为 15 分钟。如果用户未在 15 分钟内完成付款,延迟消息将被消费者接收并处理订单取消的逻辑。
RabbitMQ 本身不直接支持延迟队列功能,需要借助 rabbitmq_delayed_message_exchange
插件来实现。该插件为 RabbitMQ 提供了一种新的消息交换机类型——x-delayed-message
,可以基于消息属性设置延迟时间,在设定的延迟时间后,将消息发送到目标队列。
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
# 将插件移动到 RabbitMQ 插件目录
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
我们将使用 x-delayed-message
类型的交换机,并设定延迟队列,用于处理延迟消息。
# 创建交换机
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message \
arguments='{"x-delayed-type":"direct"}'
# 创建队列
rabbitmqadmin declare queue name=delayed_queue
# 绑定交换机与队列
rabbitmqadmin declare binding source=delayed_exchange destination=delayed_queue routing_key=order.payment
在发布消息时,可以设置消息属性 x-delay
来指定延迟时间。
# 使用 rabbitmqadmin 发布延迟消息
rabbitmqadmin publish exchange=delayed_exchange routing_key=order.payment \
payload="{'order_id': '12345', 'status': 'PENDING_PAYMENT'}" \
properties='{"headers":{"x-delay":900000}}'
消费者将从延迟队列中消费消息并执行订单取消逻辑。
在 Spring Boot 项目中,可以通过以下配置来创建延迟交换机和队列。
@Configuration
public class RabbitConfig {
public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
public static final String DELAYED_QUEUE_NAME = "delayed_queue";
public static final String ROUTING_KEY = "order.payment";
// 创建延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 创建延迟队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 绑定延迟队列与交换机
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTING_KEY).noargs();
}
}
@Component
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrderMessage(String orderId) {
Map<String, Object> message = new HashMap<>();
message.put("order_id", orderId);
message.put("status", "PENDING_PAYMENT");
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay", 15 * 60 * 1000); // 延迟 15 分钟
Message msg = new Message(new ObjectMapper().writeValueAsBytes(message), messageProperties);
rabbitTemplate.convertAndSend(RabbitConfig.DELAYED_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);
}
}
@Component
public class OrderConsumer {
@RabbitListener(queues = RabbitConfig.DELAYED_QUEUE_NAME)
public void processOrderCancellation(Message message) {
try {
Map<String, Object> orderMessage = new ObjectMapper().readValue(message.getBody(), Map.class);
String orderId = (String) orderMessage.get("order_id");
// 取消订单逻辑
System.out.println("Order " + orderId + " has been canceled due to non-payment.");
} catch (Exception e) {
e.printStackTrace();
}
}
}
rabbitmq_delayed_message_exchange
插件与您的 RabbitMQ 版本兼容,否则可能导致插件无法加载。Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。