当前位置:   article > 正文

RabbitMQ--延时队列_rabbitmq延迟队列

rabbitmq延迟队列

一、概念

        延时队列就是死信队列中的TTL过期的一种。延时队列内部是有序的,延时队列中的元素是希望在指定时间到了以后或之前取出和处理。延时队列就是用来存放需要在指定时间被处理的元素的队列。

        应用场景:订单到时未支付自动取消、用户注册成功一定时间未登录提醒等。

       

        基于SpringBoot的延时队列可能会出现消息不按时过期的情况。RabbitMQ在检查消息时,只会检查第一条消息是否过期,如果过期则丢到死信队列,如果第一条消息的延时很长,第二条消息的延时很短,第二个消息也不会优先处理。解决办法用RabbitMQ插件

        

二、基于SpringBoot+注解的方式

1、代码架构图

        创建两个队列QA和QB,两者队列TTL分别设置为10s和40s,然后创建一个交换机X和一个死信交换机Y,他们类型都是direct,创建一个死信队列QD。                       

2、基于SpringBoot。依赖、配置文件。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  1. # rabiitmq 配置
  2. spring:
  3. rabbitmq:
  4. host: 192.168.10.109
  5. port: 5672
  6. username: admin
  7. password: 123

3、配置类代码

  1. //信道A
  2. public static final String CHANNEL_A="CA";
  3. //信道B
  4. public static final String CHANNEL_B="CB";
  5. //信道死信
  6. public static final String CHANNEL_DIE="CDIE";
  7. //自定义时间
  8. public static final String CHANNEL_SELF="CSELF";
  9. //声明普通交换机
  10. @Bean("exchangeX")
  11. public DirectExchange exchangeX(){
  12. return new DirectExchange(NORMAL_EXCHANGE);
  13. }
  14. //声明死信交换机
  15. @Bean("exchangeY")
  16. public DirectExchange exchangeY(){
  17. return new DirectExchange(DIE_EXCHANGE);
  18. }
  19. //声明普通队列
  20. @Bean("queueA")
  21. public Queue queueA(){
  22. Map<String, Object> arguments =new HashMap<>();
  23. return QueueBuilder.durable(QUEUE_A)
  24. // //死信交换机
  25. .deadLetterExchange(DIE_EXCHANGE)
  26. // //死信routingkey
  27. .deadLetterRoutingKey(CHANNEL_DIE)
  28. // //设置ttl时间
  29. .ttl(2000)
  30. .build();
  31. }
  32. //声明普通队列
  33. @Bean("queueB")
  34. public Queue queueB(){
  35. return QueueBuilder.durable(QUEUE_B)
  36. //死信交换机
  37. .deadLetterExchange(DIE_EXCHANGE)
  38. //死信routingkey
  39. .deadLetterRoutingKey(CHANNEL_DIE)
  40. //设置ttl时间
  41. .ttl(5000)
  42. .build();
  43. }
  44. //声明死信队列
  45. @Bean("queueDie")
  46. public Queue queueDie(){
  47. return QueueBuilder.durable(QUEUE_DIE).build();
  48. }
  49. //声明自定义时间
  50. @Bean("queueSelf")
  51. public Queue queueSelf(){
  52. return QueueBuilder.durable(QUEUE_SELF)
  53. //死信交换机
  54. .deadLetterExchange(DIE_EXCHANGE)
  55. //死信routingkey
  56. .deadLetterRoutingKey(CHANNEL_DIE)
  57. .build();
  58. }
  59. //绑定A
  60. @Bean
  61. public Binding queueABindingX(@Qualifier("queueA")Queue queueA,
  62. @Qualifier("exchangeX")DirectExchange exchangeX){
  63. return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_A);
  64. }
  65. //绑定B
  66. @Bean
  67. public Binding queueBBindingX(@Qualifier("queueB")Queue queueA,
  68. @Qualifier("exchangeX")DirectExchange exchangeX){
  69. return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_B);
  70. }
  71. //绑定self
  72. @Bean
  73. public Binding queueSelfBindingX(@Qualifier("queueSelf")Queue queueA,
  74. @Qualifier("exchangeX")DirectExchange exchangeX){
  75. return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_SELF);
  76. }
  77. //绑定DIE
  78. @Bean
  79. public Binding queueDieBindingX(@Qualifier("queueDie")Queue queueA,
  80. @Qualifier("exchangeY")DirectExchange exchangeX){
  81. return BindingBuilder.bind(queueA).to(exchangeX).with(CHANNEL_DIE);
  82. }
  83. }

