当前位置:   article > 正文

死信交换机&延迟队列

死信交换机

说明:在MQ中,当一个队列中的消息出现以下情况时,就成为了死信(Dead Letter);

  • 被消费者拒绝消费或者声明失败,并且requeue设置为false,即不再重新入队列;

  • 队列中的消息存满,消息无法再入队列;

  • 消息过期

此时,可以通过指定死信交换机,把这些消息路由到一个专门存放死信的队列中,以消息超时过期为例:

死信交换机&延迟队列

设置一个有超时的队列,该队列中的消息超过10秒未被消费成为死信,并把消息由指定的死信交换机路由到指定的队列;

/**
 * 超时队列
 */
@Configuration
public class DelayConfig {

    /**
     * 创建延迟交换机
     * @return
     */
    @Bean
    public DirectExchange delayExchange(){
        return new DirectExchange("delay.direct",true,false);
    }

    /**
     * 创建延迟队列,该队列的消息需要延迟10秒,超时未消费后路由到死信交换机(death.delay.direct)那里去
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return QueueBuilder.durable("delay.queue")
                .ttl(10000)
                .deadLetterExchange("death.delay.direct")
                .build();
    }

    /**
     * 绑定
     * @return
     */
    @Bean
    public Binding dlBinding(){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

接收死信交换机路由过来的消息

    /**
     * 接收死信交换机(death.delay.direct)路由过来队列(death.delay.queue)消息,路由关键字是delay
     * @param deathLetterMessage
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name ="death.delay.queue", durable = "true"),
            exchange = @Exchange(name = "death.delay.direct"),
            key = "delay"
    ))
    public void getDeathLetterMessage(String deathLetterMessage){
        System.out.println("deathLetterMessage = " + deathLetterMessage);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

发送一条消息

    /**
     * 发送消息到延迟队列中
     */
    @Test
    public void testDelayQueue(){
        String message = "hello, delay queue";

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

延迟10秒后,被转发到死信交换机,路由到存放死信的队列并打印

在这里插入图片描述

另外,如果给队列设置了超时,同时也给消息设置了超时,执行效果是,队列超时和消息超时,哪个短以哪个为准

    /**
     * 发送消息到延迟队列中
     */
    @Test
    public void testDelayQueue(){
        // 设置消息的超时为5000ms,即5秒
        Message message = MessageBuilder.withBody("hello , delay queue".getBytes(StandardCharsets.UTF_8))
                .setExpiration("5000")
                .build();

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

延迟五秒,打印消息内容

在这里插入图片描述

插件使用

因为延迟队列的使用场景非常多,所以RabbitMQ的官方也推出了一个插件,实现延迟队列的效果。(插件下载地址:https://www.rabbitmq.com/community-plugins.html)

在这里插入图片描述

第一步:上传到服务器

输入下面的命令,数据卷名改成自己创建mq容器时,挂载的数据卷名

docker volume inspect 数据卷名
  • 1

找到容器挂载的数据卷路径

在这里插入图片描述

将下载后的插件复制到这里

在这里插入图片描述

第二步:安装插件

安装插件需要进入到MQ容器中,输入下面的命令

docker exec -it 容器名 bash
  • 1

输入下面的命令,安装插件

rabbitmq-plugins enable 插件名
  • 1

安装成功

在这里插入图片描述

第三步:使用

使用插件,不需要额外指定死信交换机,可直接在原队列、原消息上设置

在队列上设置,使用delayed属性

    /**
     * 使用插件实现延迟队列
     * @param delayMessage
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name ="plugins.delay.queue", durable = "true"),
            exchange = @Exchange(name = "plugins.delay.direct", delayed = "true"),
            key = "plugins"
    ))
    public void getDelayMessage(String delayMessage){
        System.out.println("delayMessage = " + delayMessage);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如果使用@Bean注解方式创建,使用delayed()

    @Bean
    public DirectExchange delayedExchange(){
        return ExchangeBuilder
                .directExchange("plugins.delay.direct")
                .delayed()
                .durable(true)
                .build();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

消息设置,使用setHeader(“x-delay”, duration)方法

    /**
     * 发送消息到延迟队列中
     */
    @Test
    public void testPluginsDelayQueue(){
        // 设置消息的超时为5000ms,即5秒
        Message message = MessageBuilder.withBody("hello , plugins delay queue".getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay", 5000)
                .build();

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("plugins.delay.direct","plugins",message,correlationData);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

延迟5秒,接收到消息

在这里插入图片描述

总结

延迟队列可通过死信交换机和插件的方式实现,可应用于订单未支付,超时失效、预约等场景

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/598251
推荐阅读
相关标签
  

闽ICP备14008679号