当前位置:   article > 正文

RabbitMQ (四)实现延迟消息_x-delayed-message

x-delayed-message

1 概述

常用的延迟消息实现方式有:

  • 利用 队列TTL + 死信队列 方式实现

  • 利用消息延迟插件实现

消息变成死信的原因有:​​​​

  • 消息过期。消息TTL或队列TTL

  • 消息被拒绝。消费者调用了 channel.basicNackchannel.basicReject ,并且设置 requeue=false

  • 队列满。

    当设置了最大队列长度或大小并达到最大值时,RabbitMQ 的默认行为是从队列前面丢弃或 dead-letter 消息(即队列中最早的消息)。要修改这种行为,请使用下面描述的 overflow 设置

    overflow

    常见参数说明

2 队列TTL + 死信队列方式

这里直接贴出 rabbitConfig 代码,其他的代码参考该文章:RabbitMQ (三)消息重试

1 RabbitConfig

主要操作:

  1. 创建死信队列和交换器,并绑定

  2. 创建队列,同时设置队列的TTL、绑定死信队列;创建交换器,并绑定,

  1. package com.fmi110.rabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import java.util.HashMap;
  8. /**
  9. * @author fmi110
  10. * @description rabbitMQ 配置类
  11. * @date 2021/7/1 15:08
  12. */
  13. @Configuration
  14. @Slf4j
  15. public class RabbitConfig {
  16.    String dlQueueName  = "my-queue-dl"; // 普通队列名称
  17.    String dlExchangeName = "my-exchange-dl"; // 死信交换器名称
  18.    String dlRoutingKey   = "rabbit.test";
  19.    String queueName = "retry-queue";
  20.    String exchangeName = "my-exchange"; // 普通交换器名称
  21.    /**
  22.     * 创建死信队列
  23.     *
  24.     * @return
  25.     */
  26.    @Bean
  27.    public Queue queueDL() {
  28.        return QueueBuilder
  29.               .durable(dlQueueName) // 持久化队列
  30.               .build();
  31.   }
  32.    /**
  33.     * 创建死信交换机
  34.     *
  35.     * @return
  36.     */
  37.    @Bean
  38.    public TopicExchange exchangeDL() {
  39.        return new TopicExchange(dlExchangeName, true, false);
  40.   }
  41.    /**
  42.     * 绑定操作
  43.     */
  44.    @Bean
  45.    public Binding bindQueueDL2ExchangeDL(Queue queueDL, TopicExchange exchangeDL) {
  46.        log.info(">>>> 队列与交换器绑定");
  47.        return BindingBuilder.bind(queueDL).to(exchangeDL).with(dlRoutingKey);
  48.   }
  49.    /**
  50.     * 创建持久化队列,同时绑定死信交换器
  51.     *
  52.     * @return
  53.     */
  54.    @Bean
  55.    public Queue queue() {
  56.        log.info(">>>> 创建队列 retry-queue");
  57.        HashMap<String, Object> params = new HashMap<>();
  58.        params.put("x-dead-letter-exchange", dlExchangeName);
  59.        params.put("x-dead-letter-routing-key", dlRoutingKey);
  60.        params.put("x-message-ttl", 10 * 1000); // 队列过期时间 10s
  61.        return QueueBuilder
  62.               .durable(queueName) // 持久化队列
  63.               .withArguments(params) // 关联死信交换器
  64.               .build();
  65.   }
  66.    /**
  67.     * 创建交换机
  68.     *
  69.     * @return
  70.     */
  71.    @Bean
  72.    public TopicExchange exchange() {
  73.        log.info(">>>> 创建交换器 my-exchange");
  74.        boolean durable    = true; // 持久化
  75.        boolean autoDelete = false; // 消费者全部解绑时不自动删除
  76.        return new TopicExchange(exchangeName, durable, autoDelete);
  77.   }
  78.    /**
  79.     * 绑定队列到交换机
  80.     *
  81.     * @param queue
  82.     * @param exchange
  83.     * @return
  84.     */
  85.    @Bean
  86.    public Binding bindQueue2Exchange(Queue queue, TopicExchange exchange) {
  87.        log.info(">>>> 队列与交换器绑定");
  88.        return BindingBuilder.bind(queue).to(exchange).with("rabbit.test");
  89.   }
  90. }

2 RabbitConsumer 消费者

