赞
踩
在项目中,我们经常会遇到需要进行延迟的场景比如 延迟计算,延迟重试,延迟关闭订单等等,延迟的技术方案多种多样,我这里列举RabbitMq进行延迟的方案之一:TTL+死信
public static final String CHECK_POST_TTL_EXCHANGE = "check-post-ttl-exchange";
public static final String CHECK_POST = "check-post";
public static final String CHECK_POST_TTL_QUEUE = "check-post-ttl-queue";
public static final String CHECK_POST_DEAD_EXCHANGE = "check-post-dead-exchange";
public static final String CHECK_POST_DEAD_QUEUE = "check-post-dead-queue";
实际就是一个普通的队列,我这里定义为fanout类型
@Bean
public FanoutExchange checkPostTtlExchange() {
return new FanoutExchange(CHECK_POST_TTL_EXCHANGE);
}
可定义统一的消息存活时间,死信交换机的名字,死信交换机的路由键
@Bean
public Queue checkPostTtlQueue() {
Map<String, Object> args = new HashMap<>(8);
// 消息存活时间 ,key 固定 value 必须为Int值,此种方式则消息存活时间固定
args.put("x-message-ttl", 10000);
//设置死信交换机,value为死信交换机的名字
args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
//设置死信 routing_key,value为死信路由键的名字
args.put("x-dead-letter-routing-key", CHECK_POST);
return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
@Bean
Binding ttlBind() {
return BindingBuilder.bind(checkPostTtlQueue()).to((checkPostTtlExchange()));
}
死信交换机实际也就是一个普通的交换机,我们这里需要将其申明为直连类型交换机,需要结合路由键一起使用
/**
* 死信交换机
* @return
*/
@Bean
DirectExchange checkPostDeadExchange() {
return new DirectExchange(CHECK_POST_DEAD_EXCHANGE, true, false);
}
死信队列实际也是一个普通的队列
/**
* 死信队列
* @return
*/
@Bean
Queue checkPostDeadQueue() {
return new Queue(CHECK_POST_DEAD_QUEUE, true, false, false);
}
/**
* 绑定死信队列和死信交换机
* @return
*/
@Bean
Binding dlxBinding() {
return BindingBuilder.bind(checkPostDeadQueue()).to(checkPostDeadExchange()).with(CHECK_POST);
}
我们定义的TTL交换机,实际就是一个普普通通的交换机,与其绑定的TTL队列额外加入了配置参数`x-dead-letter-exchange` 申明了死信交换机的名字,配置参数`x-dead-letter-routing-key` 申明了死信交换机的路由键
定义的死信交换机本质上是一个普通的直连交换机,交换机名字需与TTL队列中配置属性`x-dead-letter-exchange`指定的值一致,死信队列则是一个普通的队列,其使用的路由键需与TTL队列中配置属性`dead-letter-routing-key`指定的值一致
我们将需要延迟的消息发至TTL交换机中,TTL交换机将消息发送至绑定的TTL队列,且我们不设置消费者去监听这个TTL队列的消息,当消息在TTL队列中存活指定的时间后(上方TTL队列中x-message-ttl
属性)消息将再次发送到x-dead-letter-exchange
申明的死信交换机中,如果有队列(我们称之为死信队列)绑定了死信交换机且路由键为TTL队列中配置的x-dead-letter-routing-key
,则会接收到消息,我们只监听对应的死信队列消息,就完成了消息的延迟
固定延迟无法根据消息选择延迟时间,适合延迟时间统一的场景,重点在于TTL队列申明中,配置属性x-message-ttl
,其配置以队列为基准,该队列中,所有消息均为指定的存活时间
配置队列中消息将固定延迟10s
@Bean
public Queue checkPostTtlQueue() {
Map<String, Object> args = new HashMap<>(8);
// 消息存活时间 ,key 固定 value 必须为Int值,此种方式则消息存活时间固定
args.put("x-message-ttl", 10000);
//设置死信交换机,value为死信交换机的名字
args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
//设置死信 routing_key,value为死信路由键的名字
args.put("x-dead-letter-routing-key", CHECK_POST);
return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
private void addDisableScheduleTimelyConfig(Integer configId, int expirationTime) {
final long start = System.currentTimeMillis();
int finalExpirationTime = expirationTime + 30000;
rabbitTemplate.convertAndSend(RabbitMqConfig.CHECK_POST_TTL_EXCHANGE, "", configId.toString(), message -> {
// 由于TTL队列中申明了x-message-ttl 为10000ms,下方延迟时间配置不会生效
message.getMessageProperties().setExpiration((Integer.toString(finalExpirationTime)));
message.getMessageProperties().setContentEncoding("UTF-8");
System.out.println("延迟消息发送时间:" + LocalDateTime.now());
return message;
});
}
死信队列消费者
@RabbitListener(queues = RabbitMqConfig.CHECK_POST_DEAD_QUEUE)
public void flushCheckConfigState(Message message) {
try {
System.out.println("死信队列消费者接受:" + LocalDateTime.now());
} catch (Exception exception) {
exception.printStackTrace();
log.error("处理死信消息失败:{}",new String(message.getBody()));
}
}
动态延迟即延迟消息不固定,可为每一条消息设置延迟时间,属性配置以队列中的消息为基准
需要去除TTL队列申明中的中x-message-ttl
配置属性
@Bean
public Queue checkPostTtlQueue() {
Map<String, Object> args = new HashMap<>(4);
//设置死信交换机
args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", CHECK_POST);
return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
发送消息指明延迟时间,我这一条消息是延迟了90000ms,即一分半
private void addDisableScheduleTimelyConfig(Integer configId, int expirationTime) {
int finalExpirationTime = expirationTime + 30000;
rabbitTemplate.convertAndSend(RabbitMqConfig.CHECK_POST_TTL_EXCHANGE, "", configId.toString(), message -> {
// 由于TTL队列中申明了x-message-ttl 为10000ms,下方延迟时间配置不会生效
message.getMessageProperties().setExpiration((Integer.toString(finalExpirationTime)));
message.getMessageProperties().setContentEncoding("UTF-8");
System.out.println("延迟消息发送时间:" + LocalDateTime.now());
return message;
});
}
发现从消息发送,到消息接收,中间时间间隔90000ms左右,说明延迟消息生效
假设生产者 生产的消息延时情况如下:
从延迟的合理性上来讲,消费者接收消息应该是 先收到Id为2的数据 然后是Id为3的数据,最后才是Id为1的数据
消费者接收情况:
发现瞬时接收到了三条不同延迟的消息,且与延迟长短无关,首先是接收到了延迟时间最长的Id1数据,然后接着是2 3,导致2消息多延迟了36秒,3消息多延迟了
经过测试发现,如果设置了动态延迟
(消息延迟根据消息自定义,消息延迟时间有长有短),当前一个消息延迟时间很长的话,会阻塞下一个延迟时间很短的消息。
先用一条消息的存活时间是1小时,后面又进了一条消息存活时间是10分钟,结果10分钟到了后,发现这条消息并没有被转发到消费延时过期消息的队列。
即:使用 TTL(Time-To-Live)和死信队列的组合,有时可能会导致队头阻塞问题
会导致什么问题呢?
一个过期时间较长的消息先进入队列后,队列中很有可能会堆积一堆后进的但过期时间相对较短的消息,且当先进的过期时间较长的消息转发至死信队列后,后边较短的消息均会被瞬间转发,造成消费延时以及峰谷式消费压力过大
原因是虽然我们为每个消息都设置了不同的过期时间(TTL)。但所有延时消息都还在一个队列里,不会对每一个消息是否过期进行检测,而是消息入列按照顺序(先进先出)依次进行检测,即从队列的头消息开始检测,如果前面消息的延迟时间很长,就会阻塞后进的消息了。
注意查看项目是否有动态延迟的需求无动态延迟
无:
则使用基于队列属性x-message-ttl
设置固定延迟即可,此时的TTL+死信因过期时间配置一致,先来队列的消息会先过期,自然不会阻塞下一个消息
有:
RabbitMq提供了一个延迟插件,使用延迟插件发送的动态延迟消息不会有以上问题,可参考我的另一篇博文Springboot项目使用RabbitMQ 实现延迟队列,延迟消息 以及 docker 安装 rabbitmq并添加延迟队列插件
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。