当前位置:   article > 正文

rabbitmq延迟队列问题及其插件解决_c# rabbitmq expiration

c# rabbitmq expiration

1.背景

使用springboot整合rabbitmq,实现延迟队列,

目的很简单,在delay-queue延迟队列中设置自定义过期时间,然后当过期时间过去了以后,将消息发送到dead-letter-queue死信队列中,接下来,我们试一试看看会发生什么? 

2.项目搭建 

2.1 application.yml

  1. server:
  2. port: 8888
  3. #rabbitmq
  4. spring:
  5. rabbitmq:
  6. host: localhost
  7. username: guest
  8. password: guest
  9. port: 5672

2.2 pom.xml

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-amqp</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.boot</groupId>
  11. <artifactId>spring-boot-starter-test</artifactId>
  12. <scope>test</scope>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.projectlombok</groupId>
  16. <artifactId>lombok</artifactId>
  17. </dependency>

2.3  包目录结构如下

 2.4 把静态常量都声明在constant里面,方便调用,命名规则为类型xxxConstant,如ExchangeConstant,RoutingKeyConstant等;

constant.ExchangeConstant

  1. public class ExchangeConstant {
  2. /**
  3. * 直连交换机 direct-exchange
  4. */
  5. public static final String DirectExchange = "direct-exchange";
  6. /**
  7. * 死信交换机: dead-exchange
  8. */
  9. public static final String DeadExchange = "dead-exchange";
  10. }

constant.QueueConstant

  1. public class QueueConstant {
  2. /**
  3. * 死信队列 :QD
  4. */
  5. public static final String DeadQueue = "QD";
  6. /**
  7. * 普通队列 不设置其过期时间
  8. */
  9. public static final String CommonQueue = "common-queue";
  10. }

constant.RoutingKeyConstant

  1. public class RoutingKeyConstant {
  2. /**
  3. * 连接死信交换机的routingKey
  4. */
  5. public static final String DeadLetterRoutingKey = "YD";
  6. /**
  7. * 连接普通交换机的routingKey
  8. */
  9. public static final String CommonQueueRoutingKey = "common-queue-routing-key";
  10. }

RabbitmqConfig.java

  1. @Configuration
  2. public class RabbitmqConfig {
  3. /**
  4. * 声明direct交换机 DirectExchange
  5. * @return
  6. */
  7. @Bean
  8. public DirectExchange declareNormalDirectExchange(){
  9. return new DirectExchange(ExchangeConstant.DirectExchange,
  10. true,false,null);
  11. }
  12. /**
  13. * 声明direct 死信交换机 DeadExchange
  14. * @return
  15. */
  16. @Bean
  17. public DirectExchange declareDeadExchange(){
  18. return new DirectExchange(ExchangeConstant.DeadExchange,
  19. true,false,null);
  20. }
  21. /**
  22. * 声明死信队列QD
  23. * @return
  24. */
  25. @Bean
  26. public Queue declareQueueQD(){
  27. return QueueBuilder.durable(QueueConstant.DeadQueue)
  28. .deadLetterExchange(ExchangeConstant.DeadExchange)
  29. .deadLetterRoutingKey("YD")
  30. .build();
  31. }
  32. /**
  33. * 绑定死信交换机和其死信队列
  34. * @return
  35. */
  36. @Bean
  37. public Binding bindingDeadExchangeAndQueueD(){
  38. return BindingBuilder.bind(declareQueueQD())
  39. .to(declareDeadExchange())
  40. .with("YD");
  41. }
  42. /**
  43. * 声明普通队列
  44. * @return
  45. */
  46. @Bean
  47. public Queue declareCommonQueue(){
  48. return QueueBuilder.durable(QueueConstant.CommonQueue)
  49. .deadLetterExchange(ExchangeConstant.DeadExchange)
  50. .deadLetterRoutingKey(RoutingKeyConstant.DeadLetterRoutingKey)
  51. .build();
  52. }
  53. /**
  54. * 绑定交换机和普通队列
  55. * @return
  56. */
  57. @Bean
  58. public Binding commonQueueWithExchange(){
  59. return BindingBuilder.bind(declareCommonQueue())
  60. .to(declareNormalDirectExchange()).with(RoutingKeyConstant.CommonQueueRoutingKey);
  61. }
  62. }

 controller.ProducerController