4、生产者

  1. @RestController
  2. @RequestMapping("/product")
  3. public class ProductController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @GetMapping("/send")
  7. public String sendMsg(@RequestParam(value = "msg",required = true) String msg,
  8. @RequestParam(value = "time",required = false)Integer time){
  9. // 信道A
  10. System.out.println("信道A发送消息:"+msg);
  11. rabbitTemplate.convertAndSend(TtlConfig.NORMAL_EXCHANGE,TtlConfig.CHANNEL_A,"A的消息"+msg);
  12. //信道B
  13. System.out.println("信道B发送消息:"+msg);
  14. rabbitTemplate.convertAndSend(TtlConfig.NORMAL_EXCHANGE,TtlConfig.CHANNEL_B,"B的消息"+msg);
  15. if(null!=time && !"".equals(time)){
  16. //信道C
  17. System.out.println("信道自定义发送消息:"+msg+",时间:"+time+"秒");
  18. rabbitTemplate.convertAndSend(TtlConfig.NORMAL_EXCHANGE,TtlConfig.CHANNEL_SELF,"自定义的消息"+msg,message -> {
  19. //延时
  20. int i = time * 1000;
  21. message.getMessageProperties().setExpiration(String.valueOf(i));
  22. return message;
  23. });
  24. }
  25. return "发送成功";
  26. }
  27. }

5、接收者

  1. @Component
  2. public class TtlConsumer {
  3. //接收消息
  4. @RabbitListener(queues = TtlConfig.QUEUE_DIE)
  5. public void getMsg(Message msg, Channel channel){
  6. String body = new String(msg.getBody());
  7. System.out.println(body);
  8. }
  9. }

三、基于插件的RabbitMQ案例       

1、安装

        Community Plugins — RabbitMQ下载rabbitmq_delayed_message_exchange。

        放到/usr/lib/rabbitmq/lib/rabbitmq_server-3.11.3/plugins/目录中,

        执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

        重启服务        

2、代码架构图

        

 3、配置类

        

  1. //基于插件的延时队列
  2. @Configuration
  3. public class DelayConfig {
  4. //交换机
  5. public static final String DELAY_EXCHANGE="delayexchange";
  6. //队列
  7. public static final String DELAY_QUEUE="delayqueue";
  8. //routingkey
  9. public static final String DELAY_KEY="delaykey";
  10. //交换机
  11. @Bean
  12. public CustomExchange delayExchange(){
  13. Map<String, Object> arguments=new HashMap<>();
  14. arguments.put("x-delayed-type","direct");
  15. //1、交换机名称。2、交换机类型。3、是否持久化。4、是否需要自动删除。5、其他参数。
  16. return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
  17. }
  18. //队列
  19. @Bean
  20. public Queue delayQueue(){
  21. return new Queue(DELAY_QUEUE);
  22. }
  23. //绑定
  24. @Bean
  25. public Binding delayBinding(@Qualifier("delayQueue")Queue queue,
  26. @Qualifier("delayExchange") CustomExchange exchange){
  27. return BindingBuilder.bind(queue).to(exchange).with(DELAY_KEY).noargs();
  28. }
  29. }

4、生产者

  1. @GetMapping("/delayMsg")
  2. public String delayMsg(@RequestParam(value = "msg") String msg,
  3. @RequestParam(value = "time",required = false)Integer time){
  4. //信道C
  5. System.out.println("延时队列插件版消息:"+msg+",时间:"+time+"秒");
  6. rabbitTemplate.convertAndSend(DelayConfig.DELAY_EXCHANGE,DelayConfig.DELAY_KEY,"延时队列插件版消息:"+msg, message -> {
  7. //延时
  8. int i = time * 1000;
  9. message.getMessageProperties().setDelay(i);
  10. return message;
  11. });
  12. return "发送成功";
  13. }

5、接收消息

  1. @Component
  2. public class DelayConsumer {
  3. //接收消息
  4. @RabbitListener(queues = DelayConfig.DELAY_QUEUE)
  5. public void getDelayMsg(Message msg, Channel channel){
  6. String body = new String(msg.getBody());
  7. System.out.println(body);
  8. }
  9. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/578966
推荐阅读
相关标签
  

闽ICP备14008679号