赞
踩
RabbitMQ库存解锁的场景有很多,以下是一些常见的场景:
订单取消和订单回滚。下订单成功,订单过期没有支付被系统自动取消、被用户手动取消。都要解锁库存。
下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚;之前锁定的库存就要自动解锁。
订单系统需要通知库存系统,如果想要解锁库存,可以通过stock.locked路由键发送一个消息给交换机stock-event-exchange,消息内容包括哪个订单、哪些商品、多少库存
如图:
spring: rabbitmq: host: 127.0.0.1 port: 5672 # 虚拟主机 virtual-host: /kmall # 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】 publisher-confirm-type: correlated # 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】 publisher-returns: true # 消息在没有被队列接收时是否强行退回 template: mandatory: true # 消费者手动确认模式,关闭自动确认,否则会消息丢失 listener: simple: acknowledge-mode: manual
// 开启rabbit
@EnableRabbit
// 开启feign
@EnableFeignClients(basePackages = "com.koo.modules.ware.feign")
// 开启服务注册功能
@EnableDiscoveryClient
@SpringBootApplication
public class WareApplication {
public static void main(String[] args) {
SpringApplication.run(WareApplication.class, args);
}
}
@Configuration public class MyRabbitConfig { @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { // TODO 封装RabbitTemplate RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(rabbitTemplate); return rabbitTemplate; } @Bean public MessageConverter messageConverter() { // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式 return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * <p> * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ //@PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法) public void initRabbitTemplate(RabbitTemplate rabbitTemplate) { /** * 发送消息触发confirmCallback回调 * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null) * @param ack:消息是否成功收到(ack=true,消息抵达Broker) * @param cause:失败的原因 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("发送消息触发confirmCallback回调" + "\ncorrelationData ===> " + correlationData + "\nack ===> " + ack + "" + "\ncause ===> " + cause); System.out.println("================================================="); }); /** * 消息未到达队列触发returnCallback回调 * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message:投递失败的消息详细信息 * @param replyCode:回复的状态码 * @param replyText:回复的文本内容 * @param exchange:接收消息的交换机 * @param routingKey:接收消息的路由键 */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 需要修改数据库 消息的状态【后期定期重发消息】 System.out.println("消息未到达队列触发returnCallback回调" + "\nmessage ===> " + message + "\nreplyCode ===> " + replyCode + "\nreplyText ===> " + replyText + "\nexchange ===> " + exchange + "\nroutingKey ===> " + routingKey); System.out.println("=================================================="); }); } }
@Configuration public class MyRabbitMQConfig { /** * 交换机 * Topic,可以绑定多个队列 */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments return new TopicExchange("stock-event-exchange", true, false); } /** * 死信队列 * 释放库存 */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments return new Queue("stock.release.stock.queue", true, false, false); } /** * 延时队列 * 锁定库存 */ @Bean public Queue stockDelay() { HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 1.5分钟 arguments.put("x-message-ttl", 90000); return new Queue("stock.delay.queue", true, false, false, arguments); } /** * 绑定:交换机与死信队列 * 释放库存 */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map<String, Object> arguments return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); } /** * 绑定:交换机与延时队列 * 锁定库存 */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } }
CREATE TABLE `kmall-ware`.`wms_ware_sku` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`sku_id` bigint(20) NULL DEFAULT NULL COMMENT 'sku_id',
`ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id',
`stock` int(11) NULL DEFAULT NULL COMMENT '库存数',
`sku_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'sku_name',
`stock_locked` int(11) NULL DEFAULT 0 COMMENT '锁定库存',
PRIMARY KEY (`id`) USING BTREE,
INDEX `sku_id`(`sku_id`) USING BTREE,
INDEX `ware_id`(`ware_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 12 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '商品库存' ROW_FORMAT = Dynamic;
@Data @TableName("wms_ware_sku") public class WareSkuEntity implements Serializable { private static final long serialVersionUID = 1L; /** * id */ @TableId private Long id; /** * sku_id */ private Long skuId; /** * 仓库id */ private Long wareId; /** * 库存数 */ private Integer stock; /** * sku_name */ private String skuName; /** * 锁定库存 */ private Integer stockLocked; }
CREATE TABLE `kmall-ware`.`wms_ware_order_task` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id', `order_id` bigint(20) NULL DEFAULT NULL COMMENT 'order_id', `order_sn` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'order_sn', `consignee` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '收货人', `consignee_tel` char(15) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '收货人电话', `delivery_address` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '配送地址', `order_comment` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单备注', `payment_way` tinyint(1) NULL DEFAULT NULL COMMENT '付款方式【 1:在线付款 2:货到付款】', `task_status` tinyint(2) NULL DEFAULT NULL COMMENT '任务状态', `order_body` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '订单描述', `tracking_no` char(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '物流单号', `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT 'create_time', `ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id', `task_comment` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '工作单备注', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 35 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '库存工作单' ROW_FORMAT = Dynamic;
CREATE TABLE `kmall-ware`.`wms_ware_order_task_detail` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`sku_id` bigint(20) NULL DEFAULT NULL COMMENT 'sku_id',
`sku_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 'sku_name',
`sku_num` int(11) NULL DEFAULT NULL COMMENT '购买个数',
`task_id` bigint(20) NULL DEFAULT NULL COMMENT '工作单id',
`ware_id` bigint(20) NULL DEFAULT NULL COMMENT '仓库id',
`lock_status` int(1) NULL DEFAULT NULL COMMENT '1-已锁定 2-已解锁 3-扣减',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 50 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '库存工作单' ROW_FORMAT = Dynamic;
实体类:
@Data @TableName("wms_ware_order_task") public class WareOrderTaskEntity implements Serializable { private static final long serialVersionUID = 1L; /** * id */ @TableId private Long id; /** * order_id */ private Long orderId; /** * order_sn */ private String orderSn; /** * 收货人 */ private String consignee; /** * 收货人电话 */ private String consigneeTel; /** * 配送地址 */ private String deliveryAddress; /** * 订单备注 */ private String orderComment; /** * 付款方式【 1:在线付款 2:货到付款】 */ private Integer paymentWay; /** * 任务状态 */ private Integer taskStatus; /** * 订单描述 */ private String orderBody; /** * 物流单号 */ private String trackingNo; /** * create_time */ private Date createTime; /** * 仓库id */ private Long wareId; /** * 工作单备注 */ private String taskComment; }
@NoArgsConstructor @AllArgsConstructor @Data @TableName("wms_ware_order_task_detail") public class WareOrderTaskDetailEntity implements Serializable { private static final long serialVersionUID = 1L; /** * id */ @TableId private Long id; /** * sku_id */ private Long skuId; /** * sku_name */ private String skuName; /** * 购买个数 */ private Integer skuNum; /** * 工作单id */ private Long taskId; /** * 仓库id */ private Long wareId; /** * 1-已锁定 2-已解锁 3-扣减 */ private Integer lockStatus; }
@Data public class OrderItemVO { private Long skuId; private Boolean check; private String title; private String image; /** * 商品套餐属性 */ private List<String> skuAttrValues; private BigDecimal price; private Integer count; private BigDecimal totalPrice; /** 商品重量 **/ private BigDecimal weight = new BigDecimal("0.085"); }
@Data
public class WareSkuLockTO {
private String orderSn;
/** 需要锁住的所有库存信息 **/
private List<OrderItemVO> locks;
}
@PostMapping(value = "/lock/order")
public R orderLockStock(@RequestBody WareSkuLockTO lockTO) {
try {
wareSkuService.orderLockStock(lockTO);
return R.ok();
} catch (NoStockException e) {
return R.error(NO_STOCK_EXCEPTION.getCode(), NO_STOCK_EXCEPTION.getMsg());
}
}
库存锁定,sql执行锁定锁定
@Transactional @Override public Boolean orderLockStock(WareSkuLockTO lockTO) { // 按照收货地址找到就近仓库,锁定库存(暂未实现) // 采用方案:获取每项商品在哪些仓库有库存,轮询尝试锁定,任一商品锁定失败回滚 // 1.往库存工作单存储当前锁定(本地事务表) WareOrderTaskEntity taskEntity = new WareOrderTaskEntity(); taskEntity.setOrderSn(lockTO.getOrderSn()); orderTaskService.save(taskEntity); // 2.封装待锁定库存项Map Map<Long, OrderItemVO> lockItemMap = lockTO.getLocks().stream().collect(Collectors.toMap(key -> key.getSkuId(), val -> val)); // 3.查询(库存 - 库存锁定 >= 待锁定库存数)的仓库 List<WareSkuEntity> wareSkuEntities = baseMapper.selectListHasSkuStock(lockItemMap.keySet()); List<WareSkuEntity> wareList = wareSkuEntities.stream().filter(entity -> (entity.getStock() - entity.getStockLocked()) >= lockItemMap.get(entity.getSkuId()).getCount()).collect(Collectors.toList()); // 4.判断是否查询到仓库 if (CollectionUtils.isEmpty(wareList)) { // 匹配失败,所有商品项没有库存 Set<Long> skuIds = lockItemMap.keySet(); throw new NoStockException(skuIds); } // 5.将查询出的仓库数据封装成Map,key:skuId val:wareId Map<Long, List<WareSkuEntity>> wareMap = wareList.stream().collect(Collectors.groupingBy(key -> key.getSkuId())); // 6.判断是否为每一个商品项至少匹配了一个仓库 List<WareOrderTaskDetailEntity> taskDetails = new ArrayList<>();// 库存锁定工作单详情 Map<Long, StockLockedTO> lockedMessageMap = new HashMap<>();// 库存锁定工作单消息 if (wareMap.size() < lockTO.getLocks().size()) { // 匹配失败,部分商品没有库存 Set<Long> skuIds = lockItemMap.keySet(); skuIds.removeAll(wareMap.keySet());// 求商品项差集 throw new NoStockException(skuIds); } else { // 所有商品都存在有库存的仓库 // 7.锁定库存 for (Map.Entry<Long, List<WareSkuEntity>> entry : wareMap.entrySet()) { Boolean skuStocked = false; Long skuId = entry.getKey();// 商品 OrderItemVO item = lockItemMap.get(skuId); Integer count = item.getCount();// 待锁定个数 List<WareSkuEntity> hasStockWares = entry.getValue();// 有足够库存的仓库 for (WareSkuEntity ware : hasStockWares) { Long num = baseMapper.lockSkuStock(skuId, ware.getWareId(), count); if (num == 1) { // 8. 锁定成功,跳出循环 skuStocked = true; // 创建库存锁定工作单详情(每一件商品锁定详情) WareOrderTaskDetailEntity taskDetail = new WareOrderTaskDetailEntity(null, skuId, item.getTitle(), count, taskEntity.getId(), ware.getWareId(), WareOrderTaskConstant.LockStatusEnum.LOCKED.getCode()); taskDetails.add(taskDetail); //9。 创建库存锁定工作单消息(每一件商品一条消息) StockDetailTO detailMessage = new StockDetailTO(); BeanUtils.copyProperties(taskDetail, detailMessage); StockLockedTO lockedMessage = new StockLockedTO(taskEntity.getId(), detailMessage); lockedMessageMap.put(skuId, lockedMessage); break; } } if (!skuStocked) { // 匹配失败,当前商品所有仓库都未锁定成功 throw new NoStockException(skuId); } } } // 10.往库存工作单详情存储当前锁定(本地事务表) orderTaskDetailService.saveBatch(taskDetails); // 11.发送消息 for (WareOrderTaskDetailEntity taskDetail : taskDetails) { StockLockedTO message = lockedMessageMap.get(taskDetail.getSkuId()); message.getDetail().setId(taskDetail.getId());// 存储库存详情ID rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", message); } return true; }
/** * 解锁库存,监听死信队列 * * @author: charlin **/ @Slf4j @RabbitListener(queues = "stock.release.stock.queue") @Component public class StockReleaseListener { @Autowired private WareSkuService wareSkuService; /** * 库存解锁(监听死信队列) * 场景: * 1.下订单成功【需要解锁】(订单过期未支付、被用户手动取消、其他业务调用失败(订单回滚)) * 2.下订单失败【无需解锁】(库存锁定失败(库存锁定已回滚,但消息已发出)) * <p> * 注意:需要开启手动确认,不要删除消息,当前解锁失败需要重复解锁 */ @RabbitHandler public void handleStockLockedRelease(StockLockedTO locked, Message message, Channel channel) throws IOException { log.debug("库存解锁,库存工作单详情ID:" + locked.getDetail().getId()); //当前消息是否重新派发过来 // Boolean redelivered = message.getMessageProperties().getRedelivered(); try { // 解锁库存 wareSkuService.unLockStock(locked); // 解锁成功,手动确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 解锁失败,消息入队 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } /** * 客户取消订单,监听到消息 */ @RabbitHandler public void handleOrderCloseRelease(OrderTO orderTo, Message message, Channel channel) throws IOException { log.debug("订单关闭准备解锁库存,订单号:" + orderTo.getOrderSn()); try { wareSkuService.unLockStock(orderTo); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
/** * 库存解锁 */ @Override public void unLockStock(StockLockedTO locked) throws Exception { StockDetailTO taskDetailTO = locked.getDetail();// 库存工作单详情TO WareOrderTaskDetailEntity taskDetail = orderTaskDetailService.getById(taskDetailTO.getId());// 库存工作单详情Entity if (taskDetail != null) { // 1.工作单未回滚,需要解锁 WareOrderTaskEntity task = orderTaskService.getById(locked.getId());// 库存工作单Entity R r = orderFeignService.getOrderByOrderSn(task.getOrderSn());// 订单Entity if (r.getCode() == 0) { // 订单数据返回成功 OrderTO order = r.getData(new TypeReference<OrderTO>() { }); if (order == null || OrderConstant.OrderStatusEnum.CANCLED.getCode().equals(order.getStatus())) { // 2.订单已回滚 || 订单未回滚已取消状态 if (WareOrderTaskConstant.LockStatusEnum.LOCKED.getCode().equals(taskDetail.getLockStatus())) { // 订单已锁定状态,需要解锁(消息确认) unLockStock(taskDetailTO.getSkuId(), taskDetailTO.getWareId(), taskDetailTO.getSkuNum(), taskDetailTO.getId()); } else { // 订单其他状态,不可解锁(消息确认) } } } else { // 订单远程调用失败(消息重新入队) throw new Exception(); } } else { // 3.无库存锁定工作单记录,已回滚,无需解锁(消息确认) } } /** * 库存解锁 * 订单解锁触发,防止库存解锁消息优先于订单解锁消息到期,导致库存无法解锁 */ @Transactional @Override public void unLockStock(OrderTO order) { String orderSn = order.getOrderSn();// 订单号 // 1.根据订单号查询库存锁定工作单 WareOrderTaskEntity task = orderTaskService.getOrderTaskByOrderSn(orderSn); // 2.按照工作单查询未解锁的库存,进行解锁 List<WareOrderTaskDetailEntity> taskDetails = orderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>() .eq("task_id", task.getId()) .eq("lock_status", WareOrderTaskConstant.LockStatusEnum.LOCKED.getCode()));// 并发问题 // 3.解锁库存 for (WareOrderTaskDetailEntity taskDetail : taskDetails) { unLockStock(taskDetail.getSkuId(), taskDetail.getWareId(), taskDetail.getSkuNum(), taskDetail.getId()); } } /** * 库存解锁 * 1.sql执行释放锁定 * 2.更新库存工作单状态为已解锁 * * @param skuId * @param wareId * @param count */ public void unLockStock(Long skuId, Long wareId, Integer count, Long taskDetailId) { // 1.库存解锁 baseMapper.unLockStock(skuId, wareId, count); // 2.更新工作单的状态 已解锁 WareOrderTaskDetailEntity taskDetail = new WareOrderTaskDetailEntity(); taskDetail.setId(taskDetailId); taskDetail.setLockStatus(WareOrderTaskConstant.LockStatusEnum.UNLOCKED.getCode()); orderTaskDetailService.updateById(taskDetail); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。