注意: 以前我们如果声明一个队列消息过期时间为10秒,40秒,我们通常这么做,反正底层也是封装成参数”x-message-ttl";

还有一种,我们可能会声明一个map,放进这个队列声明里面,但是这两种,都是提前声明好的消息过期时间

 

 反而,我们下面这种写法,将我们让消息的生产者声明过期时间,也就是自定义声明过期时间

  1. @RestController
  2. @Slf4j
  3. @RequestMapping("/delay")
  4. public class ProducerController {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @PostMapping("/ttl/{time}/{message}")
  8. public void sendExpireTimeWithMessage(@PathVariable Integer time,@PathVariable String message){
  9. rabbitTemplate.convertAndSend(ExchangeConstant.DirectExchange, RoutingKeyConstant.CommonQueueRoutingKey,
  10. message, msg -> {
  11. int expiration = time * 1000;
  12. msg.getMessageProperties().setExpiration(String.valueOf(expiration));
  13. return msg;
  14. }
  15. );
  16. log.info("发送的时间为:{},延时时间为:{},消息内容为:{}", LocalDateTime.now(),time,message);
  17. }
  18. }

然后,我们消费者端 consumer.DeadLetterConsumer.java中,我们查看以下它的接收时间

  1. @Component
  2. @Slf4j
  3. public class DeadLetterConsumer {
  4. @RabbitListener(queues = QueueConstant.DeadQueue)
  5. public void receiveMessageWithTime(Message message) throws IOException {
  6. String msg = new String(message.getBody());
  7. log.info("接收到死信队列中的时间为:{},消息为:{}",
  8. LocalDateTime.now(),msg);
  9. }
  10. }

3. 问题显现

发送两条,先发送延迟为2s,在发送延迟为30s的,

结果如下,完全没有问题

 但是,如果先发30s,再发2s的延迟,则可见端倪,不妨见下

 

我们希望,即便我2秒延迟后发于30s延迟,但是我只是延迟2s发送,后发归后发,正常依旧应该在我发送完2s的这条请求之后,2s之后就能收到这条消息,但是并没有,它在30s之后跟这个刚才延时为30s的请求一起过来了!! 这大坑子 

4.问题解决

4.1 安装一个插件

说明:我的rabbitmq版本为3.10.2,且为brew方式安装,虽然位置不太一样,但是方式是一样的

网址:延迟队列插件github网址

注意:我们不是下载他的GitHub代码,而是下载它release下面的版本的后缀是 .ez的文件,没有错,就是这个!!!

放到安装到rabbitmq的plugins 目录下

brew安装的rabbitmq的plugin目录为

 在plugins的目录下,打开cmd,键入

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

注意,无关版本号,就是这句命令,如果提示文件找不到,多半是因为导入的插件有问题 ,执行如下为

我们要的就是那个delayed_message_exchange ,然后重启rabbitmq,

如果你也是用brew安装的,则为,

  1. brew services restart rabbitmq
  2. ##或者分步执行
  3. brew services stop rabbitmq
  4. brew services start rabbitmq

然后,在rabbitmq的页面,可以看到,

  

4.2 代码解决

前戏过多,现在我们用代码解决以下,

主要的实现在于,声明并且使用我们自定义的这个交换机类型,这个交换机会让我们的延迟信息在延迟之后再放入延迟队列,并且让消费者消费,注意,以前我们是让过期的队列中的信息放到死信队列,现在呢,我们不要声明两个队列了,只声明一个延迟队列,并且让消费者消费它

简单说,我们以前是作用在队列上的延迟队列,现在让他它作用在交换机上

执行流程图如下

代码如下:

