赞
踩
当一个消息再队列里变为死信时,它会被重新publish到另一个exchange交换机上,这个exchange就为DLX。因此我们只需要在声明正常的业务队列时添加一个可选的"x-dead-letter-exchange"参数,值为死信交换机,死信就会被rabbitmq重新publish到配置的这个交换机上,我们接着监听这个交换机就可以了。
package com.lank.demo.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitmqConfig { //死信交换机,队列,路由相关配置 public static final String DLK_EXCHANGE = "dlk.exchange"; public static final String DLK_ROUTEKEY = "dlk.routeKey"; public static final String DLK_QUEUE = "dlk.queue"; //业务交换机,队列,路由相关配置 public static final String DEMO_EXCHANGE = "demo.exchange"; public static final String DEMO_QUEUE = "demo.queue"; public static final String DEMO_ROUTEKEY = "demo.routeKey"; //延时插件DelayedMessagePlugin的交换机,队列,路由相关配置 public static final String DMP_EXCHANGE = "dmp.exchange"; public static final String DMP_ROUTEKEY = "dmp.routeKey"; public static final String DMP_QUEUE = "dmp.queue"; @Bean public DirectExchange demoExchange(){ return new DirectExchange(DEMO_EXCHANGE,true,false); } @Bean public Queue demoQueue(){ //只需要在声明业务队列时添加x-dead-letter-exchange,值为死信交换机 Map<String,Object> map = new HashMap<>(1); map.put("x-dead-letter-exchange",DLK_EXCHANGE); //该参数x-dead-letter-routing-key可以修改该死信的路由key,不设置则使用原消息的路由key map.put("x-dead-letter-routing-key",DLK_ROUTEKEY); return new Queue(DEMO_QUEUE,true,false,false,map); } @Bean public Binding demoBind(){ return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY); } @Bean public DirectExchange dlkExchange(){ return new DirectExchange(DLK_EXCHANGE,true,false); } @Bean public Queue dlkQueue(){ return new Queue(DLK_QUEUE,true,false,false); } @Bean public Binding dlkBind(){ return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY); } //延迟插件使用 //1、声明一个类型为x-delayed-message的交换机 //2、参数添加一个x-delayed-type值为交换机的类型用于路由key的映射 @Bean public CustomExchange dmpExchange(){ Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments); } @Bean public Queue dmpQueue(){ return new Queue(DMP_QUEUE,true,false,false); } @Bean public Binding dmpBind(){ return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs(); } }
package com.lank.demo.rabbitmq; import com.lank.demo.config.RabbitmqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** 1. @author lank 2. @since 2020/12/14 10:33 */ @Component @Slf4j public class MessageSender { @Autowired private RabbitTemplate rabbitTemplate; //使用死信队列发送消息方法封装 public void send(String message,Integer time){ String ttl = String.valueOf(time*1000); //exchange和routingKey都为业务的就可以,只需要设置消息的过期时间 rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置消息的过期时间,是以毫秒为单位的 message.getMessageProperties().setExpiration(ttl); return message; } }); log.info("使用死信队列消息:{}发送成功,过期时间:{}秒。",message,time); } //使用延迟插件发送消息方法封装 public void send2(String message,Integer time){ rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //使用延迟插件只需要在消息的header中添加x-delay属性,值为过期时间,单位毫秒 message.getMessageProperties().setHeader("x-delay",time*1000); return message; } }); log.info("使用延迟插件发送消息:{}发送成功,过期时间:{}秒。",message,time); } }
package com.lank.demo.rabbitmq; import com.lank.demo.config.RabbitmqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class MessageReceiver { @RabbitHandler @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE) public void onMessage(Message message){ log.info("使用死信队列,收到消息:{}",new String(message.getBody())); } @RabbitHandler @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE) public void onMessage2(Message message){ log.info("使用延迟插件,收到消息:{}",new String(message.getBody())); } }
package com.lank.demo.controller; import com.lank.demo.rabbitmq.MessageSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class MessageController { @Autowired public MessageSender messageSender; //死信队列controller @GetMapping("/send") public String send(@RequestParam String msg,Integer time){ messageSender.send(msg,time); return "ok"; } //延迟插件controller @GetMapping("/send2") public String sendByPlugin(@RequestParam String msg,Integer time){ messageSender.send2(msg,time); return "ok"; } }
server.port=4399
#virtual-host使用默认的/就好,如果需要/demo需自己在控制台添加
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2020-12-16 22:47:28.071 INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用死信队列消息:hello发送成功,过期时间:5秒。
2020-12-16 22:47:33.145 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : 使用死信队列,收到消息:hello
当我往死信队列中发送两条不同过期时间的消息时,如果先发送的消息A的过期时间大于后发送的消息B的过期时间时,由于消息的顺序消费,消息B过期后并不会立即重新publish到死信交换机,而是会等到消息A过期后一起被消费。
依次发送两个请求http://localhost:4399/send?msg=消息A&time=30和http://localhost:4399/send?msg=消息B&time=10,消息A先发送,过期时间30S,消息B后发送,过期时间10S,我们想要的结果应该是10S收到消息B,30S后收到消息A,但结果并不是,控制台输出如下:
2020-12-16 22:54:47.339 INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用死信队列消息:消息A发送成功,过期时间:30秒。
2020-12-16 22:54:54.278 INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用死信队列消息:消息B发送成功,过期时间:10秒。
2020-12-16 22:55:17.356 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : 使用死信队列,收到消息:消息A
2020-12-16 22:55:17.357 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : 使用死信队列,收到消息:消息B
消息A30S后被成功消费,紧接着消息B被消费。因此当我们使用死信队列时应该注意是否消息的过期时间都是一样的,比如订单超过10分钟未支付修改其状态。如果当一个队列各个消息的过期时间不一致时,使用死信队列就可能达不到延时的作用。这时候我们可以使用延时插件来实现这需求。
RabbitMQ Delayed Message Plugin是一个rabbitmq的插件,所以使用前需要安装它,可以参考的GitHub地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
2020-12-16 23:31:19.819 INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用延迟插件发送消息:消息A发送成功,过期时间:30秒。
2020-12-16 23:31:27.673 INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender : 使用延迟插件发送消息:消息B发送成功,过期时间:10秒。
2020-12-16 23:31:37.833 INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver : 使用延迟插件,收到消息:消息B
2020-12-16 23:31:49.917 INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver : 使用延迟插件,收到消息:消息A
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。