当前位置:   article > 正文

高并发场景下如何利用Redis 实现库存扣减操作_redis incrby并发

redis incrby并发

在电商系统中的商品库存,抽奖系统中的奖品库存等场景中都有类似扣减库存的操作。我们怎么来做呢?

解决方案

  1. 使用mysql数据库,使用一个字段来存储库存,每次扣减库存去更新这个字段。(操作数据库,在高并发场景下数据库压力大不适合)

  2. 还是使用数据库,但是将库存分成多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。(同样是操作数据库,在高并发场景下数据库压力大不适合)

  3. 将库存放到redis使用redis的incrby特性来扣减库存。

分析

通过分析上面的解决方案得出第一种和第二种方式都是基于数据来扣减库存。

基于数据库单库存

        这种方式在所有请求都会在这里等待锁,获取锁再去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

基于数据库多库存

        这种方式其实是上面一种方式的优化版本,在一定程度上提高了并发量,但是还是会大量的对数据库做更新操作大量占用数据库资源。所以也不适合在高并发场景下使用。

基于数据库来实现扣减库存还存在一些其他问题

  • 用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec再update,这样在并发下会出现超卖的情况。如:
update number set store=store-1 where x > 0
  • MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。
  • 当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

由于以上的各种情况,我们将采用redis来操作减库存方案,也是本文重点介绍的方案。

基于redis的实现方案

        将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

具体实现

  • 我们使用redis的lua脚本来实现扣减库存

  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存。

  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

