赞
踩
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
注意:RabbitMQ 本身并没有延时队列的概念,延时效果其实是通过消息 TTL 加死信队列实现的。
应用场景:订单30分钟未支付取消…
创建springboot Maven项目,其pom文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build for the Spring Boot + RabbitMQ demo project. -->
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits dependency and plugin management from the Spring Boot 2.4.2 parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hong</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot整合Rabbitmq</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- RabbitMQ (AMQP) starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Web starter; the project exposes REST endpoints that publish messages -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): fastjson 1.2.47 is an old release with publicly reported
             deserialization vulnerabilities; confirm it is actually needed and consider upgrading. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <!-- Lombok; the Java sources use its @Slf4j annotation -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <!-- Swagger (Springfox) API documentation -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- RabbitMQ test support -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Plugin version is pinned to the same value as the parent POM version above. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.2</version>
            </plugin>
        </plugins>
    </build>
</project>
# RabbitMQ broker connection settings (Spring Boot spring.rabbitmq.* properties).
# Broker host address.
spring.rabbitmq.host=10.211.55.4
# Broker AMQP port.
spring.rabbitmq.port=5672
# Credentials used to connect to the broker.
# NOTE(review): the password is stored in plain text here; consider externalizing it.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
package com.hong.springboot.rabbitmq.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * @Description: Swagger配置类 * @Author: hong * @Date: 2024-01-24 10:38 * @Version: 1.0 **/ @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo() { return new ApiInfoBuilder() .title("rabbitmq接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("JAVA小生不才", "https://blog.csdn.net/qq_41596346?spm=1011.2423.3001.5343", "hst1406959716@163.com")) .build(); } }
package com.hong.springboot.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * TTL queue configuration.
 *
 * <p>Declares direct exchange X with two TTL queues (QUEUE_A: 10 s, QUEUE_B: 40 s).
 * Both queues name exchange Y as their dead-letter exchange with routing key "YD",
 * and queue QD is bound to Y with that same key.
 *
 * @author hong
 * @version 1.0
 * @since 2024-01-24
 */
@Configuration
public class TTLQueeueConfig {

    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";
    /** Dead-letter exchange name. */
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    /** Dead-letter queue name. */
    public static final String DEAD_LETTER_QUEUE = "QD";

    /** Declares the exchange X. */
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    /** Declares the dead-letter exchange Y. */
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /** Declares queue A: 10 s TTL, dead-lettering to exchange Y. */
    @Bean("queueA")
    public Queue queueA() {
        return deadLetteringTtlQueue(QUEUE_A, 10000);
    }

    /** Binds queue A to exchange X with routing key "XA". */
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    /** Declares queue B: 40 s TTL, dead-lettering to exchange Y. */
    @Bean("queueB")
    public Queue queueB() {
        return deadLetteringTtlQueue(QUEUE_B, 40000);
    }

    /** Declares the dead-letter queue QD. */
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /** Binds queue B to exchange X with routing key "XB". */
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    /** Binds the dead-letter queue QD to exchange Y with routing key "YD". */
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

    /**
     * Builds a durable queue with a queue-level TTL that dead-letters to exchange Y
     * using routing key "YD".
     *
     * @param queueName name of the queue to declare
     * @param ttlMillis value for the queue's x-message-ttl argument, in milliseconds
     * @return the queue definition
     */
    private static Queue deadLetteringTtlQueue(String queueName, int ttlMillis) {
        return QueueBuilder.durable(queueName)
                .withArgument("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", "YD")
                .withArgument("x-message-ttl", ttlMillis)
                .build();
    }
}
package com.hong.springboot.rabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Message producer: REST endpoints under "ttl" that publish to exchange "X".
 *
 * @author hong
 * @version 1.0
 * @since 2024-01-28
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the given text to exchange "X" twice, once with routing key "XA"
     * and once with routing key "XB".
     *
     * @param message text taken from the request path and appended to each payload
     */
    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的QA队列: " + message);
        // QUEUE_B is declared in TTLQueeueConfig with x-message-ttl = 40000, so the
        // payload must say 40S (it previously said 10S, copied from the QA line).
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40S的QB队列: " + message);
    }
}
package com.hong.springboot.rabbitmq.consumer;

