当前位置:   article > 正文

RabbitMQ(2)springboot整合RabbitMQ=ttl延迟队列_springboot rabbitmq ttl

springboot rabbitmq ttl

延迟队列简介

        延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。

使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

        这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

        如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

环境搭建

一、添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. </dependency>
  6. <!--RabbitMQ 依赖-->
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-amqp</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-web</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.boot</groupId>
  17. <artifactId>spring-boot-starter-test</artifactId>
  18. <scope>test</scope>
  19. </dependency>
  20. <dependency>
  21. <groupId>com.alibaba</groupId>
  22. <artifactId>fastjson</artifactId>
  23. <version>1.2.47</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.projectlombok</groupId>
  27. <artifactId>lombok</artifactId>
  28. </dependency>
  29. <!--swagger-->
  30. <dependency>
  31. <groupId>io.springfox</groupId>
  32. <artifactId>springfox-swagger2</artifactId>
  33. <version>3.0.0</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>io.springfox</groupId>
  37. <artifactId>springfox-swagger-ui</artifactId>
  38. <version>3.0.0</version>
  39. </dependency>
  40. <!--RabbitMQ 测试依赖-->
  41. <dependency>
  42. <groupId>org.springframework.amqp</groupId>
  43. <artifactId>spring-rabbit-test</artifactId>
  44. <scope>test</scope>
  45. </dependency>
  46. </dependencies>

二、编写配置文件

  1. # 应用名称
  2. spring.application.name=springboot-rabbitmq
  3. server.port=8080
  4. spring.rabbitmq.host=127.0.0.1
  5. spring.rabbitmq.port=5672
  6. spring.rabbitmq.username=admin
  7. spring.rabbitmq.password=123
  8. #虚拟主机
  9. spring.rabbitmq.virtual-host=/

三、编写swagger配置类

  1. package com.zww.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import springfox.documentation.builders.ApiInfoBuilder;
  5. import springfox.documentation.service.ApiInfo;
  6. import springfox.documentation.service.Contact;
  7. import springfox.documentation.spi.DocumentationType;
  8. import springfox.documentation.spring.web.plugins.Docket;
  9. import springfox.documentation.swagger2.annotations.EnableSwagger2;
  10. @Configuration
  11. @EnableSwagger2
  12. public class SwaggerConfig {
  13. @Bean
  14. public Docket webApiConfig() {
  15. return new Docket(DocumentationType.SWAGGER_2)
  16. .groupName("webApi")
  17. .apiInfo(webApiInfo())
  18. .select()
  19. .build();
  20. }
  21. private ApiInfo webApiInfo() {
  22. return new ApiInfoBuilder()
  23. .title("rabbitmq 接口文档")
  24. .description("本文档描述了 rabbitmq 微服务接口定义")
  25. .version("1.0")
  26. .contact(new Contact("enjoy6288", "http://zww.com",
  27. "15588888880@qq.com"))
  28. .build();
  29. }
  30. }

实现延迟队列

        创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:

原先配置队列信息,写在了生产者和消费者代码中,现在可写咋配置类中,生产者只发消息,消费者只接受消息

一、编写延迟队列配置类,创建交换机和队列

  1. package com.zww.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. @Configuration
  9. public class TtlQueueConfig {
  10. public static final String X_EXCHANGE = "X";
  11. public static final String QUEUE_A = "QA";
  12. public static final String QUEUE_B = "QB";
  13. //死信交换机
  14. public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
  15. //死信队列
  16. public static final String DEAD_LETTER_QUEUE = "QD";
  17. // 声明 xExchange
  18. @Bean("xExchange")
  19. public DirectExchange xExchange() {
  20. return new DirectExchange(X_EXCHANGE);
  21. }
  22. // 声明 死信队列交换机
  23. @Bean("yExchange")
  24. public DirectExchange yExchange() {
  25. return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
  26. }
  27. //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
  28. @Bean("queueA")
  29. public Queue queueA() {
  30. Map<String, Object> args = new HashMap<>(3);
  31. //声明当前队列绑定的死信交换机
  32. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  33. //声明当前队列的死信路由 key
  34. args.put("x-dead-letter-routing-key", "YD");
  35. //声明队列的 TTL
  36. args.put("x-message-ttl", 10000);
  37. return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
  38. }
  39. // 声明队列 A 绑定 X 交换机
  40. @Bean
  41. public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
  42. @Qualifier("xExchange") DirectExchange xExchange) {
  43. return BindingBuilder.bind(queueA).to(xExchange).with("XA");
  44. }
  45. //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
  46. @Bean("queueB")
  47. public Queue queueB() {
  48. Map<String, Object> args = new HashMap<>(3);
  49. //声明当前队列绑定的死信交换机
  50. args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
  51. //声明当前队列的死信路由 key
  52. args.put("x-dead-letter-routing-key", "YD");
  53. //声明队列的 TTL
  54. args.put("x-message-ttl", 40000);
  55. return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
  56. }
  57. //声明队列 B 绑定 X 交换机
  58. @Bean
  59. public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
  60. @Qualifier("xExchange") DirectExchange xExchange) {
  61. return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
  62. }
  63. //声明死信队列 QD
  64. @Bean("queueD")
  65. public Queue queueD() {
  66. return new Queue(DEAD_LETTER_QUEUE);
  67. }
  68. //声明死信队列 QD 绑定关系
  69. @Bean
  70. public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
  71. @Qualifier("yExchange") DirectExchange yExchange) {
  72. return BindingBuilder.bind(queueD).to(yExchange).with("YD");
  73. }
  74. }

 二、消费者

  1. package com.zww.controller;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Controller;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import java.util.Date;
  10. //发送延迟消息
  11. @Slf4j
  12. @Controller
  13. @RequestMapping("/wwtt")
  14. public class SendMsgController {
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. //发消息
  18. @GetMapping("/senMsg/{message}")
  19. public void senMsg(@PathVariable String message){
  20. log.info("当前时间:{},发送一条信息给l两个ttl队列:{}",new Date().toString(),message);
  21. rabbitTemplate.convertAndSend("X","XA","消息来自wwtt为10s的队列:" + message);
  22. rabbitTemplate.convertAndSend("X","XB","消息来自wwtt为40s的队列:" + message);
  23. }
  24. }

