当前位置:   article > 正文

RabbitMQ(五)之延迟消息_rabbitmq延迟消息原理

rabbitmq延迟消息原理

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务

延迟消息的实现有两种:死信交换机,延迟消息插件。

一、死信交换机 

当一个队列中的消息满足下列情况之一时,就会成为死信 (dead letter):

1、消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false。
2、消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。

3、要投递的队列消息堆积满了,最早的消息可能成为死信。

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)。

 

原理:创建一个过期时间为30s的消息,投递到队列,由于simple.queue没有消费者,30会过期,就会投递到死信交换机,死信队列绑定的消费者30s后就会收到消息。

需要定义较多的死信交换机和队列,比较繁琐 ,适合用作一个兜底方案处理死信。

二、延迟消息插件(推荐使用)

RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。 

2.1、插件安装

1、点击进入https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载与RabbitMQ版本对应的延迟消息插件


2、将下载的延迟消息插件存放在 D:\Program Files\RabbitMQ Server\rabbitmq_server-3.12.6\plugins 目录下

3、打开RabbitMQ Command Prompt (sbin dir)控制台,输入
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

 

4、安装成功后打开 RabbitMQ Management,看到一下场景即为安装成功

2.2、创建提供消息延迟功能的交换机

第一步,创建交换机和队列,以及消费者

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(value = "delay.queue", durable = "true"),
  3. exchange = @Exchange(value = "delay.direct", delayed = "ture"),
  4. key = "hi"
  5. ))
  6. public void listenDelayQueue(String msg) {
  7. log.info("接收到delay.queue的消息:{}", msg);
  8. }

第二步,发送者发送消息

 

  1. @Test
  2. void testSendDelayMessage() {
  3. rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor() {
  4. @Override
  5. public Message postProcessMessage(Message message) throws AmqpException {
  6. message.getMessageProperties().setDelay(10000);
  7. return message;
  8. }
  9. });
  10. log.info("消息发送成功!!");
  11. }

三、利用延迟消息实现取消超时订单 

 

设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

1、如果并发较高,30分钟可能堆积消息过多,对MQ压力很大。
2、大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源。

解决思路:可以把30分钟的等待时间拆分成n个短的延迟时间,每隔一个时间检查一次,n个延迟时间刚好满足30分钟,如10s,10s,15s,15s,1m......

以下是优化的结果

第一步、定义消息,将消息封装为下面的格式

  1. @Data
  2. public class MultiDelayMessage<T> {
  3. /**
  4. * 消息体
  5. */
  6. private T data;
  7. /**
  8. * 记录延迟时间的集合
  9. */
  10. private List<Long> delayMillis;
  11. public MultiDelayMessage(T data, List<Long> delayMillis) {
  12. this.data = data;
  13. this.delayMillis = delayMillis;
  14. }
  15. public static <T> MultiDelayMessage<T> of(T data, Long... delayMillis) {
  16. return new MultiDelayMessage<>(data, CollUtils.newArrayList(delayMillis));
  17. }
  18. /**
  19. * 获取并移除下一个延迟时间
  20. *
  21. * @return 队列中的第一个延迟时间
  22. */
  23. public Long removeNextDelay() {
  24. return delayMillis.remove(0);
  25. }
  26. /**
  27. * 是否还有下一个延迟时间
  28. */
  29. public boolean hasNextDelay() {
  30. return !delayMillis.isEmpty();
  31. }
  32. }

第二步、交换机和消息队列

  1. public interface MqConstants {
  2. String DELAY_EXCHANGE = "trade.delay.topic";
  3. String DELAY_ORDER_QUEUE = "trade.order.delay.queue";
  4. String DELAY_ORDER_ROUTING_KEY = "order.query";
  5. }

第三步,发送延迟消息

  1. // 5.延迟检测订单状态消息
  2. try {
  3. MultiDelayMessage<Long> msg = MultiDelayMessage.of(order.getId(), 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L);
  4. rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg, new MessagePostProcessor() {
  5. @Override
  6. public Message postProcessMessage(Message message) throws AmqpException {
  7. message.getMessageProperties().setDelay(msg.removeNextDelay().intValue());
  8. return message;
  9. }
  10. });
  11. // rabbitTemplate.convertAndSend(
  12. // MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,
  13. // new DelayMessageProcessor(msg.removeNextDelay().intValue())
  14. // );
  15. } catch (AmqpException e) {
  16. log.error("延迟消息发送异常!", e);
  17. }

第四步、消费者监听消息

  1. @Component
  2. @RequiredArgsConstructor
  3. public class OrderStatusCheckListener {
  4. private final IOrderService orderService;
  5. private final RabbitTemplate rabbitTemplate;
  6. @RabbitListener(bindings = @QueueBinding(
  7. value = @Queue(value = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),
  8. exchange = @Exchange(value = MqConstants.DELAY_EXCHANGE, delayed = "true", type = ExchangeTypes.TOPIC),
  9. key = MqConstants.DELAY_ORDER_ROUTING_KEY
  10. ))
  11. public void listenOrderDelayMessage(MultiDelayMessage<Long> msg) {
  12. // 1.查询订单状态
  13. Order order = orderService.getById(msg.getData());
  14. // 2.判断是否已经支付
  15. if (order == null || order.getStatus() == 2) {
  16. // 订单不存在或者已经被处理
  17. return;
  18. }
  19. // TODO 3.去支付服务查询真正的支付状态
  20. boolean isPay = false; //模拟为支付服务未支付
  21. // 3.1.已支付,标记订单状态为已支付
  22. if (isPay) {
  23. orderService.markOrderPaySuccess(order.getId());
  24. return;
  25. }
  26. // 4.判断是否存在延迟时间
  27. if (msg.hasNextDelay()) {
  28. // 4.1.存在,重发延迟消息
  29. Long nextDelay = msg.removeNextDelay();
  30. rabbitTemplate.convertAndSend(
  31. MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY,
  32. msg, new DelayMessageProcessor(nextDelay.intValue()));
  33. return;
  34. }
  35. // 5.不存在,取消订单
  36. orderService.cancelOrder(order.getId());
  37. }
  38. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/778061
推荐阅读
相关标签
  

闽ICP备14008679号