初始化库存回调函数(IStockCallback )

  1. /**
  2. * 获取库存回调
  3. * @author hx
  4. */
  5. public interface IStockCallback {
  6. /**
  7. * 获取库存
  8. * @return
  9. */
  10. int getStock();
  11. }

 扣减库存服务(StockServiceImpl)

  1. /**
  2. * 扣库存
  3. *
  4. * @author hx
  5. */
  6. @Service
  7. public class StockServiceImpl {
  8. Logger logger = LoggerFactory.getLogger(StockServiceImpl.class);
  9. /**
  10. * 不限库存
  11. */
  12. public static final long UNINITIALIZED_STOCK = -3L;
  13. /**
  14. * Redis 客户端
  15. */
  16. @Autowired
  17. private RedisTemplate<String, Object> redisTemplate;
  18. /**
  19. * 执行扣库存的脚本
  20. */
  21. public static final String STOCK_LUA;
  22. static {
  23. /**
  24. *
  25. * @desc 扣减库存Lua脚本
  26. * 库存(stock)-1:表示不限库存
  27. * 库存(stock)0:表示没有库存
  28. * 库存(stock)大于0:表示剩余库存
  29. *
  30. * @params 库存key
  31. * @return
  32. * -3:库存未初始化
  33. * -2:库存不足
  34. * -1:不限库存
  35. * 大于等于0:剩余库存(扣减之后剩余的库存)
  36. * redis缓存的库存(value)是-1表示不限库存,直接返回1
  37. */
  38. StringBuilder sb = new StringBuilder();
  39. sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
  40. sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));");
  41. sb.append(" local num = tonumber(ARGV[1]);");
  42. sb.append(" if (stock == -1) then");
  43. sb.append(" return -1;");
  44. sb.append(" end;");
  45. sb.append(" if (stock >= num) then");
  46. sb.append(" return redis.call('incrby', KEYS[1], 0 - num);");
  47. sb.append(" end;");
  48. sb.append(" return -2;");
  49. sb.append("end;");
  50. sb.append("return -3;");
  51. STOCK_LUA = sb.toString();
  52. }
  53. /**
  54. * @param key 库存key
  55. * @param expire 库存有效时间,单位秒
  56. * @param num 扣减数量
  57. * @param stockCallback 初始化库存回调函数
  58. * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
  59. */
  60. public long stock(String key, long expire, int num, IStockCallback stockCallback) {
  61. long stock = stock(key, num);
  62. // 初始化库存
  63. if (stock == UNINITIALIZED_STOCK) {
  64. RedisLock redisLock = new RedisLock(redisTemplate, key);
  65. try {
  66. // 获取锁
  67. if (redisLock.tryLock()) {
  68. // 双重验证,避免并发时重复回源到数据库
  69. stock = stock(key, num);
  70. if (stock == UNINITIALIZED_STOCK) {
  71. // 获取初始化库存
  72. final int initStock = stockCallback.getStock();
  73. // 将库存设置到redis
  74. redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
  75. // 调一次扣库存的操作
  76. stock = stock(key, num);
  77. }
  78. }
  79. } catch (Exception e) {
  80. logger.error(e.getMessage(), e);
  81. } finally {
  82. redisLock.unlock();
  83. }
  84. }
  85. return stock;
  86. }
  87. /**
  88. * 加库存(还原库存)
  89. *
  90. * @param key 库存key
  91. * @param num 库存数量
  92. * @return
  93. */
  94. public long addStock(String key, int num) {
  95. return addStock(key, null, num);
  96. }
  97. /**
  98. * 加库存
  99. *
  100. * @param key 库存key
  101. * @param expire 过期时间(秒)
  102. * @param num 库存数量
  103. * @return
  104. */
  105. public long addStock(String key, Long expire, int num) {
  106. boolean hasKey = redisTemplate.hasKey(key);
  107. // 判断key是否存在,存在就直接更新
  108. if (hasKey) {
  109. return redisTemplate.opsForValue().increment(key, num);
  110. }
  111. Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
  112. RedisLock redisLock = new RedisLock(redisTemplate, key);
  113. try {
  114. if (redisLock.tryLock()) {
  115. // 获取到锁后再次判断一下是否有key
  116. hasKey = redisTemplate.hasKey(key);
  117. if (!hasKey) {
  118. // 初始化库存
  119. redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
  120. }
  121. }
  122. } catch (Exception e) {
  123. logger.error(e.getMessage(), e);
  124. } finally {
  125. redisLock.unlock();
  126. }
  127. return num;
  128. }
  129. /**
  130. * 获取库存
  131. *
  132. * @param key 库存key
  133. * @return -1:不限库存; 大于等于0:剩余库存
  134. */
  135. public int getStock(String key) {
  136. Integer stock = (Integer) redisTemplate.opsForValue().get(key);
  137. return stock == null ? -1 : stock;
  138. }
  139. /**
  140. * 扣库存
  141. *
  142. * @param key 库存key
  143. * @param num 扣减库存数量
  144. * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
  145. */
  146. private Long stock(String key, int num) {
  147. // 脚本里的KEYS参数
  148. List<String> keys = new ArrayList<>();
  149. keys.add(key);
  150. // 脚本里的ARGV参数
  151. List<String> args = new ArrayList<>();
  152. args.add(Integer.toString(num));
  153. long result = redisTemplate.execute(new RedisCallback<Long>() {
  154. @Override
  155. public Long doInRedis(RedisConnection connection) throws DataAccessException {
  156. Object nativeConnection = connection.getNativeConnection();
  157. // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
  158. // 集群模式
  159. if (nativeConnection instanceof JedisCluster) {
  160. return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
  161. }
  162. // 单机模式
  163. else if (nativeConnection instanceof Jedis) {
  164. return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
  165. }
  166. return UNINITIALIZED_STOCK;
  167. }
  168. });
  169. return result;
  170. }
  171. }

测试

  1. /**
  2. * @author hxg
  3. */
  4. @RestController
  5. public class StockController {
  6. @Autowired
  7. private StockServiceImpl stockService;
  8. @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  9. public Object stock() {
  10. // 商品ID
  11. long commodityId = 1;
  12. // 库存ID
  13. String redisKey = "redis_key:stock:" + commodityId;
  14. long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
  15. return stock >= 0;
  16. }
  17. /**
  18. * 获取初始的库存
  19. *
  20. * @return
  21. */
  22. private int initStock(long commodityId) {
  23. // TODO 这里做一些初始化库存的操作
  24. return 1000;
  25. }
  26. @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  27. public Object getStock() {
  28. // 商品ID
  29. long commodityId = 1;
  30. // 库存ID
  31. String redisKey = "redis_key:stock:" + commodityId;
  32. return stockService.getStock(redisKey);
  33. }
  34. @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  35. public Object addStock() {
  36. // 商品ID
  37. long commodityId = 2;
  38. // 库存ID
  39. String redisKey = "redis_key:stock:" + commodityId;
  40. return stockService.addStock(redisKey, 2);
  41. }
  42. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/515948
推荐阅读
相关标签
  

闽ICP备14008679号