当前位置:   article > 正文

RabbitMQ之延迟队列

RabbitMQ之延迟队列

为什么要有延迟队列?
延迟消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
使用场景:
短信通知:下单成功后60s之后给用户发送短信通知。
失败重试:业务操作失败后,间隔一定的时间进行失败重试。
实现方式
1:Time To Live(TTL)、Dead Letter Exchanges(DLX)
RabbitMQ 提供了过期时间 TTL 机制,可以设置消息在队列中的存活时长。在消息到达过期时间时,会从当前队列中删除,并被 RabbitMQ 自动转发到对应的死信队列中。
然后再来消费该死信队列,这样就可以实现一个延迟队列的效果
2:利用 RabbitMQ 中的插件 x-delay-message
以下为实现过程
一:使用TTL的方式实现

===========================》配置类
@Configuration
public class DirectExchangeConfiguration {
    /**
     * 延迟队列
     *
     * @return Queue
     */
    @Bean
    public Queue queueDelay11() {
        // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除
        return new Queue(
                Message11.QUEUE_DELAY,
                true,
                false,
                false);
    }

    /**
     * 队列,绑定过期时间等
     *
     * @return Queue
     */
    @Bean
    public Queue queue11() {
        return QueueBuilder
                // durable: 是否持久化
                .durable(Message11.QUEUE)
                // exclusive: 是否排它
                .exclusive()
                // autoDelete: 是否自动删除
                .autoDelete()
                // TTL 设置队列里的默认过期时间为 10 秒
                .ttl(10 * 1000)
                // DLX
                .deadLetterExchange(Message11.EXCHANGE)
                .deadLetterRoutingKey(Message11.ROUTING_KEY_DELAY)
                .build();
    }

    @Bean
    public DirectExchange exchange11() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message11.EXCHANGE,
                true,
                false);
    }

    /**
     * 创建 Binding
     * Exchange:Message11.EXCHANGE
     * Routing key:Message11.ROUTING_KEY
     * Queue:Message11.QUEUE
     *
     * @return Binding
     */
    @Bean
    public Binding binding11() {
        return BindingBuilder
                .bind(queue11()).to(exchange11())
                .with(Message11.ROUTING_KEY);
    }

    /**
     * 绑定延迟队列
     *
     * @return Binding
     */
    @Bean
    public Binding bindingDelay11() {
        return BindingBuilder
                .bind(queueDelay11()).to(exchange11())
                .with(Message11.ROUTING_KEY_DELAY);
    }
======================》消息实体
@Data
public class Message11 implements Serializable {
    /**
     * 普通队列
     */
    public static final String QUEUE = "QUEUE_11";
    /**
     * 延迟队列
     */
    public static final String QUEUE_DELAY = "QUEUE_DELAY_11";

    public static final String EXCHANGE = "EXCHANGE_11";

    public static final String ROUTING_KEY = "ROUTING_KEY_11";
    public static final String ROUTING_KEY_DELAY = "ROUTING_KEY_DELAY_11";

    private String id;
}
=================================》生产者代码
@Component
public class Producer11 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void syncSend(String id, int delay) {
        Message11 message = new Message11();
        message.setId(id);

        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息的 TTL 过期时间
                if (delay > 0) {
                    message.getMessageProperties().setExpiration(String.valueOf(delay));
                }
                return message;
            }
        };

        rabbitTemplate.convertAndSend(Message11.EXCHANGE, Message11.ROUTING_KEY, message, postProcessor);
    }
}
========================》消费者
@Component
@RabbitListener(queues = Message11.QUEUE_DELAY)
@Slf4j
public class Consumer11 {

    @RabbitHandler
    public void onMessage(Message11 message) {
        log.info("[{}][Consumer11 onMessage][消息内容:{}]", LocalDateTime.now(), message);
    }
}
  @Test
    void syncSend() {
        String id = UUID.randomUUID().toString();
        int delay = 5000;
        producer11.syncSend(id, delay);
        log.info("[{}][test producer11 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay, id);

        String id2 = UUID.randomUUID().toString();
        int delay2 = 2000;
        producer11.syncSend(id2, delay2);
        log.info("[{}][test producer11 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay2, id2);
        // 其实采用 ttl 这种方式会有一个问题,就是当一个队列中有多个不一样的过期时间的消息的时候,会形成阻塞,只有前一个被消费了才会轮到后一个
        // 比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞
        new CountDownLatch(1).await();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143

二:使用插件实现
此种方式需要安装mq的延迟第一列插件
安装方式如下连接使用docker安装rabbitMQ的延迟第一列插件

=============================》插件方式配置类
@Configuration
public class PluginsExchangeConfiguration {

    @Bean
    public Queue queue12() {
        // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除
        return new Queue(
                Message12.QUEUE,
                true,
                false,
                false);
    }

    /**
     * 创建一个延迟交换机 注意类型为 “x-delayed-message”
     *
     * @return 交换机
     */
    @Bean
    public CustomExchange exchange12() {
        Map<String, Object> args = new HashMap<>(1);
        args.put("x-delayed-type", "direct");
        return new CustomExchange(Message12.EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding binding12() {
        return BindingBuilder
                .bind(queue12()).to(exchange12())
                .with(Message12.ROUTING_KEY)
                .noargs();
    }
}
======================================》生产者
@Component
public class Producer12 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void syncSend(String id, int delay) {
        Message12 message = new Message12();
        message.setId(id);

        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置过期时间
                message.getMessageProperties().setHeader("x-delay", delay);
                return message;
            }
        };

        rabbitTemplate.convertAndSend(Message12.EXCHANGE, Message12.ROUTING_KEY, message, postProcessor);
    }
}
=============================》消费者
@Component
@RabbitListener(queues = Message12.QUEUE)
@Slf4j
public class Consumer12 {

    @RabbitHandler
    public void onMessage(Message12 message) {
        log.info("[{}][Consumer12 onMessage][消息内容:{}]", LocalDateTime.now(), message);
    }
}
@Test
    void syncSend() {
        String id = UUID.randomUUID().toString();
        int delay = 5000;
        producer12.syncSend(id, delay);
        log.info("[{}][test producer12 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay, id);

        String id2 = UUID.randomUUID().toString();
        int delay2 = 2000;
        producer12.syncSend(id2, delay2);
        log.info("[{}][test producer12 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay2, id2);
        // 其实采用 ttl 这种方式会有一个问题,就是当一个队列中有多个不一样的过期时间的消息的时候,会形成阻塞,只有前一个被消费了才会轮到后一个
        // 比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞
        new CountDownLatch(1).await();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/546332
推荐阅读
相关标签
  

闽ICP备14008679号