静态常量

       

 DelayRabbitmqConfig.java: 我们新建一个配置类

  1. package com.lin.rabbitmq.config;
  2. import com.lin.rabbitmq.constant.ExchangeConstant;
  3. import com.lin.rabbitmq.constant.QueueConstant;
  4. import com.lin.rabbitmq.constant.RoutingKeyConstant;
  5. import org.springframework.amqp.core.*;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import java.awt.image.DirectColorModel;
  9. import java.util.HashMap;
  10. /**
  11. * @program: rabbitmq-springboot
  12. * @description: 基于插件的延迟队列
  13. * @author: linsz
  14. * @create: 2022-07-18 23:58
  15. **/
  16. @Configuration
  17. public class DelayRabbitmqConfig {
  18. /**
  19. * 声明延迟队列
  20. * @return
  21. */
  22. @Bean
  23. public Queue delayQueue(){
  24. return new Queue(QueueConstant.DelayQueue,
  25. true,false,false);
  26. }
  27. /**
  28. * 声明延迟自定义交换机类型
  29. * @return
  30. */
  31. @Bean
  32. public CustomExchange delayCustomExchange(){
  33. HashMap<String, Object> args = new HashMap<>();
  34. // 设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递
  35. args.put("x-delayed-type","direct");
  36. return new CustomExchange(ExchangeConstant.DelayCustomerExchange,
  37. "x-delayed-message",true,false,args);
  38. }
  39. /**
  40. * 绑定延迟交换机和队列
  41. * @return
  42. */
  43. @Bean
  44. public Binding delayQueueAndCustomExchange(){
  45. return BindingBuilder.bind(delayQueue())
  46. .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();
  47. }
  48. }

新建DelayQueueBasedPluginsController.java

  1. package com.lin.rabbitmq.controller;
  2. import com.lin.rabbitmq.constant.ExchangeConstant;
  3. import com.lin.rabbitmq.constant.RoutingKeyConstant;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.AmqpException;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.core.MessagePostProcessor;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.web.bind.annotation.PathVariable;
  12. import org.springframework.web.bind.annotation.PostMapping;
  13. import org.springframework.web.bind.annotation.RequestMapping;
  14. import org.springframework.web.bind.annotation.RestController;
  15. import java.time.LocalDateTime;
  16. /**
  17. * @program: rabbitmq-springboot
  18. * @description: 基于插件的延迟队列
  19. * @author: linsz
  20. * @create: 2022-07-19 00:22
  21. **/
  22. @RestController
  23. @Slf4j
  24. @RequestMapping("/send")
  25. public class DelayQueueBasedPluginsController {
  26. @Autowired
  27. private RabbitTemplate rabbitTemplate;
  28. @PostMapping("/delay/{time}/{message}")
  29. public void delayQueueProducer(@PathVariable String message, @PathVariable String time){
  30. rabbitTemplate.convertAndSend(ExchangeConstant.DelayCustomerExchange,
  31. RoutingKeyConstant.DelayCustomerRoutingKey,
  32. message, msg -> {
  33. int delay = Integer.parseInt(time) * 1000;
  34. msg.getMessageProperties().setDelay(delay);
  35. return msg;
  36. });
  37. log.info("在时间:{},发送延迟时间为:{},延迟队列发送消息:{}", LocalDateTime.now(),time,message);
  38. }
  39. }

新建DelayConsumer.java

  1. package com.lin.rabbitmq.consumer;
  2. import com.lin.rabbitmq.constant.QueueConstant;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.time.LocalDateTime;
  8. /**
  9. * @program: rabbitmq-springboot
  10. * @description: 基于插件的延迟队列消费端
  11. * @author: linsz
  12. * @create: 2022-07-19 00:30
  13. **/
  14. @Component
  15. @Slf4j
  16. public class DelayConsumer {
  17. @RabbitListener(queues = QueueConstant.DelayQueue)
  18. public void delayConsumer(Message message){
  19. log.info("在时间为:{},消费端接收到一条消息为:{}", LocalDateTime.now(),new String(message.getBody()));
  20. }
  21. }

分别发送:

 

 

 结果如下:

 使用这种自定义交换机类型,原来的会不会受影响呢

我们先跑延迟为5s,在跑延迟为20s的请求

显然不会,所以说,该插件实现是增强了原来的延迟队列的功能,而不是抛弃,其实从其原理也能推断出它不会让以前的延迟短的先发或者是延迟长的先发有异常;

对于这些消息都先放到一个交换机,等到你延迟时间结束时,放到消息延迟队列,至于你是否先发还是后发间隔几毫秒,无所谓!!! 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/401006?site
推荐阅读
相关标签
  

闽ICP备14008679号