赞
踩
1.传统处理超时订单
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
然后再做其他的业务操作
2.rabbitMQ延时队列方案
一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失
rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换器(DLX)和设置过期时间(TTL)结合起来实现延迟队列
1.TTL
TTL是Time To Live的缩写, 也就是生存时间。
RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
如果两种方式一起使用消息对TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。
设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。
设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。
2.DLX和死信队列
DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。
死信队列是指队列(正常)上的消息(过期)变成死信后,能够后发送到另外一个交换机(DLX),然后被路由到一个队列上,
这个队列,就是死信队列
成为死信一般有以下几种情况:
消息被拒绝(basic.reject or basic.nack)且带requeue=false参数
消息的TTL-存活时间已经过期
队列长度限制被超越(队列满)
注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,
注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明
x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键
3. 延迟队列
通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费
4. 开发步骤
1.生产者创建一个正常消息,并添加消息过期时间/死信交换机/死信路由键这3个参数
QueueDelayConfig
package com.lyl.rabbitmqprovider.rabbitmq; import org.omg.CORBA.PUBLIC_MEMBER; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * @authorlyl * @site * @company * @create 2019-12-25 15:26 */ @Configuration public class QueueDelayConfig { /** * 定义正常的队列、交换机、路由键 */ public static final String NORMAL_QUEUE="normal-queue"; public static final String NORMAL_EXCHANGE="normal-exchange"; public static final String NORMAL_ROUTINGKEY="normal_routingkey"; /** * 定义死信的队列、交换机、路由键 */ public static final String DELAY_QUEUE="delay-queue"; public static final String DELAY_EXCHANGE="delay-exchange"; public static final String DELAY_ROUTINGKEY="delay-routingkey"; /** * 定义正常队列 * @return */ @Bean public Queue normalQueue(){ // 设置消息过期时间/死信交换机/死信路由键这3个参数 Map<String,Object> map = new HashMap<String,Object>(); map.put("x-message-ttl", 15000);//message在该队列queue的存活时间最大为10秒 map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX) map.put("x-dead-letter-routing-key", DELAY_ROUTINGKEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键 return new Queue(NORMAL_QUEUE, true, false, false, map); } @Bean public DirectExchange normalExchange(){ return new DirectExchange(NORMAL_EXCHANGE,true,false); } @Bean public Binding normalRoutingKey(){ return BindingBuilder.bind(normalQueue()) .to(normalExchange()) .with(NORMAL_ROUTINGKEY); } @Bean public Queue delayQueue(){ return new Queue(DELAY_QUEUE,true); } @Bean public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE); } @Bean public Binding delayRoutingKey(){ return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with(DELAY_ROUTINGKEY); } }
2.消费者A
正常情况下,由消费者A去消费队列“normal-queue”中的消息,但实际上没有,而是等消息过期
3.消费者B
消息过期后,变成死信,根据配置会被投递到DLX,然后根据死信路由键投到死信队列(即延时队列)中
QueueRecevier
package com.lyl.rabbitmqconsumer.controller; import com.lyl.commonvo.vo.OrderVo; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @authorlyl * @site * @company * @create 2019-12-25 16:53 */ @Component @Slf4j @RabbitListener(queues = {"delay-queue"}) public class QueueRecevier { @RabbitHandler public void handlerMessage(OrderVo orderVo){ log.info("QueueRecevier.handlerMessage,data={}",orderVo); } }
5. 子模块间共享Model
1.创建公共子模块common
添加公共的JavaBean对象,并使用lombok简化代码
@Data:会为类的所有属性自动生成setter/getter、equals、canEqual、hashCode、toString方法
@NoArgsConstructor:无参构造器
@AllArgsConstructor:全参构造器
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.lyl</groupId> <artifactId>commonvo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>commonvo</name> <description>Demo project for Spring Boot</description> <packaging>jar</packaging> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2.主模块
<!-- 添加子模块 -->
<modules>
<module>rabbitmq-provider</module>
<module>rabbitmq-consumer</module>
<module>common</module>
</modules>
3.各子模块
<!-- 1.packaging模式改为jar -->
<packaging>jar</packaging>
4.配置公共common模块
在主模块的POM的中添加公共子模块common
<dependencies>
<!--添加子模块common-->
<dependency>
<groupId>com.lyl</groupId>
<artifactId>common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
...
</dependencies>
6. json转换
1.生产者
SendController
package com.lyl.rabbitmqprovider.controller; import com.lyl.commonvo.vo.OrderVo; import com.lyl.rabbitmqprovider.rabbitmq.QueueDelayConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; /** * @authorlyl * @site * @company * @create 2019-12-25 16:12 */ @RestController @Slf4j public class SendController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/sender") public Map<String,Object> sender(){ // Map<String,Object> data = this.createData(); OrderVo orderVo = new OrderVo(); orderVo.setOrderId(1); orderVo.setOrderNo("P001"); rabbitTemplate.convertAndSend(QueueDelayConfig.NORMAL_EXCHANGE, QueueDelayConfig.NORMAL_ROUTINGKEY,orderVo); Map<String,Object> result=new HashMap<String,Object>(); result.put("msg","ok"); result.put("code","1"); return result; } private Map<String,Object> createData(){ Map<String,Object> map = new HashMap<String,Object>(); String date=LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); map.put("msg","hello,rabbitmq"); map.put("success",true); map.put("createdate",date); return map; } }
QueueProviderMessageConvert
package com.lyl.rabbitmqprovider.rabbitmq; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @authorlyl * @site * @company * @create 2019-12-26 16:28 */ @Configuration public class QueueProviderMessageConvert { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);//指定json转换器 return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
2.消费者
QueueRecevierMessageConvert
package com.lyl.rabbitmqconsumer.rabbitmq; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @authorlyl * @site * @company * @create 2019-12-26 16:42 */ @Configuration public class QueueRecevierMessageConvert { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jackson2JsonMessageConverter); return factory; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。