赞
踩
订单服务涉及许多方面,分布式事务,分布式锁,例如订单超时未支付要取消订单,订单如何防止重复提交,如何防止超卖、这里都会使用到。
public OrderConfirmVO confirmOrder(Long skuId) { Long memberId = SecurityUtils.getMemberId(); // 解决子线程无法获取HttpServletRequest请求对象中数据的问题,子线程指getOrderItemsFuture,getMemberAddressFuture,generateOrderTokenFuture //feign远程调用会被拦截提取attributes并转发 RequestAttributes attributes = RequestContextHolder.getRequestAttributes(); RequestContextHolder.setRequestAttributes(attributes, true); // 获取订单商品 //使用了 CompletableFuture 来实现异步执行获取订单项的操作 CompletableFuture<List<OrderItemDTO>> getOrderItemsFuture = CompletableFuture.supplyAsync( () -> this.getOrderItems(skuId, memberId), threadPoolExecutor) .exceptionally(ex -> { log.error("Failed to get order items: {}", ex.toString()); return null; }); // 用户收货地址 CompletableFuture<List<MemberAddressDTO>> getMemberAddressFuture = CompletableFuture.supplyAsync(() -> { Result<List<MemberAddressDTO>> getMemberAddressResult = memberFeignClient.listMemberAddresses(memberId); if (Result.isSuccess(getMemberAddressResult)) { return getMemberAddressResult.getData(); } return null; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to get addresses for memberId {} : {}", memberId, ex.toString()); return null; }); // 生成唯一令牌,防止重复提交(原理:提交会消耗令牌,令牌被消耗无法再次提交) CompletableFuture<String> generateOrderTokenFuture = CompletableFuture.supplyAsync(() -> { String orderToken = this.generateTradeNo(memberId); redisTemplate.opsForValue().set(OrderConstants.ORDER_TOKEN_PREFIX + orderToken, orderToken); return orderToken; }, threadPoolExecutor).exceptionally(ex -> { log.error("Failed to generate order token ."); return null; }); //CompletableFuture.allOf 方法,可以等待所有 CompletableFuture 对象都完成再进行后续操作, // 确保获取和设置属性的操作都能够成功执行。这样可以避免程序出现异常, CompletableFuture.allOf(getOrderItemsFuture, getMemberAddressFuture, generateOrderTokenFuture).join(); OrderConfirmVO orderConfirmVO = new OrderConfirmVO(); orderConfirmVO.setOrderItems(getOrderItemsFuture.join()); orderConfirmVO.setAddresses(getMemberAddressFuture.join()); orderConfirmVO.setOrderToken(generateOrderTokenFuture.join()); log.info("Order confirm response for skuId {}: {}", skuId, orderConfirmVO); return orderConfirmVO; }
通过生成唯一令牌解决,方法:
根据自定义方法generateTradeNo生成订单号,将时间戳+3位随机数+5位id(由会员id组成,不够5位补0,超过5位保留后5位)组成订单号
private String generateTradeNo(Long memberId) {
//当 memberId 的位数小于 5 位时,使用 0 来填充位数不足的部分,如果 memberId 已经是 5 位数或更长,则不进行填充
String userIdFilledZero = String.format("%05d", memberId);
//超出五位的保留后五位
String fiveDigitsUserId = userIdFilledZero.substring(userIdFilledZero.length() - 5);
// 在前面加上wxo(wx order)等前缀是为了人工可以快速分辨订单号是下单还是退款、来自哪家支付机构等
// 将时间戳+3位随机数+五位id组成商户订单号,规则参考自<a href="https://tech.meituan.com/2016/11/18/dianping-order-db-sharding.html">大众点评</a>
return System.currentTimeMillis() + RandomUtil.randomNumbers(3) + fiveDigitsUserId;
}
根据订单防重提交令牌缓存键前缀+订单号作为key存入redis,值为订单号。
订单提交的时候,会通过lua脚本验证redis是否含有该key,有则返回0,通过断言阻止重复提交,下面订单提交方法的代码含有该部分。
扩展:CompletableFuture 来实现异步执行获取订单项的操作,可以提高响应速度,减少阻塞时间,同时通过CompletableFuture可以更方便地处理异常,每个异步操作可以独立地捕获和处理异常,避免异常传递给上层调用者,提高了系统的健壮性和容错性。CompletableFuture对于处理大量并发请求的场景非常重要,可以提升系统的性能和用户体验。
涉及的自定义线程池(相关解释直接在代码注释上了)
/** * 自定义订单线程池 * */ @Configuration @Slf4j public class ThreadPoolConfig { @Bean public ThreadPoolExecutor threadPoolExecutor() { int cpuCoreSize = Runtime.getRuntime().availableProcessors();//使用 Java 获取可用的 CPU 核心数 log.info("当前CPU核心数:{}", cpuCoreSize); /* * 计算密集型: 核心线程数=CPU核心 +1 √ * I/O密集型: 核心线程数=2*CPU核心 +1 */ int corePoolSize = cpuCoreSize + 1; return new ThreadPoolExecutor( //核心线程数 corePoolSize, //最大线程数 2 * corePoolSize, //线程空闲(存活)时间;当线程数超过核心线程数,并且空闲时间超过指定时间时,多余的线程会被销毁 30, TimeUnit.SECONDS, //任务队列(选取数组阻塞队列)特点:固定容量,公平性,先进先出 new ArrayBlockingQueue<>(1000), //线程工厂 new NamedThreadFactory("order") // 订单线程 ); } }
@GlobalTransactional public String submitOrder(OrderSubmitForm submitForm) { log.info("订单提交参数:{}", JSONUtil.toJsonStr(submitForm)); String orderToken = submitForm.getOrderToken(); // 1. 判断订单是否重复提交(LUA脚本保证获取和删除的原子性,成功返回1,否则返回0) //KEYS[1]指OrderConstants.ORDER_TOKEN_PREFIX + orderToken,ARGV[1]指orderToken String lockAcquireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //这一行代码使用RedisTemplate的execute方法执行Lua脚本。通过new DefaultRedisScript<>创建一个Redis脚本对象,并指定脚本字符串和返回结果的类型(Long)。 // 然后使用Collections.singletonList来创建包含锁键名和订单令牌参数的列表,传递给execute方法。 //执行 Redis 脚本后,结果会赋值给 lockAcquired 变量,它的类型是 Long。 // lockAcquired这个值表示获取锁的结果,如果成功获取并删除了锁,则为 1;如果获取锁失败,则为 0。 Long lockAcquired = this.redisTemplate.execute( new DefaultRedisScript<>(lockAcquireScript, Long.class), Collections.singletonList(OrderConstants.ORDER_TOKEN_PREFIX + orderToken), orderToken ); Assert.isTrue(lockAcquired != null && lockAcquired.equals(1L), "订单重复提交,请刷新页面后重试"); // 2. 订单商品校验 (PS:校验进入订单确认页面到提交过程商品(价格、上架状态)变化) List<OrderSubmitForm.OrderItem> orderItems = submitForm.getOrderItems(); List<Long> skuIds = orderItems.stream() .map(OrderSubmitForm.OrderItem::getSkuId) .collect(Collectors.toList()); List<SkuInfoDTO> skuList = skuFeignClient.getSkuInfoList(skuIds); for (OrderSubmitForm.OrderItem item : orderItems) { SkuInfoDTO skuInfo = skuList.stream().filter(sku -> sku.getId().equals(item.getSkuId())) .findFirst() .orElse(null); Assert.isTrue(skuInfo != null, "商品({})已下架或删除"); //如果调用对象小于被比较对象,compareTo 方法返回一个负整数。 // 如果调用对象大于被比较对象,compareTo 方法返回一个正整数。 Assert.isTrue(item.getPrice().compareTo(skuInfo.getPrice()) == 0, "商品({})价格发生变动,请刷新页面", item.getSkuName()); } // 3. 校验库存并锁定库存 List<LockedSkuDTO> lockedSkuList = orderItems.stream() .map(item -> new LockedSkuDTO(item.getSkuId(), item.getQuantity(), item.getSkuSn())) .collect(Collectors.toList()); boolean lockStockResult = skuFeignClient.lockStock(orderToken, lockedSkuList); Assert.isTrue(lockStockResult, "订单提交失败:锁定商品库存失败!"); // 4. 生成订单 boolean result = this.saveOrder(submitForm); log.info("order ({}) create result:{}", orderToken, result); return orderToken; }
前提:此处涉及seata分布式事务和Redisson实现的分布式锁
KEYS[1] 指OrderConstants.ORDER_TOKEN_PREFIX + orderToken
ARGV[1] 指orderToken
判断是否含有这个锁key,有就删除这个锁并返回1L,没有就返回0
通过断言判断lockAcquired是否成功获取并删除了锁,不成功会报错
@Transactional public boolean lockStock(String orderToken, List<LockedSkuDTO> lockedSkuList) { Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkuList), "订单({})未包含任何商品", orderToken); // 校验库存数量是否足够以及锁定库存 for (LockedSkuDTO lockedSku : lockedSkuList) { Long skuId = lockedSku.getSkuId(); //商品分布式锁缓存键前缀:ProductConstants.SKU_LOCK_PREFIX //每次getLock都会返回一个独立的分布式锁对象,但它们共享一个锁资源。 RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + skuId); // 构建商品锁对象 try { //共享一个锁资源意味着多个分布式锁对象共同使用同一个锁来实现互斥访问。 //虽然每个分布式锁对象是独立创建的,但它们会使用相同的锁资源来进行加锁和释放锁的操作。 lock.lock();//锁定操作 Integer quantity = lockedSku.getQuantity(); // 订单的商品数量 // 库存足够 boolean lockResult = this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock + " + quantity) // 修改锁定商品数 .eq(PmsSku::getId, lockedSku.getSkuId()) //通过 apply 方法添加动态 SQL 条件,确保 stock 减去 locked_stock 的值大于等于给定的 quantity。 //使用了占位符 {0} 来引用 quantity,0表示第一个传入的值。 .apply("stock - locked_stock >= {0}", quantity) // 剩余商品数 ≥ 订单商品数 ); Assert.isTrue(lockResult, "商品({})库存不足", lockedSku.getSkuSn()); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 锁定的商品缓存至 Redis (后续使用:1.取消订单解锁库存;2:支付订单扣减库存) redisTemplate.opsForValue().set(ProductConstants.LOCKED_SKUS_PREFIX + orderToken, lockedSkuList); return true; }
/** *解锁库存 *<P> *订单超时未支付,释放锁定的商品库存 */ public boolean unlockStock(String orderSn) { //锁定库存对象集合:lockedSkus List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("释放订单({})锁定的商品库存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); // 库存已释放 if (CollectionUtil.isEmpty(lockedSkus)) { return true; } // 遍历恢复锁定的商品库存 for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 获取商品分布式锁 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { //判断当前分布式锁是否已被锁定,通过这个判断可以防止非法释放锁等潜在问题 if (lock.isLocked()) { lock.unlock(); } } } // 移除 redis 订单锁定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
/** * 扣减库存 * <p> * 订单支付扣减商品库存和释放锁定库存 * * @param orderSn 订单编号 * @return ture/false */ public boolean deductStock(String orderSn) { // 获取订单提交时锁定的商品 List<LockedSkuDTO> lockedSkus = (List<LockedSkuDTO>) redisTemplate.opsForValue().get(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); log.info("订单({})支付成功,扣减订单商品库存:{}", orderSn, JSONUtil.toJsonStr(lockedSkus)); Assert.isTrue(CollectionUtil.isNotEmpty(lockedSkus), "扣减商品库存失败:订单({})未包含商品"); for (LockedSkuDTO lockedSku : lockedSkus) { RLock lock = redissonClient.getLock(ProductConstants.SKU_LOCK_PREFIX + lockedSku.getSkuId()); // 获取商品分布式锁 try { lock.lock(); this.update(new LambdaUpdateWrapper<PmsSku>() .setSql("stock = stock - " + lockedSku.getQuantity()) .setSql("locked_stock = locked_stock - " + lockedSku.getQuantity()) .eq(PmsSku::getId, lockedSku.getSkuId()) ); } finally { if (lock.isLocked()) { lock.unlock(); } } } // 移除订单锁定的商品 redisTemplate.delete(ProductConstants.LOCKED_SKUS_PREFIX + orderSn); return true; }
原理跟释放库存基本一致,更新语句发生变化
更新语句:将商品库存数进行扣减,扣减数量为订单商品数,然后锁定商品数也进行扣 减,扣减数量为订单商品数
通过saveOrder创建订单,返回result,打印日志。
返回订单号
/** * 创建订单 * * @param submitForm 订单提交表单对象 * @return */ private boolean saveOrder(OrderSubmitForm submitForm) { //创建订单详情表(OmsOrder)对象 OmsOrder order = orderConverter.form2Entity(submitForm); //设置待支付状态 order.setStatus(OrderStatusEnum.UNPAID.getValue()); //设置订单会员id order.setMemberId(SecurityUtils.getMemberId()); //设置订单来源(0代表PC订单,1代表APP订单) order.setSource(submitForm.getOrderSource().getValue()); //保存到数据库 boolean result = this.save(order); Long orderId = order.getId(); if (result) { // 保存订单明细 List<OmsOrderItem> orderItemEntities = orderItemConverter.item2Entity(submitForm.getOrderItems()); orderItemEntities.forEach(item -> item.setOrderId(orderId)); orderItemService.saveBatch(orderItemEntities); // 订单超时未支付取消 //这行代码使用 RabbitMQ 的 Java 客户端库来发送一条消息到 order.exchange 交换器, // 该消息会被路由到 order.close.delay 队列中。消息的内容是 submitForm.getOrderToken() 方法的返回结果。 rabbitTemplate.convertAndSend("order.exchange", "order.close.delay", submitForm.getOrderToken()); } return result; }
普通的保存到数据的操作就不做解释了,主要看订单超时未支付取消的功能。
订单超时未支付取消功能
通过rabbitMQ实现
@Component @Slf4j public class OrderRabbitConfig { // 普通延迟队列 private static final String ORDER_CLOSE_DELAY_QUEUE = "order.close.delay.queue"; private static final String ORDER_EXCHANGE = "order.exchange"; private static final String ORDER_CLOSE_DELAY_ROUTING_KEY = "order.close.delay"; // 死信关单队列 private static final String ORDER_ClOSE_QUEUE = "order.close.queue"; private static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange"; private static final String ORDER_ClOSE_ROUTING_KEY = "order.close"; /** * 定义交换机 */ @Bean public Exchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); } /** * 死信交换机 */ @Bean public Exchange orderDlxExchange() { return new DirectExchange(ORDER_DLX_EXCHANGE, true, false); } /** * 延时队列 */ @Bean public Queue orderDelayQueue() { // 延时队列的消息过期了,会自动触发消息的转发,根据routingKey发送到指定的exchange中,exchange路由到死信队列 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", ORDER_DLX_EXCHANGE); args.put("x-dead-letter-routing-key", ORDER_ClOSE_ROUTING_KEY); // 死信路由Key args.put("x-message-ttl", 10 * 1000L); // 单位毫秒,10s用于测试 return new Queue(ORDER_CLOSE_DELAY_QUEUE, true, false, false, args); } /** * 延时队列绑定交换机 */ @Bean public Binding orderDelayQueueBinding() { return new Binding(ORDER_CLOSE_DELAY_QUEUE, Binding.DestinationType.QUEUE, ORDER_EXCHANGE, ORDER_CLOSE_DELAY_ROUTING_KEY, null); } /** * 关单队列 */ @Bean public Queue orderCloseQueue() { log.info("死信队列(order.close.queue)创建"); return new Queue(ORDER_ClOSE_QUEUE, true, false, false); } /** * 关单队列绑定死信交换机 */ @Bean public Binding orderCloseQueueBinding() { return new Binding(ORDER_ClOSE_QUEUE, Binding.DestinationType.QUEUE, ORDER_DLX_EXCHANGE, ORDER_ClOSE_ROUTING_KEY, null); } }
/** * 订单超时未支付系统自动取消监听器 * */ @Component @RequiredArgsConstructor @Slf4j public class OrderCloseListener { private final OrderService orderService; private final RabbitTemplate rabbitTemplate; @RabbitListener(queues = "order.close.queue") public void closeOrder(String orderSn, Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); // 消息序号(消息队列中的位置) log.info("订单({})超时未支付,系统自动关闭订单", orderSn); try { boolean closeOrderResult = orderService.closeOrder(orderSn); log.info("关单结果:{}", closeOrderResult); if (closeOrderResult) { // 关单成功:释放库存 //发送订单号 rabbitTemplate.convertAndSend("stock.exchange", "stock.unlock", orderSn); } else { // 关单失败:订单已被关闭,手动ACK确认并从队列移除消息 channel.basicAck(deliveryTag, false); // false: 不批量确认,仅确认当前单个消息 } } catch (Exception e) { // 关单异常:拒绝消息并重新入队 try { channel.basicReject(deliveryTag, true); // true: 重新放回队列 // channel.basicReject(deliveryTag, false); // false: 直接丢弃消息 (TODO 定时任务补偿) } catch (IOException ex) { log.error("订单({})关闭失败,原因:{}", orderSn, ex.getMessage()); } } } }
监听死信关单队列,拿到消息序号deliveryTag(便于后续手动ACK和重新入队的操作)
进行关单操作orderService.closeOrder,主要修改订单详情表的状态从待支付改为已关闭。
public boolean closeOrder(String orderSn) {
return this.update(new LambdaUpdateWrapper<OmsOrder>()
.eq(OmsOrder::getOrderSn, orderSn)
.eq(OmsOrder::getStatus, OrderStatusEnum.UNPAID.getValue())
.set(OmsOrder::getStatus, OrderStatusEnum.CANCELED.getValue())
);
}
判断关单是否成功,成功就释放库存,发送订单号给释放库存的交换机,路由键为stock.unlock。
/** *库存释放监听器 */ @Component @Slf4j @RequiredArgsConstructor public class StockReleaseListener { private final SkuService skuService; private static final String STOCK_UNLOCK_QUEUE = "stock.unlock.queue"; private static final String STOCK_EXCHANGE = "stock.exchange"; private static final String STOCK_UNLOCK_ROUTING_KEY = "stock.unlock"; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = STOCK_UNLOCK_QUEUE,durable = "true"), exchange = @Exchange(value = STOCK_EXCHANGE), key = {STOCK_UNLOCK_ROUTING_KEY} ), ackMode = "MANUAL")//手动ACK @RabbitHandler public void UnlockStock(String orderSn, Message message, Channel channel){ log.info("订单{}取消释放库存",orderSn); long deliverTag=message.getMessageProperties().getDeliveryTag(); try{ skuService.unlockStock(orderSn); channel.basicAck(deliverTag,false); }catch (Exception e){ try { channel.basicAck(deliverTag,true); }catch (IOException ex){ log.error("订单{}关闭失败,原因:{}",orderSn,ex.getMessage()); } } } }
如果收到释放库存的消息,就会执行释放库存的方法unlockStock,上面锁定库存方法那节已解释。
@GlobalTransactional public <T> T payOrder(OrderPaymentForm paymentForm) { String orderSn = paymentForm.getOrderSn(); OmsOrder order = this.getOne(new LambdaQueryWrapper<OmsOrder>().eq(OmsOrder::getOrderSn, orderSn)); Assert.isTrue(order != null, "订单不存在"); Assert.isTrue(OrderStatusEnum.UNPAID.getValue().equals(order.getStatus()), "订单不可支付,请检查订单状态"); RLock lock = redissonClient.getLock(OrderConstants.ORDER_LOCK_PREFIX + order.getOrderSn()); try { lock.lock(); T result; switch (paymentForm.getPaymentMethod()) { case WX_JSAPI: result = (T) wxJsapiPay(paymentForm.getAppId(), order.getOrderSn(), order.getPaymentAmount()); break; default: result = (T) balancePay(order); break; } return result; } finally { //释放锁 if (lock.isLocked()) { lock.unlock(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。