赞
踩
为什么要有延迟队列?
延迟消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
使用场景:
短信通知:下单成功后60s之后给用户发送短信通知。
失败重试:业务操作失败后,间隔一定的时间进行失败重试。
实现方式
1:Time To Live(TTL)、Dead Letter Exchanges(DLX)
RabbitMQ 提供了过期时间 TTL 机制,可以设置消息在队列中的存活时长。在消息到达过期时间时,会从当前队列中删除,并被 RabbitMQ 自动转发到对应的死信队列中。
然后再来消费该死信队列,这样就可以实现一个延迟队列的效果
2:利用 RabbitMQ 中的插件 x-delay-message
以下为实现过程
一:使用TTL的方式实现
===========================》配置类 @Configuration public class DirectExchangeConfiguration { /** * 延迟队列 * * @return Queue */ @Bean public Queue queueDelay11() { // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除 return new Queue( Message11.QUEUE_DELAY, true, false, false); } /** * 队列,绑定过期时间等 * * @return Queue */ @Bean public Queue queue11() { return QueueBuilder // durable: 是否持久化 .durable(Message11.QUEUE) // exclusive: 是否排它 .exclusive() // autoDelete: 是否自动删除 .autoDelete() // TTL 设置队列里的默认过期时间为 10 秒 .ttl(10 * 1000) // DLX .deadLetterExchange(Message11.EXCHANGE) .deadLetterRoutingKey(Message11.ROUTING_KEY_DELAY) .build(); } @Bean public DirectExchange exchange11() { // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它 return new DirectExchange(Message11.EXCHANGE, true, false); } /** * 创建 Binding * Exchange:Message11.EXCHANGE * Routing key:Message11.ROUTING_KEY * Queue:Message11.QUEUE * * @return Binding */ @Bean public Binding binding11() { return BindingBuilder .bind(queue11()).to(exchange11()) .with(Message11.ROUTING_KEY); } /** * 绑定延迟队列 * * @return Binding */ @Bean public Binding bindingDelay11() { return BindingBuilder .bind(queueDelay11()).to(exchange11()) .with(Message11.ROUTING_KEY_DELAY); } ======================》消息实体 @Data public class Message11 implements Serializable { /** * 普通队列 */ public static final String QUEUE = "QUEUE_11"; /** * 延迟队列 */ public static final String QUEUE_DELAY = "QUEUE_DELAY_11"; public static final String EXCHANGE = "EXCHANGE_11"; public static final String ROUTING_KEY = "ROUTING_KEY_11"; public static final String ROUTING_KEY_DELAY = "ROUTING_KEY_DELAY_11"; private String id; } =================================》生产者代码 @Component public class Producer11 { @Resource private RabbitTemplate rabbitTemplate; public void syncSend(String id, int delay) { Message11 message = new Message11(); message.setId(id); MessagePostProcessor postProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置消息的 TTL 过期时间 if (delay > 0) { message.getMessageProperties().setExpiration(String.valueOf(delay)); } return message; } }; rabbitTemplate.convertAndSend(Message11.EXCHANGE, Message11.ROUTING_KEY, message, postProcessor); } } ========================》消费者 @Component @RabbitListener(queues = Message11.QUEUE_DELAY) @Slf4j public class Consumer11 { @RabbitHandler public void onMessage(Message11 message) { log.info("[{}][Consumer11 onMessage][消息内容:{}]", LocalDateTime.now(), message); } } @Test void syncSend() { String id = UUID.randomUUID().toString(); int delay = 5000; producer11.syncSend(id, delay); log.info("[{}][test producer11 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay, id); String id2 = UUID.randomUUID().toString(); int delay2 = 2000; producer11.syncSend(id2, delay2); log.info("[{}][test producer11 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay2, id2); // 其实采用 ttl 这种方式会有一个问题,就是当一个队列中有多个不一样的过期时间的消息的时候,会形成阻塞,只有前一个被消费了才会轮到后一个 // 比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞 new CountDownLatch(1).await(); }
二:使用插件实现
此种方式需要安装mq的延迟第一列插件
安装方式如下连接使用docker安装rabbitMQ的延迟第一列插件
=============================》插件方式配置类 @Configuration public class PluginsExchangeConfiguration { @Bean public Queue queue12() { // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除 return new Queue( Message12.QUEUE, true, false, false); } /** * 创建一个延迟交换机 注意类型为 “x-delayed-message” * * @return 交换机 */ @Bean public CustomExchange exchange12() { Map<String, Object> args = new HashMap<>(1); args.put("x-delayed-type", "direct"); return new CustomExchange(Message12.EXCHANGE, "x-delayed-message", true, false, args); } @Bean public Binding binding12() { return BindingBuilder .bind(queue12()).to(exchange12()) .with(Message12.ROUTING_KEY) .noargs(); } } ======================================》生产者 @Component public class Producer12 { @Resource private RabbitTemplate rabbitTemplate; public void syncSend(String id, int delay) { Message12 message = new Message12(); message.setId(id); MessagePostProcessor postProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置过期时间 message.getMessageProperties().setHeader("x-delay", delay); return message; } }; rabbitTemplate.convertAndSend(Message12.EXCHANGE, Message12.ROUTING_KEY, message, postProcessor); } } =============================》消费者 @Component @RabbitListener(queues = Message12.QUEUE) @Slf4j public class Consumer12 { @RabbitHandler public void onMessage(Message12 message) { log.info("[{}][Consumer12 onMessage][消息内容:{}]", LocalDateTime.now(), message); } } @Test void syncSend() { String id = UUID.randomUUID().toString(); int delay = 5000; producer12.syncSend(id, delay); log.info("[{}][test producer12 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay, id); String id2 = UUID.randomUUID().toString(); int delay2 = 2000; producer12.syncSend(id2, delay2); log.info("[{}][test producer12 syncSend][延迟时间为:{}][id:{}] 发送成功", LocalDateTime.now(), delay2, id2); // 其实采用 ttl 这种方式会有一个问题,就是当一个队列中有多个不一样的过期时间的消息的时候,会形成阻塞,只有前一个被消费了才会轮到后一个 // 比如先发送了一个延迟20s的消息,后发送了一个延迟为2s的消息,如果第一个消息未到达则后一个消息会被阻塞 new CountDownLatch(1).await(); }
以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。