三、消费者

  1. package com.zww.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Date;
  8. //队列 TTL 消费者
  9. @Component
  10. @Slf4j
  11. public class DeadLetterQueueConsumer {
  12. @RabbitListener(queues = "QD")
  13. public void receiveD(Message message, Channel channel) throws Exception{
  14. String msg = new String(message.getBody());
  15. log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);
  16. }
  17. }

四、运行测试访问 http://localhost:8080/ttl/senMsg/aa

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延迟队列优化

在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间

 

一、在原有的延迟队列配置类中添加以下代码

  1. //新的普通队列QC
  2. public static final String QUEUE_QC = "QC";
  3. //声明QC
  4. @Bean("queueC")
  5. public Queue queueC(){
  6. Map<String, Object> arguments = new HashMap<>();
  7. //设置死信交换机
  8. arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
  9. //设置死信RoutingKey
  10. arguments.put("x-dead-letter-routing-key","YD");
  11. return QueueBuilder.durable(QUEUE_QC).withArguments(arguments).build();
  12. }
  13. @Bean
  14. public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange")DirectExchange xExchange){
  15. return BindingBuilder.bind(queueC).to(xExchange).with("XC");
  16. }

 二、生产者添加代码

  1. //开始发消息 消息TTL
  2. @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
  3. public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
  4. log.info("当前时间:{},发送一条时长{}毫秒TTL信息队列QC:{}",new Date().toString(),ttlTime,message);
  5. rabbitTemplate.convertAndSend("X","XC",message,msg ->{
  6. //发送消息的延迟时长
  7. msg.getMessageProperties().setExpiration(ttlTime);
  8. return msg;
  9. });
  10. }

三、运行测试

发起请求
http://localhost:8080/ttl/sendExpirationMsg/aa/20000
http://localhost:8080/ttl/sendExpirationMsg/bb/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。这也就是为什么第二个延时2秒,却后执行。

插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

安装延时队列插件

可去官网下载 Community Plugins — RabbitMQ 插件,放置到 RabbitMQ 的插件目录。

进入 RabbitMQ 的安装目录下的 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录,执行下面命令让该插件生效,然后重启 RabbitMQ

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange #安装
  2. systemctl restart rabbitmq-server #重启rabbitmq服务

 

代码实现

在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下: 

1、配置文件类代码:

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

  1. package com.zww.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.CustomExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.beans.factory.annotation.Qualifier;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. @Configuration
  12. public class DelayedQueueConfig {
  13. //队列
  14. public static final String DELAYED_QUEUE_NAME = "delayed.queue";
  15. //交换机
  16. public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
  17. //RoutingKey
  18. public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
  19. //声明队列
  20. @Bean
  21. public Queue delayedQueue(){
  22. return new Queue(DELAYED_QUEUE_NAME);
  23. }
  24. //声明交换机
  25. @Bean
  26. public CustomExchange delayedExchange(){
  27. Map<String, Object> arguments = new HashMap<>();
  28. arguments.put("x-delayed-type","direct");//定义直接类型
  29. /*
  30. * 自定义交换机
  31. * 1.交换机名称
  32. * 2.交换机类型
  33. * 3.是否需要持久化
  34. * 4.是否需要自动删除
  35. * 5.其他参数
  36. * */
  37. return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
  38. }
  39. //绑定
  40. @Bean
  41. public Binding delayedQueueBindingDelayedExchange(
  42. @Qualifier("delayedQueue") Queue delayedQueue,
  43. @Qualifier("delayedExchange") CustomExchange delayedExchange
  44. ) {
  45. return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
  46. }
  47. }

2.生产者添加以下代码

  1. //基于插件发送的延迟时间的队列
  2. @GetMapping("/sendDelayMsg/{message}/{delayTime}")
  3. public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
  4. log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date(),delayTime,message);
  5. rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg ->{
  6. //发送消息的时候 延迟时长
  7. msg.getMessageProperties().setDelay(delayTime);
  8. return msg;
  9. });
  10. }

3.消费者

  1. package com.zww.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.util.Date;
  8. //队列 TTL 消费者
  9. @Component
  10. @Slf4j
  11. public class DeadLetterQueueConsumer {
  12. @RabbitListener(queues = "QD")
  13. public void receiveD(Message message, Channel channel) throws Exception{
  14. String msg = new String(message.getBody());
  15. log.info("当前时间:{},收到死信队列的消息:{}",new Date(),msg);
  16. }
  17. }

4.运行测试

1.访问 http://localhost:8080/ttl/sendDelayMsg/aa/20000
2.访问 http://localhost:8080/ttl/sendDelayMsg/bb/20000

第二个消息被先消费掉了,符合预期

总结

        延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

        当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/572405
推荐阅读
相关标签
  

闽ICP备14008679号