赞
踩
项目中有一个简单的场景,涉及的用户量非常少,其实也不存在并发,都是客服管理在操作,而客服管理就那么几个人。客服需要在系统下单投放广告到广告屏。但是如果30分钟未确认投放则超时关闭。
订单30分钟未支付,系统自动超时关闭有哪些实现方案?
下文将基于MQ的延迟队列实现来介绍:
当我们在下单的时候,往MQ投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否有支付。
(消息过期投递到死信队列)
首先创建一个类DeadLetterMQConfig
(类名可随意),在类上加上@Component
注解
@Value("${mayikt.order.exchange}")
private String orderExchange;
@Value("${mayikt.order.queue}")
private String orderQueue;
@Value("${mayikt.order.routingKey}")
private String orderRoutingKey;
@Value("${mayikt.dlx.queue}")
private String dlxQueue;
@Value("${mayikt.dlx.queue}")
private String dlxQueue;
@Value("${mayikt.dlx.routingKey}")
private String dlxRoutingKey;
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
@Bean
public Queue orderQueue() {
Map<String, Object> arguments = new HashMap<>(2);
// 绑定我们的死信交换机
arguments.put("x-dead-letter-exchange", dlxExchange);
// 绑定我们的路由key
arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
return new Queue(orderQueue, true, false, false, arguments);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey);
}
@Component public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${mayikt.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${mayikt.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${mayikt.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${mayikt.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${mayikt.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${mayikt.dlx.routingKey}") private String dlxRoutingKey; /** * 声明死信交换机 */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 声明死信队列 */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机 */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 声明订单队列 核心操作一 */ @Bean public Queue orderQueue() { Map<String, Object> arguments = new HashMap<>(2); // 绑定我们的死信交换机 arguments.put("x-dead-letter-exchange", dlxExchange); // 绑定我们的路由key arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 绑定订单队列到订单交换机 */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey); } /** * 绑定死信队列到死信交换机 */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey); } }
server.port=8082 spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8 spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver #开启驼峰命名 譬如数据库create_time 自动映射pojo属性createTime mybatis.configuration.map-underscore-to-camel-case=true #配置virtual-host虚拟主机 spring.rabbitmq.virtual-host=/zhang_rabbit #ip地址 spring.rabbitmq.host=127.0.0.1 #用户名 密码 spring.rabbitmq.username=zhang spring.rabbitmq.password=zhang #连接端口号 spring.rabbitmq.port=5672 #spring.rabbitmq.publisher-confirm-type= #模拟演示死信队列 mayikt.dlx.exchange=mayikt_order_dlx_exchange mayikt.dlx.queue=mayikt_order_dlx_queue mayikt.dlx.routingKey=dlx ##备胎交换机 mayikt.order.exchange=mayikt_order_exchange mayikt.order.queue=mayikt_order_queue mayikt.order.routingKey=mayikt.order
MySQL数据库配置:
server.port=8082
: 指定内嵌Tomcat服务器的端口。spring.datasource.url
: 定义连接到MySQL数据库的JDBC URL。spring.datasource.username
和 spring.datasource.password
: 提供MySQL数据库连接的用户名和密码。spring.datasource.driver-class-name
: 指定MySQL的JDBC驱动程序类。MyBatis配置:
mybatis.configuration.map-underscore-to-camel-case=true
: 配置MyBatis将数据库列名中的下划线映射为Java对象属性的驼峰命名方式。RabbitMQ配置:
spring.rabbitmq.virtual-host
: 设置RabbitMQ的虚拟主机。spring.rabbitmq.host=127.0.0.1
: 指定RabbitMQ服务器的IP地址。spring.rabbitmq.username
和 spring.rabbitmq.password
: 设置RabbitMQ连接的用户名和密码。spring.rabbitmq.port=5672
: 指定RabbitMQ连接的端口号。死信队列和备胎交换机配置:
mayikt.dlx.exchange
, mayikt.dlx.queue
, mayikt.dlx.routingKey
: 配置死信队列的交换机、队列和路由键。mayikt.order.exchange
, mayikt.order.queue
, mayikt.order.routingKey
: 配置备胎交换机的交换机、队列和路由键。暂时设置消息10秒过期,验证消息是否加入死信队列
@RestController public class OrderController { @Autowired private OrderMapper orderMapper; @Autowired private RabbitTemplate rabbitTemplate; @Value("${mayikt.order.exchange}") private String orderExchange; //订单交换机 @Value("${mayikt.order.routingKey}") private String orderRoutingKey; //订单路由key @GetMapping("/addOrder") public String addOrder(){ String orderId=System.currentTimeMillis()+""; OrderEntity orderEntity=new OrderEntity("订单30分钟过期",orderId,0); //订单入库 int result= orderMapper.addOrder(orderEntity); if(result<=0){ return "fail"; } //rabbit投递消息 rabbitTemplate.convertAndSend(orderExchange,orderRoutingKey,orderId,messagePostProcessor()); return "success"; } //处理待发送消息 private MessagePostProcessor messagePostProcessor(){ return new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //设置有效期30分钟 //message.getMessageProperties().setExpiration("1800000"); message.getMessageProperties().setExpiration("10000"); return message; } }; } }
@Component //死信队列 public class OrderDlxConsumer { @Autowired private OrderMapper orderMapper; /** * 监听我们的死信队列 */ @RabbitListener(queues = "mayikt_order_dlx_queue") public void orderConsumer(String orderId) { System.out.println("死信队列获取消息:" + orderId); if (StringUtils.isEmpty(orderId)) { return; } //根据id查询 OrderEntity orderEntity = orderMapper.getOrder(orderId); if (null == orderEntity) { return; } //获取状态 Integer orderStatus=orderEntity.getOrderStatus(); //判断未支付 , 关闭订单 if(0==orderStatus){ orderMapper.updateStatus(orderId,2); //库存返还 } } }
http://localhost:8082/addOrder.订单入库成功.状态0未支付
10秒后,死信消费者处理 状态0未支付 变为2 已关闭
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。