赞
踩
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
延迟消息的实现有两种:死信交换机,延迟消息插件。
当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter):
1、消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false。
2、消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。3、要投递的队列消息堆积满了,最早的消息可能成为死信。
如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)。
原理:创建一个过期时间为30s的消息,投递到队列,由于simple.queue没有消费者,30会过期,就会投递到死信交换机,死信队列绑定的消费者30s后就会收到消息。
需要定义较多的死信交换机和队列,比较繁琐 ,适合用作一个兜底方案处理死信。
RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
1、点击进入https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载与RabbitMQ版本对应的延迟消息插件
2、将下载的延迟消息插件存放在 D:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.6\plugins 目录下
3、打开RabbitMQ Command Prompt (sbin dir)控制台,输入
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4、安装成功后打开 RabbitMQ Management,看到一下场景即为安装成功
第一步,创建交换机和队列,以及消费者
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "delay.queue", durable = "true"),
- exchange = @Exchange(value = "delay.direct", delayed = "ture"),
- key = "hi"
- ))
- public void listenDelayQueue(String msg) {
- log.info("接收到delay.queue的消息:{}", msg);
- }
第二步,发送者发送消息
- @Test
- void testSendDelayMessage() {
- rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setDelay(10000);
- return message;
- }
- });
- log.info("消息发送成功!!");
- }
设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
1、如果并发较高,30分钟可能堆积消息过多,对MQ压力很大。
2、大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源。解决思路:可以把30分钟的等待时间拆分成n个短的延迟时间,每隔一个时间检查一次,n个延迟时间刚好满足30分钟,如10s,10s,15s,15s,1m......
以下是优化的结果
第一步、定义消息,将消息封装为下面的格式
- @Data
- public class MultiDelayMessage<T> {
- /**
- * 消息体
- */
- private T data;
- /**
- * 记录延迟时间的集合
- */
- private List<Long> delayMillis;
-
- public MultiDelayMessage(T data, List<Long> delayMillis) {
- this.data = data;
- this.delayMillis = delayMillis;
- }
-
- public static <T> MultiDelayMessage<T> of(T data, Long... delayMillis) {
- return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
- }
-
- /**
- * 获取并移除下一个延迟时间
- *
- * @return 队列中的第一个延迟时间
- */
- public Long removeNextDelay() {
- return delayMillis.remove(0);
- }
-
- /**
- * 是否还有下一个延迟时间
- */
- public boolean hasNextDelay() {
- return !delayMillis.isEmpty();
- }
- }
第二步、交换机和消息队列
- public interface MqConstants {
-
- String DELAY_EXCHANGE = "trade.delay.topic";
- String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
- String DELAY_ORDER_ROUTING_KEY = "order.query";
- }
第三步,发送延迟消息
- // 5.延迟检测订单状态消息
- try {
- MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);
-
- rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setDelay(msg.removeNextDelay().intValue());
- return message;
- }
- });
- // rabbitTemplate.convertAndSend(
- // MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
- // new DelayMessageProcessor(msg.removeNextDelay().intValue())
- // );
- } catch (AmqpException e) {
- log.error("延迟消息发送异常!", e);
- }
第四步、消费者监听消息
- @Component
- @RequiredArgsConstructor
- public class OrderStatusCheckListener {
-
- private final IOrderService orderService;
- private final RabbitTemplate rabbitTemplate;
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
- exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true", type = ExchangeTypes.TOPIC),
- key = MqConstants.DELAY_ORDER_ROUTING_KEY
- ))
- public void listenOrderDelayMessage(MultiDelayMessage<Long> msg) {
- // 1.查询订单状态
- Order order = orderService.getById(msg.getData());
- // 2.判断是否已经支付
- if (order == null || order.getStatus() == 2) {
- // 订单不存在或者已经被处理
- return;
- }
- // TODO 3.去支付服务查询真正的支付状态
- boolean isPay = false; //模拟为支付服务未支付
- // 3.1.已支付,标记订单状态为已支付
- if (isPay) {
- orderService.markOrderPaySuccess(order.getId());
- return;
- }
-
- // 4.判断是否存在延迟时间
- if (msg.hasNextDelay()) {
- // 4.1.存在,重发延迟消息
- Long nextDelay = msg.removeNextDelay();
- rabbitTemplate.convertAndSend(
- MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY,
- msg, new DelayMessageProcessor(nextDelay.intValue()));
- return;
- }
- // 5.不存在,取消订单
- orderService.cancelOrder(order.getId());
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。