当前位置:   article > 正文

rabbitmq 延迟队列的使用_x-delayed-message

x-delayed-message

我们在工作中总有那么一下场景需要使用到延迟队列

举个常见的例子:订单x分钟后如果未付款自动取消、更改某项配置并且设定1天后才生效
针对上述场景我们都可以使用延迟队列来解决

实现延迟队列有如下2种方式

  • 死信(Dead Lettering)队列
  • 插件(rabbitmq-delayed-message-exchange)

死信(Dead Lettering)队列

有以下几种操作会导致消息进入死信(Dead Lettering)队列

  • 消费者使用basic.nack或者basic.reject并且将requeue参数设置为false否定消息时
  • 设置了消息的TTL并且过期时
  • 队列长度超过限制时消息被丢弃

如果没有配置死信那么消息将会被丢弃

如何配置

先瞜一眼官方的说明
image.png

将“dead-letter-exchange” 添加到策略定义中

spring boot中我们该如何呢
我们可以看到Queue参数最多的构造方法,最后有一个arguments字段
image.png
我们在这个字段传入“dead-letter-exchange”

// 死信队列的binding
// Binding binding
Map<String, Object> arguments = new HashMap<>(16);
arguments.put("x-dead-letter-exchange", binding.getExchange());
arguments.put("x-dead-letter-routing-key", binding.getRoutingKey());
// 可以设置TTL
// arguments.put("x-message-ttl", 10000);
// QueueBuilder.ttl(10000)也行
// Queue queue = new Queue("xxx", true, false, false, arguments);
Queue queue = QueueBuilder.durable(DEMO_QUEUE).withArguments(arguments).build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

死信队列声明和普通的队列没有区别

@Bean
public DirectExchange dlxDirectExchange() {
    return new DirectExchange(DLX_DIRECT_EXCHANGE);
}

@Bean
public Queue dlxQueue() {
    return new Queue(DLX_QUEUE);
}

@Bean
public Binding dlxHandlerBinding(@Qualifier("dlxQueue") Queue queue,
                                     @Qualifier("dlxDirectExchange") DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(DLX_HANDLER_ROUTING_KEY);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

发消息

紧接着在发送消息的时候指定TTL或者上面剩余两种操作都可以

// 单位是ms。假设是1秒,expiration="1000"
String finalExpiration = expiration;
// binding是上面绑定"x-dead-letter-exchange"队列所对应的binding
rabbitTemplate.convertAndSend(binding.getExchange(),
        binding.getRoutingKey(),
        dataMessage,
        message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            // 设置消息过期时间
            messageProperties.setExpiration(finalExpiration);
            return message;
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

注:消息和队列的过期时间那个小取那个

如果我们先放一条10s过期的消息A,再放入一条5s过期的消息B
消息B会先进入死信队列吗?
答案是否定的,队列会遵循先入先出的规则。B在消息A过期后随着A一起进入死信队列被消费者消费
看起来并不是完美的延迟队列,下面我们看插件方式

延迟队列插件

在3.5.8以后官方推荐使用rabbitmq-delayed-message-exchange插件来解决延迟队列
官网插件地址:Community Plugins — RabbitMQ
image.png
GitHub:Releases · rabbitmq/rabbitmq-delayed-message-exchange

⚠️需要注意,插件对版本是有要求的
image.png

插件安装

将插件下载到rabbitmq的插件目录
然后执行如下命令即可
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
我们重新打开RabbitMQ Management可以看到exchange中有x-delayed-message类型,说明插件安装成功了
image.png

延迟队列的使用

只有交换机的定义有所区别,其余的没有区别

HashMap<String, Object> arguments = new HashMap<>(2);
// 需要指定delayed-type,可以是:direct、fanout、topic、headers
arguments.put("x-delayed-type", "direct");
// 参数依次含义是:交换机名称、交换机类型、持久化、自动删除、自定义交换机参数
CustomExchange exchange = new CustomExchange("exchange-name", "x-delayed-message", true, false, arguments);
  • 1
  • 2
  • 3
  • 4
  • 5

我们需要手动指定交换机类型为x-delayed-message并且需要一个参数x-delayed-type,参数值上面有写

// binding是绑定"x-delayed-message"类型的交换机对应的binding
rabbitTemplate.convertAndSend(binding.getExchange(),
        binding.getRoutingKey(),
        dataMessage,
        message -> {
            MessageProperties messageProperties = message.getMessageProperties();
            // 设置延时时间。单位:ms
            messageProperties.setDelay(1000);
            return message;
        });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/549152
推荐阅读
相关标签
  

闽ICP备14008679号