赞
踩
前言
前面我的博客介绍了有关分布式锁,分布式事务相关的问题以及解决方案,但是还是不能解决并发下单,扣减的问题,并发的时候由于数据库的隔离级别/乐观锁/悲观锁…总是会出现一些问题。最近集成了一套方案解决此类型问题,并可以适用于一般情况的秒杀方案。欢迎拍砖…
情景分析
前提条件:
商品 P 库存数量为 100
用户A,用户B 同时分别想买P,并且A,B分别买一个
数据库有版本号控制乐观锁
期望结果:
用户A,B都买到商品P,并且商品P的库存变为98
- 分析:
- 1.以前碰到这类问题的时候,我们说,可以添加分布式锁(也就是悲观锁),将商品P的id锁住,然后A,B提交订
- 单的时候分别预扣商品P的库存就好了(如方案一)。
- 2.是的,1分析的有道理,我们的核心思路确实要将并行控制为串行,但是事与愿违。我们仔细想一想:
- 如果,A,B想买的不仅仅P一个商品,而是P,Q两个商品呢?我们该怎么锁?一次性将P,Q都锁住?显然不是
- 很现实,并且实际上提交订单的时候都是从购物车下单,一个购物车里包括多个商品去下单也是很常见的,
- 并且还有如下逻辑大家仔细思考:
-
- 用户A下单->用户A读到库存为100
- 用户A预扣库存,判断剩余库存是否>=0,100-1=99>0,则A预扣
- 用户A的下单流程....
- 此时A的事务没有提交,B下单了:
- 用户B下单->用户B读到库存为100(这里读到100,是因为A还有其他逻辑在执行,还未提交事务,B读未提交了!)
- 用户B预扣库存,判断剩余库存是否>=0,100-1=99>0,则B预扣
- 用户B的下单流程....
-
- 最后不论A/B谁先提交事务,后面提交事务的就不会扣减库存成功。因为版本号不一致(就算没有乐观锁,
- 修改的结果也会错,而且错的更离谱)。最终的结局就是库存是99
-
- 3.解决方案
- 目前控制库存的方案有很多种,我这边介绍通过redis预减库存,通过mq发送消息同步的扣减数据库库存的
- 方案。
方案一
-
- @Transactional
- public void createOrder(...){
- //1.校验,扣减库存
- check(Item item);
-
- //2.创建订单
-
- //3.创建支付单
-
- }
-
- @RedisLock("pId")
- public void check(Item item){
- }
解决方案伪代码
-
- //当然,我们管理平台新建商品时需要初始化到redis库存里,
- //这里暂时就不介绍了
-
-
- //下单部分
- @Transactional
- public void createOrder(...){
- //1.校验,扣减库存
- check(Item item);
-
- //2.创建订单
-
- //3.创建支付单
-
- //4.redis扣减库存
-
- }
-
- //支付回调部分
- @Transactional
- public void wxCall(...){
- //1.校验订单状态
-
- //2.修改订单,支付单状态
-
- //3.mq发送全局顺序消息 扣减库存
-
- }
-
-
- //取消支付部分
- @Transactional
- public void cancelOrder(...){
- //1.校验订单状态
-
- //2.修改订单,支付单状态
-
- //3.redis回退库存
-
- }
-
-
- //退货/退款部分
- @Transactional
- public void returnOrder(...){
- //1.校验订单状态
-
- //2.修改订单,支付单状态
-
- //3.redis回退库存
-
- //4.mq发送全局顺序消息
-
- }
代码部分
实现思路
初始化库存回调函数(IStockCallback )
- /**
- * 获取库存回调
- * create by liuliang
- * on 2019-11-13 10:45
- */
- public interface IStockCallback {
- /**
- * 获取库存
- * @return
- */
- String getStock();
- }
-
- /**
- *
- * Redis分布式锁
- * 使用 SET resource-name anystring NX EX max-lock-time 实现
- * <p>
- * 该方案在 Redis 官方 SET 命令页有详细介绍。
- * http://doc.redisfans.com/string/set.html
- * <p>
- * 在介绍该分布式锁设计之前,我们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性,
- * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中:
- * <p>
- * EX seconds — 以秒为单位设置 key 的过期时间;
- * PX milliseconds — 以毫秒为单位设置 key 的过期时间;
- * NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
- * XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。
- * <p>
- * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
- * <p>
- * 客户端执行以上的命令:
- * <p>
- * 如果服务器返回 OK ,那么这个客户端获得锁。
- * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
- *
- *
- * create by liuliang
- * on 2019-11-13 10:49
- */
- public class RedisStockLock {
-
- private static Logger logger = LoggerFactory.getLogger(RedisStockLock.class);
-
- private RedisTemplate<String, Object> redisTemplate;
-
- /**
- * 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。
- */
- public static final String NX = "NX";
-
- /**
- * seconds — 以秒为单位设置 key 的过期时间,等效于EXPIRE key seconds
- */
- public static final String EX = "EX";
-
- /**
- * 调用set后的返回值
- */
- public static final String OK = "OK";
-
- /**
- * 默认请求锁的超时时间(ms 毫秒)
- */
- private static final long TIME_OUT = 100;
-
- /**
- * 默认锁的有效时间(s)
- */
- public static final int EXPIRE = 60;
-
- /**
- * 解锁的lua脚本
- */
- public static final String UNLOCK_LUA;
-
- static {
- StringBuilder sb = new StringBuilder();
- sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
- sb.append("then ");
- sb.append(" return redis.call(\"del\",KEYS[1]) ");
- sb.append("else ");
- sb.append(" return 0 ");
- sb.append("end ");
- UNLOCK_LUA = sb.toString();
- }
-
- /**
- * 锁标志对应的key
- */
- private String lockKey;
-
- /**
- * 记录到日志的锁标志对应的key
- */
- private String lockKeyLog = "";
-
- /**
- * 锁对应的值
- */
- private String lockValue;
-
- /**
- * 锁的有效时间(s)
- */
- private int expireTime = EXPIRE;
-
- /**
- * 请求锁的超时时间(ms)
- */
- private long timeOut = TIME_OUT;
-
- /**
- * 锁标记
- */
- private volatile boolean locked = false;
-
- final Random random = new Random();
-
- /**
- * 使用默认的锁过期时间和请求锁的超时时间
- *
- * @param redisTemplate
- * @param lockKey 锁的key(Redis的Key)
- */
- public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
- this.redisTemplate = redisTemplate;
- this.lockKey = lockKey + "_lock";
- }
-
- /**
- * 使用默认的请求锁的超时时间,指定锁的过期时间
- *
- * @param redisTemplate
- * @param lockKey 锁的key(Redis的Key)
- * @param expireTime 锁的过期时间(单位:秒)
- */
- public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) {
- this(redisTemplate, lockKey);
- this.expireTime = expireTime;
- }
-
- /**
- * 使用默认的锁的过期时间,指定请求锁的超时时间
- *
- * @param redisTemplate
- * @param lockKey 锁的key(Redis的Key)
- * @param timeOut 请求锁的超时时间(单位:毫秒)
- */
- public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) {
- this(redisTemplate, lockKey);
- this.timeOut = timeOut;
- }
-
- /**
- * 锁的过期时间和请求锁的超时时间都是用指定的值
- *
- * @param redisTemplate
- * @param lockKey 锁的key(Redis的Key)
- * @param expireTime 锁的过期时间(单位:秒)
- * @param timeOut 请求锁的超时时间(单位:毫秒)
- */
- public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) {
- this(redisTemplate, lockKey, expireTime);
- this.timeOut = timeOut;
- }
-
- /**
- * 尝试获取锁 超时返回
- *
- * @return
- */
- public boolean tryLock() {
- // 生成随机key
- lockValue = UUID.randomUUID().toString();
- // 请求锁超时时间,纳秒
- long timeout = timeOut * 1000000;
- // 系统当前时间,纳秒
- long nowTime = System.nanoTime();
- while ((System.nanoTime() - nowTime) < timeout) {
- if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
- locked = true;
- // 上锁成功结束请求
- return locked;
- }
-
- // 每次请求等待一段时间
- seleep(10, 50000);
- }
- return locked;
- }
-
- /**
- * 尝试获取锁 立即返回
- *
- * @return 是否成功获得锁
- */
- public boolean lock() {
- lockValue = UUID.randomUUID().toString();
- //不存在则添加 且设置过期时间(单位ms)
- String result = set(lockKey, lockValue, expireTime);
- locked = OK.equalsIgnoreCase(result);
- return locked;
- }
-
- /**
- * 以阻塞方式的获取锁
- *
- * @return 是否成功获得锁
- */
- public boolean lockBlock() {
- lockValue = UUID.randomUUID().toString();
- while (true) {
- //不存在则添加 且设置过期时间(单位ms)
- String result = set(lockKey, lockValue, expireTime);
- if (OK.equalsIgnoreCase(result)) {
- locked = true;
- return locked;
- }
-
- // 每次请求等待一段时间
- seleep(10, 50000);
- }
- }
-
- /**
- * 解锁
- * <p>
- * 可以通过以下修改,让这个锁实现更健壮:
- * <p>
- * 不使用固定的字符串作为键的值,而是设置一个不可猜测(non-guessable)的长随机字符串,作为口令串(token)。
- * 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。
- * 这两个改动可以防止持有过期锁的客户端误删现有锁的情况出现。
- */
- public Boolean unlock() {
- // 只有加锁成功并且锁还有效才去释放锁
- // 只有加锁成功并且锁还有效才去释放锁
- if (locked) {
- return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() {
- @Override
- public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
- Object nativeConnection = connection.getNativeConnection();
- Long result = 0L;
-
- List<String> keys = new ArrayList<>();
- keys.add(lockKey);
- List<String> values = new ArrayList<>();
- values.add(lockValue);
-
- // 集群模式
- if (nativeConnection instanceof JedisCluster) {
- result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
- }
-
- // 单机模式
- if (nativeConnection instanceof Jedis) {
- result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
- }
-
- if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
- logger.info("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis());
- }
-
- locked = result == 0;
- return result == 1;
- }
- });
- }
-
- return true;
- }
-
- /**
- * 获取锁状态
- * @Title: isLock
- * @Description: TODO
- * @return
- * @author yuhao.wang
- */
- public boolean isLock() {
-
- return locked;
- }
-
- /**
- * 重写redisTemplate的set方法
- * <p>
- * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。
- * <p>
- * 客户端执行以上的命令:
- * <p>
- * 如果服务器返回 OK ,那么这个客户端获得锁。
- * 如果服务器返回 NIL ,那么客户端获取锁失败,可以在稍后再重试。
- *
- * @param key 锁的Key
- * @param value 锁里面的值
- * @param seconds 过去时间(秒)
- * @return
- */
- private String set(final String key, final String value, final long seconds) {
- Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空");
- return (String) redisTemplate.execute(new RedisCallback<String>() {
- @Override
- public String doInRedis(RedisConnection connection) throws DataAccessException {
- Object nativeConnection = connection.getNativeConnection();
- String result = null;
- if (nativeConnection instanceof JedisCommands) {
- result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
- }
-
- if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
- logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis());
- }
-
- return result;
- }
- });
- }
-
- /**
- * @param millis 毫秒
- * @param nanos 纳秒
- * @Title: seleep
- * @Description: 线程等待时间
- * @author yuhao.wang
- */
- private void seleep(long millis, int nanos) {
- try {
- Thread.sleep(millis, random.nextInt(nanos));
- } catch (InterruptedException e) {
- logger.info("获取分布式锁休眠被中断:", e);
- }
- }
-
- public String getLockKeyLog() {
- return lockKeyLog;
- }
-
- public void setLockKeyLog(String lockKeyLog) {
- this.lockKeyLog = lockKeyLog;
- }
-
- public int getExpireTime() {
- return expireTime;
- }
-
- public void setExpireTime(int expireTime) {
- this.expireTime = expireTime;
- }
-
- public long getTimeOut() {
- return timeOut;
- }
-
- public void setTimeOut(long timeOut) {
- this.timeOut = timeOut;
- }
-
-
- }
-
- /**
- * 扣库存
- * create by liuliang
- * on 2019-11-13 10:46
- */
- @Service
- public class StockComponent {
-
- Logger logger = LoggerFactory.getLogger(StockComponent.class);
-
- /**
- * 不限库存
- */
- public static final long UNINITIALIZED_STOCK = -3L;
-
- /**
- * Redis 客户端
- */
- @Autowired
- private RedisTemplate<String, Object> redisTemplate;
-
- /**
- * 执行扣库存的脚本
- */
- public static final String STOCK_LUA;
-
-
-
-
- static {
- /**
- *
- * @desc 扣减库存Lua脚本
- * 库存(stock)-1:表示不限库存
- * 库存(stock)0:表示没有库存
- * 库存(stock)大于0:表示剩余库存
- *
- * @params 库存key
- * @return
- * -3:库存未初始化
- * -2:库存不足
- * -1:不限库存
- * 大于等于0:剩余库存(扣减之后剩余的库存)
- * redis缓存的库存(value)是-1表示不限库存,直接返回-1
- */
- StringBuilder sb = new StringBuilder();
- sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
- sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));");
- sb.append(" local num = tonumber(ARGV[1]);");
- sb.append(" if (stock == -1) then");
- sb.append(" return -1;");
- sb.append(" end;");
- sb.append(" if (stock >= num) then");
- sb.append(" return redis.call('incrby', KEYS[1], 0 - num);");
- sb.append(" end;");
- sb.append(" return -2;");
- sb.append("end;");
- sb.append("return -3;");
- STOCK_LUA = sb.toString();
- }
-
- /**
- * @param key 库存key
- * @param expire 库存有效时间,单位秒
- * @param num 扣减数量
- * @param stockCallback 初始化库存回调函数
- * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
- */
- public long stock(String key, long expire, int num, IStockCallback stockCallback) {
- long stock = stock(key, num);
- // 初始化库存
- if (stock == UNINITIALIZED_STOCK) {
- RedisStockLock redisLock = new RedisStockLock(redisTemplate, key);
- try {
- // 获取锁
- if (redisLock.tryLock()) {
- // 双重验证,避免并发时重复回源到数据库
- stock = stock(key, num);
- if (stock == UNINITIALIZED_STOCK) {
- // 获取初始化库存
- final String initStock = stockCallback.getStock();
- // 将库存设置到redis
- redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
- // 调一次扣库存的操作
- stock = stock(key, num);
- }
- }
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- } finally {
- redisLock.unlock();
- }
-
- }
- return stock;
- }
-
- /**
- * 加库存(还原库存)
- *
- * @param key 库存key
- * @param num 库存数量
- * @return
- */
- public long addStock(String key, int num) {
-
- return addStock(key, null, num);
- }
-
- /**
- * 加库存
- *
- * @param key 库存key
- * @param expire 过期时间(秒)
- * @param num 库存数量
- * @return
- */
- public long addStock(String key, Long expire, int num) {
- boolean hasKey = redisTemplate.hasKey(key);
- // 判断key是否存在,存在就直接更新
- if (hasKey) {
- return redisTemplate.opsForValue().increment(key, num);
- }
-
- Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
- RedisStockLock redisLock = new RedisStockLock(redisTemplate, key);
- try {
- if (redisLock.tryLock()) {
- // 获取到锁后再次判断一下是否有key
- hasKey = redisTemplate.hasKey(key);
- if (!hasKey) {
- // 初始化库存
- redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
- }
- }
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- } finally {
- redisLock.unlock();
- }
-
- return num;
- }
-
- /**
- * 获取库存
- *
- * @param key 库存key
- * @return -1:不限库存; 大于等于0:剩余库存
- */
- public int getStock(String key) {
- Integer stock = (Integer) redisTemplate.opsForValue().get(key);
- return stock == null ? -1 : stock;
- }
-
- /**
- * 扣库存
- *
- * @param key 库存key
- * @param num 扣减库存数量
- * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
- */
- private Long stock(String key, int num) {
- // 脚本里的KEYS参数
- List<String> keys = new ArrayList<>();
- keys.add(key);
- // 脚本里的ARGV参数
- List<String> args = new ArrayList<>();
- args.add(Integer.toString(num));
-
- long result = redisTemplate.execute(new RedisCallback<Long>() {
- @Override
- public Long doInRedis(RedisConnection connection) throws DataAccessException {
- Object nativeConnection = connection.getNativeConnection();
- // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
- // 集群模式
- if (nativeConnection instanceof JedisCluster) {
- return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
- }
-
- // 单机模式
- else if (nativeConnection instanceof Jedis) {
- return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
- }
- return UNINITIALIZED_STOCK;
- }
- });
- return result;
- }
- }
-
-
- /**
- * 库存操作对外接口
- *
- * create by liuliang
- * on 2019-11-13 11:00
- */
- @Slf4j
- @Service
- public class StockService {
-
- @Autowired
- private StockComponent stockComponent;
-
- @Autowired
- private ProductSkuMapper skuMapper;
-
- @Autowired
- private ProductMapper productMapper;
-
- @Autowired
- private RocketMQConfig rocketMQConfig;
-
- @Autowired
- private PresentRocketProducer presentRocketProducer;
-
- private static final String REDIS_STOCK_KEY="redis_key:stock:";
-
- /**
- * 扣减库存
- * @param skuId
- * @param num
- * @return
- */
- public Boolean stock(String skuId,Integer num) {
- // 库存ID
- String redisKey = REDIS_STOCK_KEY + skuId;
- long stock = stockComponent.stock(redisKey, 60 * 60, num, () -> initStock(skuId));
- if(stock < 0){//异常,库存不足
- log.info("库存不足........");
- ProductSku productSku = skuMapper.selectById(skuId);
- throw new MallException(MsRespCode.STOCK_NUMBER_ERROR,new Object[]{productMapper.selectById(productSku.getProductId()).getTitle()});
- }
- return stock >= 0 ;
- }
-
-
-
- /**
- * 添加redis - sku库存数量
- * @param skuId
- * @param num
- * @return
- */
- public Long addStock(String skuId ,Integer num) {
- // 库存ID
- String redisKey = REDIS_STOCK_KEY + skuId;
- long l = stockComponent.addStock(redisKey, num);
- return l;
- }
-
-
- /**
- * 获取初始的库存
- *
- * @return
- */
- private String initStock(String skuId) {
- //初始化库存
- ProductSku productSku = skuMapper.selectById(skuId);
- return productSku.getStockNumber()+"";
- }
-
- /**
- * 获取sku库存
- * @param skuId
- * @return
- */
- public Integer getStock(String skuId) {
-
- // 库存ID
- String redisKey = REDIS_STOCK_KEY + skuId;
-
- return stockComponent.getStock(redisKey);
- }
- }
redis序列化
-
- /**
- * create by liuliang
- * on 2019-11-13 11:29
- */
- @Configuration
- public class RedisConfig {
- /**
- * 重写Redis序列化方式,使用Json方式:
- * 当我们的数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的。RedisTemplate默认使用的是JdkSerializationRedisSerializer,StringRedisTemplate默认使用的是StringRedisSerializer。
- * Spring Data JPA为我们提供了下面的Serializer:
- * GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
- * 在此我们将自己配置RedisTemplate并定义Serializer。
- *
- * @param redisConnectionFactory
- * @return
- */
- @Bean
- public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
- RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
- redisTemplate.setConnectionFactory(redisConnectionFactory);
-
- Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
- ObjectMapper om = new ObjectMapper();
- om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
- om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
- jackson2JsonRedisSerializer.setObjectMapper(om);
-
- // 设置值(value)的序列化采用Jackson2JsonRedisSerializer。
- redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
- redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
- // 设置键(key)的序列化采用StringRedisSerializer。
- redisTemplate.setKeySerializer(new StringRedisSerializer());
- redisTemplate.setHashKeySerializer(new StringRedisSerializer());
-
- redisTemplate.afterPropertiesSet();
- return redisTemplate;
- }
- }
作者:crawler
原文地址:https://segmentfault.com/a/1190000021003438
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。