赞
踩
我们在工作中总有那么一下场景需要使用到延迟队列
举个常见的例子:订单x分钟后如果未付款自动取消、更改某项配置并且设定1天后才生效
针对上述场景我们都可以使用延迟队列来解决
实现延迟队列有如下2种方式
有以下几种操作会导致消息进入死信(Dead Lettering)队列
basic.nack
或者basic.reject
并且将requeue
参数设置为false
否定消息时TTL
并且过期时如果没有配置死信那么消息将会被丢弃
先瞜一眼官方的说明
将“dead-letter-exchange” 添加到策略定义中
在spring boot
中我们该如何呢
我们可以看到Queue
参数最多的构造方法,最后有一个arguments
字段
我们在这个字段传入“dead-letter-exchange”
// 死信队列的binding
// Binding binding
Map<String, Object> arguments = new HashMap<>(16);
arguments.put("x-dead-letter-exchange", binding.getExchange());
arguments.put("x-dead-letter-routing-key", binding.getRoutingKey());
// 可以设置TTL
// arguments.put("x-message-ttl", 10000);
// QueueBuilder.ttl(10000)也行
// Queue queue = new Queue("xxx", true, false, false, arguments);
Queue queue = QueueBuilder.durable(DEMO_QUEUE).withArguments(arguments).build();
死信队列声明和普通的队列没有区别
@Bean
public DirectExchange dlxDirectExchange() {
return new DirectExchange(DLX_DIRECT_EXCHANGE);
}
@Bean
public Queue dlxQueue() {
return new Queue(DLX_QUEUE);
}
@Bean
public Binding dlxHandlerBinding(@Qualifier("dlxQueue") Queue queue,
@Qualifier("dlxDirectExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DLX_HANDLER_ROUTING_KEY);
}
紧接着在发送消息的时候指定TTL
或者上面剩余两种操作都可以
// 单位是ms。假设是1秒,expiration="1000"
String finalExpiration = expiration;
// binding是上面绑定"x-dead-letter-exchange"队列所对应的binding
rabbitTemplate.convertAndSend(binding.getExchange(),
binding.getRoutingKey(),
dataMessage,
message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置消息过期时间
messageProperties.setExpiration(finalExpiration);
return message;
});
注:消息和队列的过期时间那个小取那个
如果我们先放一条10s过期的消息A,再放入一条5s过期的消息B
消息B会先进入死信队列吗?
答案是否定的,队列会遵循先入先出的规则。B在消息A过期后随着A一起进入死信队列被消费者消费
看起来并不是完美的延迟队列,下面我们看插件方式
在3.5.8以后官方推荐使用rabbitmq-delayed-message-exchange
插件来解决延迟队列
官网插件地址:Community Plugins — RabbitMQ
GitHub:Releases · rabbitmq/rabbitmq-delayed-message-exchange
⚠️需要注意,插件对版本是有要求的
将插件下载到rabbitmq
的插件目录
然后执行如下命令即可
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
我们重新打开RabbitMQ Management
可以看到exchange
中有x-delayed-message
类型,说明插件安装成功了
只有交换机的定义有所区别,其余的没有区别
HashMap<String, Object> arguments = new HashMap<>(2);
// 需要指定delayed-type,可以是:direct、fanout、topic、headers
arguments.put("x-delayed-type", "direct");
// 参数依次含义是:交换机名称、交换机类型、持久化、自动删除、自定义交换机参数
CustomExchange exchange = new CustomExchange("exchange-name", "x-delayed-message", true, false, arguments);
我们需要手动指定交换机类型为x-delayed-message
并且需要一个参数x-delayed-type
,参数值上面有写
// binding是绑定"x-delayed-message"类型的交换机对应的binding
rabbitTemplate.convertAndSend(binding.getExchange(),
binding.getRoutingKey(),
dataMessage,
message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置延时时间。单位:ms
messageProperties.setDelay(1000);
return message;
});
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。