import com.hong.springboot.rabbitmq.config.TTLQueeueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Dead-letter consumer: listens on the dead-letter queue and logs every message
 * it receives together with the time of receipt.
 *
 * @author hong
 * @version 1.0
 * @since 2024-01-28
 */
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    /**
     * Logs the body of a message delivered from the dead-letter queue.
     *
     * @param message the delivered AMQP message
     * @param channel the channel the message arrived on (not used here)
     */
    @RabbitListener(queues = TTLQueeueConfig.DEAD_LETTER_QUEUE)
    public void receiveD(Message message, Channel channel) {
        // Decode with an explicit charset: new String(byte[]) uses the platform default,
        // which corrupts non-ASCII payloads on hosts whose default is not UTF-8.
        // NOTE(review): assumes the producer encodes strings as UTF-8 (the default of
        // Spring AMQP's SimpleMessageConverter); confirm if a custom converter is used.
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("当前时间:{},收到死信队列信息{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), msg);
    }
}
启动项目,发起http://localhost:8080/ttl/sendMsg/JAVA%E5%B0%8F%E7%94%9F%E4%B8%8D%E6%89%8D请求
QA 和 QB 的 TTL 都是在队列上写死的;如果再来一个延时时间不同的需求,就得再新增一个队列,该怎么处理呢?
代码中添加如下代码
//QC 不设置ttl 消息自带ttl public static final String QUEUE_C = "QC"; //声明队列 C 死信交换机 @Bean("queueC") public Queue queueC() { Map<String, Object> args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //没有声明 TTL 属性 return QueueBuilder.durable(QUEUE_C).withArguments(args).build(); } //声明队列C绑定 X 交换机 @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }
添加如下方法
/**
 * Delay-queue refinement: publishes a message to exchange "X" with routing key "XC",
 * setting the TTL on the message itself rather than on the queue.
 *
 * @param message the payload to publish
 * @param ttlTime the delay in milliseconds, as a non-negative integer string
 * @throws IllegalArgumentException if {@code ttlTime} is not a non-negative integer
 */
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
    // ttlTime comes straight from the URL. Validate it before publishing: an invalid
    // expiration is only rejected by the broker after the publish call has returned,
    // so the message would be lost while the log below still reports success.
    final long ttlMillis;
    try {
        ttlMillis = Long.parseLong(ttlTime);
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("ttlTime must be an integer number of milliseconds: " + ttlTime, e);
    }
    if (ttlMillis < 0) {
        throw new IllegalArgumentException("ttlTime must not be negative: " + ttlTime);
    }
    // Normalized form (e.g. "+5" becomes "5") so the broker always gets plain digits.
    final String expiration = Long.toString(ttlMillis);

    // The post-processor receives the outgoing Message, not correlation data.
    rabbitTemplate.convertAndSend("X", "XC", message, outgoing -> {
        outgoing.getMessageProperties().setExpiration(expiration);
        return outgoing;
    });
    log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列C:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , ttlTime, message);
}
同时发多个消息如http://localhost:8080/ttl/sendExpirationMsg/消息中间件RABBITMQ1/20000和http://localhost:8080/ttl/sendExpirationMsg/消息中间件RABBITMQ2/2000,按理说应该是消息中间件RABBITMQ2的先收到,但实际结果并非如此
在消息属性上设置 TTL 时,消息可能不会按时“死亡”:RabbitMQ 只会检查队列头部(最先入队)的消息是否过期,过期后才将其转入死信队列。如果队首消息的延时很长,而排在它后面的消息延时很短,那么后面的消息即使已经到期,也要等队首消息过期并出队之后才能被转入死信队列,无法优先被消费。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。