延迟消息通过队列的TTL产生,所以这里不应该设置普通队列的消费者,让消息过期然后自动转入死信队列,此时再进行消费以此实现延迟消息

  1. package com.fmi110.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * @author fmi110
  11. * @description 消息消费者
  12. * @date 2021/7/1 16:08
  13. */
  14. @Component
  15. @Slf4j
  16. public class RabbitConsumer {
  17.    /**
  18.     * 死信队列消费者
  19.     * @param data
  20.     * @param channel
  21.     * @param tag
  22.     * @throws Exception
  23.     */
  24. @RabbitListener(queues="my-queue-dl")
  25.    public void consumeDL(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  26.        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
  27.        log.info(">>>> {} 死信队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);
  28.   }
  29. }

3 弊端

如上图所示实现了延迟10s的消息,但是如果需要实现延迟5s的消息,则需要新建一个TTL为5s的队列,所以如果延迟时间需要很多的话,就需要创建很多队列,实现起来比较麻烦。

再贴一段对消息设置TTL的代码:

  1.    AtomicInteger aint = new AtomicInteger();
  2.    public void send(String msg) {
  3.        String exchangeName = "my-exchange";
  4.        String routingKey   = "rabbit.test";
  5.        // rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
  6.        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  7.        MessageProperties messageProperties = new MessageProperties();
  8.        messageProperties.setCorrelationId(UUID.randomUUID().toString().replace("-", ""));
  9.        // TTL 为5s
  10.        int i = 9 * 1000;
  11.        if (aint.incrementAndGet() % 2 == 0) {
  12.            i = 5 * 1000;
  13.       }
  14.        msg = "message send at " + dateFormat.format(new Date()) +", expired at "+dateFormat.format(new Date().getTime()+i);
  15.        messageProperties.setExpiration(String.valueOf(i)); // 设置过期时间
  16.        Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
  17.        rabbitTemplate.send(exchangeName, routingKey, message);
  18.   }

可以看到消息的过期时间与期望的不一致。因为只有在头部的消息,系统才对其进行过期检测。所以如果消息不再队列头部,即使时间已经过期,也不会导致消息进入死信队列!!!

当同时设置了消息的TTL和队列的TTL时,过期时间谁小谁生效(队列头部的消息才进行TTL检测)。

3 使用延迟插件实现

插件的安装参考 docker安装rabbitMQ

1 RabbitConfig

使用延迟插件实现,需要创建延迟交换器,使用 CustomExchange 类创建,同时指定交换器类型为 x-delayed-message ,此外还需要设置属性 x-delayed-type ,创建的交换器如下图所示

  1. package com.fmi110.rabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.CustomExchange;
  6. import org.springframework.amqp.core.Queue;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. /**
  11. * @author fmi110
  12. * @description 配置交换器、队列
  13. * @date 2021/7/3 9:58
  14. */
  15. @Slf4j
  16. @Configuration
  17. public class RabbitConfig2 {
  18.    String exchangeName = "delay-exchange";
  19.    String queueName    = "delay-queue";
  20.    String exchangeType = "x-delayed-message";
  21.    @Bean
  22.    public CustomExchange exchange() {
  23.        HashMap<String, Object> args = new HashMap<>();
  24.        args.put("x-delayed-type", "topic");
  25.        return new CustomExchange(exchangeName, exchangeType, true, false, args);
  26.   }
  27.    @Bean
  28.    public Queue queue() {
  29.        return new Queue(queueName, true, false, false);
  30.   }
  31.    @Bean
  32.    public Binding binding(CustomExchange exchange, Queue queue) {
  33.        return BindingBuilder.bind(queue)
  34.                             .to(exchange)
  35.                             .with("rabbit.delay")
  36.                             .noargs();
  37.   }
  38. }

2 RabbitProducer

这里开启了消息投递失败回调。测试中发现,使用延迟插件,虽然消息正常投递了,但是始终会报 “NO_ROUTER” 提示路由失败。虽然不影响功能。运行截图见后文。目前不确定是我设置问题还是框架的问题...

  1. package com.fmi110.rabbitmq;
  2. import com.rabbitmq.client.AMQP;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.core.MessageDeliveryMode;
  6. import org.springframework.amqp.core.MessageProperties;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.util.StringUtils;
  11. import javax.annotation.PostConstruct;
  12. import java.nio.charset.StandardCharsets;
  13. import java.text.SimpleDateFormat;
  14. import java.util.Date;
  15. import java.util.UUID;
  16. import java.util.concurrent.atomic.AtomicInteger;
  17. /**
  18. * @author fmi110
  19. * @description 消息生产者
  20. * @date 2021/7/1 15:08
  21. */
  22. @Component
  23. @Slf4j
  24. public class RabbitProducer {
  25.    @Autowired
  26.    RabbitTemplate rabbitTemplate;
  27.    /**
  28.     * 1 设置 confirm 回调,消息发送到 exchange 时回调
  29.     * 2 设置 return callback ,当路由规则无法匹配到消息队列时,回调
  30.     * <p>
  31.     * correlationData:消息发送时,传递的参数,里边只有一个id属性,标识消息用
  32.     */
  33.    @PostConstruct
  34.    public void enableConfirmCallback() {
  35.        // #1
  36.        /**
  37.         * 连接不上 exchange或exchange不存在时回调
  38.         */
  39.        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  40.            if (!ack) {
  41.                log.error("消息发送失败");
  42.                // TODO 记录日志,发送通知等逻辑
  43.           }
  44.       });
  45.        // #2
  46.        /**
  47.         * 消息投递到队列失败时,才会回调该方法
  48.         * message:发送的消息
  49.         * exchange:消息发往的交换器的名称
  50.         * routingKey:消息携带的路由关键字信息
  51.         */
  52.        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  53.            log.error("{},exchange={},routingKey={}",replyText,exchange,routingKey);
  54.            // TODO 路由失败后续处理逻辑
  55.       });
  56.   }
  57.    public void sendDelayMsg(String delay) {
  58.        int               delayInt          = StringUtils.isEmpty(delay) ? 0 : Integer.valueOf(delay);
  59.        String            exchangeName      = "delay-exchange";
  60.        String            routingKey        = "rabbit.delay";
  61.        SimpleDateFormat  dateFormat        = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  62.        String            msg               = "message send at " + dateFormat.format(new Date()) + ", expired at " + dateFormat.format(new Date().getTime() + delayInt * 1000);
  63. //       MessageProperties messageProperties = new MessageProperties();
  64. //       messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息
  65. //       messageProperties.setDelay(delayInt * 1000);
  66. //       Message message = new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
  67. //       rabbitTemplate.send(exchangeName,routingKey,message);
  68.        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg, message ->{
  69.            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化
  70.            message.getMessageProperties().setDelay(delayInt * 1000);   // 单位为毫秒
  71.            return message;
  72.       });
  73.   }
  74. }

