赞
踩
延时队列就是死信队列中的TTL过期的一种。延时队列内部是有序的,延时队列中的元素是希望在指定时间到了以后或之前取出和处理。延时队列就是用来存放需要在指定时间被处理的元素的队列。
应用场景:订单到时未支付自动取消、用户注册成功一定时间未登录提醒等。
基于SpringBoot的延时队列可能会出现消息不按时过期的情况。RabbitMQ在检查消息时,只会检查第一条消息是否过期,如果过期则丢到死信队列,如果第一条消息的延时很长,第二条消息的延时很短,第二个消息也不会优先处理。解决办法用RabbitMQ插件
创建两个队列QA和QB,两者队列TTL分别设置为10s和40s,然后创建一个交换机X和一个死信交换机Y,他们类型都是direct,创建一个死信队列QD。
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- # rabiitmq 配置
- spring:
- rabbitmq:
- host: 192.168.10.109
- port: 5672
- username: admin
- password: 123
-
- //信道A
- public static final String CHANNEL_A="CA";
- //信道B
- public static final String CHANNEL_B="CB";
- //信道死信
- public static final String CHANNEL_DIE="CDIE";
- //自定义时间
- public static final String CHANNEL_SELF="CSELF";
-
- //声明普通交换机
- @Bean("exchangeX")
- public DirectExchange exchangeX(){
- return new DirectExchange(NORMAL_EXCHANGE);
- }
-
- //声明死信交换机
- @Bean("exchangeY")
- public DirectExchange exchangeY(){
- return new DirectExchange(DIE_EXCHANGE);
- }
-
- //声明普通队列
- @Bean("queueA")
- public Queue queueA(){
- Map<String, Object> arguments =new HashMap<>();
- return QueueBuilder.durable(QUEUE_A)
- // //死信交换机
- .deadLetterExchange(DIE_EXCHANGE)
- // //死信routingkey
- .deadLetterRoutingKey(CHANNEL_DIE)
- // //设置ttl时间
- .ttl(2000)
- .build();
- }
-
- //声明普通队列
- @Bean("queueB")
- public Queue queueB(){
- return QueueBuilder.durable(QUEUE_B)
- //死信交换机
- .deadLetterExchange(DIE_EXCHANGE)
- //死信routingkey
- .deadLetterRoutingKey(CHANNEL_DIE)
- //设置ttl时间
- .ttl(5000)
- .build();
- }
-
- //声明死信队列
- @Bean("queueDie")
- public Queue queueDie(){
- return QueueBuilder.durable(QUEUE_DIE).build();
- }
-
- //声明自定义时间
- @Bean("queueSelf")
- public Queue queueSelf(){
- return QueueBuilder.durable(QUEUE_SELF)
- //死信交换机
- .deadLetterExchange(DIE_EXCHANGE)
- //死信routingkey
- .deadLetterRoutingKey(CHANNEL_DIE)
- .build();
- }
-
- //绑定A
- @Bean
- public Binding queueABindingX(@Qualifier("queueA")Queue queueA,
- @Qualifier("exchangeX")DirectExchange exchangeX){
- return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_A);
- }
- //绑定B
- @Bean
- public Binding queueBBindingX(@Qualifier("queueB")Queue queueA,
- @Qualifier("exchangeX")DirectExchange exchangeX){
- return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_B);
- }
- //绑定self
- @Bean
- public Binding queueSelfBindingX(@Qualifier("queueSelf")Queue queueA,
- @Qualifier("exchangeX")DirectExchange exchangeX){
- return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_SELF);
- }
- //绑定DIE
- @Bean
- public Binding queueDieBindingX(@Qualifier("queueDie")Queue queueA,
- @Qualifier("exchangeY")DirectExchange exchangeX){
- return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_DIE);
- }
-
-
- }
- @RestController
- @RequestMapping("/product")
- public class ProductController {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping("/send")
- public String sendMsg(@RequestParam(value = "msg",required = true) String msg,
- @RequestParam(value = "time",required = false)Integer time){
- // 信道A
- System.out.println("信道A发送消息:"+msg);
- rabbitTemplate.convertAndSend(TtlConfig.NORMAL_EXCHANGE,TtlConfig.CHANNEL_A,"A的消息"+msg);
- //信道B
- System.out.println("信道B发送消息:"+msg);
- rabbitTemplate.convertAndSend(TtlConfig.NORMAL_EXCHANGE,TtlConfig.CHANNEL_B,"B的消息"+msg);
- if(null!=time && !"".equals(time)){
- //信道C
- System.out.println("信道自定义发送消息:"+msg+",时间:"+time+"秒");
- rabbitTemplate.convertAndSend(TtlConfig.NORMAL_EXCHANGE,TtlConfig.CHANNEL_SELF,"自定义的消息"+msg,message -> {
- //延时
- int i = time * 1000;
- message.getMessageProperties().setExpiration(String.valueOf(i));
- return message;
- });
- }
- return "发送成功";
- }
- }
- @Component
- public class TtlConsumer {
- //接收消息
- @RabbitListener(queues = TtlConfig.QUEUE_DIE)
- public void getMsg(Message msg, Channel channel){
- String body = new String(msg.getBody());
- System.out.println(body);
- }
- }
Community Plugins — RabbitMQ下载rabbitmq_delayed_message_exchange。
放到/usr/lib/rabbitmq/lib/rabbitmq_server-3.11.3/plugins/目录中,
执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启服务
- //基于插件的延时队列
- @Configuration
- public class DelayConfig {
- //交换机
- public static final String DELAY_EXCHANGE="delayexchange";
- //队列
- public static final String DELAY_QUEUE="delayqueue";
- //routingkey
- public static final String DELAY_KEY="delaykey";
-
- //交换机
- @Bean
- public CustomExchange delayExchange(){
- Map<String, Object> arguments=new HashMap<>();
- arguments.put("x-delayed-type","direct");
- //1、交换机名称。2、交换机类型。3、是否持久化。4、是否需要自动删除。5、其他参数。
- return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
- }
-
- //队列
- @Bean
- public Queue delayQueue(){
- return new Queue(DELAY_QUEUE);
- }
-
- //绑定
- @Bean
- public Binding delayBinding(@Qualifier("delayQueue")Queue queue,
- @Qualifier("delayExchange") CustomExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(DELAY_KEY).noargs();
- }
- }
- @GetMapping("/delayMsg")
- public String delayMsg(@RequestParam(value = "msg") String msg,
- @RequestParam(value = "time",required = false)Integer time){
- //信道C
- System.out.println("延时队列插件版消息:"+msg+",时间:"+time+"秒");
- rabbitTemplate.convertAndSend(DelayConfig.DELAY_EXCHANGE,DelayConfig.DELAY_KEY,"延时队列插件版消息:"+msg, message -> {
- //延时
- int i = time * 1000;
- message.getMessageProperties().setDelay(i);
- return message;
- });
- return "发送成功";
- }
- @Component
- public class DelayConsumer {
- //接收消息
- @RabbitListener(queues = DelayConfig.DELAY_QUEUE)
- public void getDelayMsg(Message msg, Channel channel){
- String body = new String(msg.getBody());
- System.out.println(body);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。