3 RabbitConsumer

消费者,指定监听对应的消息队列即可。

  1. package com.fmi110.rabbitmq;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.text.SimpleDateFormat;
  9. import java.util.Date;
  10. import java.util.concurrent.atomic.AtomicInteger;
  11. /**
  12. * @author fmi110
  13. * @description 消息消费者
  14. * @date 2021/7/1 16:08
  15. */
  16. @Component
  17. @Slf4j
  18. public class RabbitConsumer {
  19.    @RabbitListener(queues="delay-queue")
  20.    public void consumeDelay(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  21.        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
  22.        log.info(">>>> {} 延迟队列消费 tag = {},消息内容 : {}", dateFormat.format(new Date()), tag, data);
  23.   }
  24. }

4 controller

  1. package com.fmi110.rabbitmq.controller;
  2. import com.fmi110.rabbitmq.RabbitProducer;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.ResponseBody;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import sun.rmi.runtime.Log;
  11. import java.text.SimpleDateFormat;
  12. import java.util.Date;
  13. import java.util.HashMap;
  14. @Slf4j
  15. @RestController
  16. public class TestController {
  17.    @Autowired
  18.    RabbitProducer rabbitProducer;
  19.    @GetMapping("/delay")
  20.    public Object delay(String delay) {
  21.        rabbitProducer.sendDelayMsg(delay); // 发送消息
  22.        HashMap<String, Object> result = new HashMap<>();
  23.        result.put("code", 0);
  24.        result.put("msg", "success");
  25.        return result;
  26.   }
  27. }

5 依赖

    
  1.    <dependency>
  2.            <groupId>org.springframework.boot</groupId>
  3.            <artifactId>spring-boot-starter-web</artifactId>
  4.        </dependency>
  5.        <dependency>
  6.            <groupId>org.springframework.boot</groupId>
  7.            <artifactId>spring-boot-starter-amqp</artifactId>
  8.        </dependency>
  9.        <dependency>
  10.            <groupId>org.projectlombok</groupId>
  11.            <artifactId>lombok</artifactId>
  12.            <scope>compile</scope>
  13.        </dependency>

6 运行截图

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/126990
推荐阅读
相关标签
  

闽ICP备14008679号