当前位置:   article > 正文

Redis之高并发超卖问题解决方案_redis解决超卖 高并发

redis解决超卖 高并发

目录

1. Redis高并发超卖问题解决方案

1.1 高并发场景超卖bug解析

1.2 Redisson


1. Redis高并发超卖问题解决方案

在高并发的秒杀抢购场景中,常常会面临一个称为“超卖”(Over-Selling)的问题。超卖指的是同一件商品被售出的数量超过了实际库存数量,导致库存出现负数。这是由于多个用户同时发起抢购请求,而系统未能有效地控制库存的并发访问。

下面进行一个秒杀购买某个商品的接口模拟,代码如下:

  1. @RestController
  2. public class MyController {
  3. @Autowired
  4. StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/buy/{id}")
  6. public String buy(@PathVariable("id") Long id){
  7. String key="product_" + id;
  8. int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
  9. if(count>0){
  10. stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
  11. System.out.println(key+"商品购买成功,剩余库存"+count);
  12. return "success";
  13. }
  14. System.out.println(key+"商品库存不足");
  15. return "error";
  16. }
  17. }

上面的代码在高并发环境下容易出现超卖问题,使用JMeter进行压测,如下图:

5f9ddbaf5ca24350a921c52976c30711.png

dc5ffc12710940f7b3716f579a517f0f.png

 进行压测获得的日志如下图,存在并发安全问题。

a0fc440a4f114afe8a1994bca769a00a.png

 要解决上面的问题,我们一开始想到的是synchronized加锁,但是在 Redis 的高并发环境下,使用 Java 中的 synchronized关键字来解决超卖问题是行不通的,原因如下:

  1. 分布式环境下无效: synchronized是 Java 中的关键字,用于在单个 JVM 中保护共享资源。在分布式环境下,多个服务实例之间无法通过synchronized来同步,因为各个实例之间无法直接共享 JVM 中的锁。

  2. 性能问题: synchronized会导致性能问题,尤其在高并发的情况下,争夺锁可能会成为瓶颈。

对于 Redis 高并发环境下的超卖问题,更合适的解决方案通常是使用 Redis 提供的分布式锁(如基于 Redis 的分布式锁实现)。这可以确保在分布式环境中的原子性和可靠性。

基于Redis的分布式锁,我们可以基于Redis中的Setnx(命令在指定的 key 不存在时,为 key 设置指定的值),更改代码如下:

  1. @RestController
  2. public class MyController {
  3. @Autowired
  4. StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/buy/{id}")
  6. public String buy(@PathVariable("id") Long id){
  7. String lock="product_lock_"+id;
  8. String key="product_" + id;
  9. Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
  10. String message="error";
  11. if(!lock1){
  12. System.out.println("业务繁忙稍后再试");
  13. return "业务繁忙稍后再试";
  14. }
  15. //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
  16. try {
  17. int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
  18. if (count > 0) {
  19. stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
  20. System.out.println(key + "商品购买成功,剩余库存" + count);
  21. message="success";
  22. }
  23. }catch (Throwable e){
  24. e.printStackTrace();
  25. }finally {
  26. stringRedisTemplate.delete(lock);
  27. }
  28. if(message.equals("error"))
  29. System.out.println(key+"商品库存不足");
  30. return message;
  31. }
  32. }

然后使用JMeter压测,在10s内陆续发送500个请求,日志如下图,由图可以看出基本解决超卖问题。

42cb97a25afc4276adbec529b685ebb6.png

1.1 高并发场景超卖bug解析

系统在达到finally块之前崩溃宕机,锁可能会一直存在于Redis中。这可能会导致其他进程或线程无法在未来获取该锁,从而导致资源被锁定,后续尝试访问该资源的操作可能被阻塞。因此在redis中给定 key设置过期时间。代码如下:

  1. @RestController
  2. public class MyController {
  3. @Autowired
  4. StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/buy/{id}")
  6. public String buy(@PathVariable("id") Long id){
  7. String lock="product_lock_"+id;
  8. String key="product_" + id;
  9. Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock",10, TimeUnit.SECONDS); //保证原子性
  10. // Boolean lock2 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
  11. // stringRedisTemplate.expire(lock,10,TimeUnit.SECONDS); //此时宕机依旧会出现redis锁无法释放,应设置为原子操作
  12. String message="error";
  13. if(!lock1){
  14. System.out.println("业务繁忙稍后再试");
  15. return "业务繁忙稍后再试";
  16. }
  17. //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
  18. try {
  19. int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
  20. if (count > 0) {
  21. stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
  22. System.out.println(key + "商品购买成功,剩余库存" + count);
  23. message="success";
  24. }
  25. }catch (Throwable e){
  26. e.printStackTrace();
  27. }finally {
  28. stringRedisTemplate.delete(lock);
  29. }
  30. if(message.equals("error"))
  31. System.out.println(key+"商品库存不足");
  32. return message;
  33. }
  34. }

在高并发场景下,还存在一个问题,即业务执行时间过长可能导致 Redis 锁提前释放,并且误删除其他线程或进程持有的锁。这可能发生在以下情况:

  1. 线程A获取锁并开始执行业务逻辑。
  2. 由于高并发,其他线程B、C等也尝试获取相同资源的锁。
  3. 由于锁的过期时间设置为10秒,线程A的业务逻辑执行时间超过10秒,导致其锁被 Redis 自动释放。
  4. 线程B在10秒内获取到了之前由线程A持有的锁,并开始执行业务逻辑。
  5. 线程A在业务逻辑执行完成后,尝试删除自己的锁,但由于已经被线程B持有,线程A实际上删除的是线程B的锁。

修改代码如下:

  1. @RestController
  2. public class MyController {
  3. @Autowired
  4. StringRedisTemplate stringRedisTemplate;
  5. @RequestMapping("/buy/{id}")
  6. public String buy(@PathVariable("id") Long id){
  7. String lock="product_lock_"+id;
  8. String key="product_" + id;
  9. String clientId=UUID.randomUUID().toString();
  10. Boolean lock1 = stringRedisTemplate.opsForValue().setIfAbsent(lock, clientId,10, TimeUnit.SECONDS); //保证原子性
  11. // Boolean lock2 = stringRedisTemplate.opsForValue().setIfAbsent(lock, "lock");
  12. // stringRedisTemplate.expire(lock,10,TimeUnit.SECONDS); //此时宕机依旧会出现redis锁无法释放,应设置为原子操作
  13. String message="error";
  14. if(!lock1){
  15. System.out.println("业务繁忙稍后再试");
  16. return "业务繁忙稍后再试";
  17. }
  18. //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
  19. try {
  20. int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
  21. if (count > 0) {
  22. stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
  23. System.out.println(key + "商品购买成功,剩余库存" + count);
  24. message="success";
  25. }
  26. }catch (Throwable e){
  27. e.printStackTrace();
  28. }finally {
  29. if (stringRedisTemplate.opsForValue().get(lock).equals(clientId))//在这里如果有别的业务代码并且耗时较长, stringRedisTemplate.delete(lock)之前还是有可能超过过期时间出现问题
  30. stringRedisTemplate.delete(lock);
  31. }
  32. if(message.equals("error"))
  33. System.out.println(key+"商品库存不足");
  34. return message;
  35. }
  36. }

上面的代码在高并发场景下仍然存在概率很低的问题,所以就有了redisson分布式锁。

1.2 Redisson

Redisson 是一个用于 Java 的 Redis 客户端,它提供了丰富的功能,包括分布式锁。Redisson 的分布式锁实现了基于 Redis 的分布式锁,具有简单易用、可靠性高的特点。

以下是 Redisson 分布式锁的一些重要特性和用法:

  1. 可重入锁: Redisson 的分布式锁是可重入的,同一线程可以多次获取同一把锁,而不会出现死锁。

  2. 公平锁: Redisson 支持公平锁,即按照获取锁的顺序依次获取,避免了某些线程一直获取不到锁的情况。

  3. 锁超时: 可以为分布式锁设置过期时间,确保即使在某些情况下锁没有被显式释放,也能在一定时间后自动释放。

  4. 异步锁: Redisson 提供了异步的分布式锁,通过异步 API 可以在不阻塞线程的情况下获取和释放锁。

  5. 监控锁状态: Redisson 允许监控锁的状态,包括锁是否被某个线程持有,锁的过期时间等。

39b25cb597c34ee3ad06e02ca6a73cf7.png

导入依赖

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson</artifactId>
  4. <version>3.23.5</version>
  5. </dependency>

application.yaml 配置:

  1. spring:
  2. redis:
  3. host: 127.0.0.1
  4. port: 6379
  5. password:
  6. lettuce:
  7. pool:
  8. max-active: 8
  9. max-idle: 8
  10. min-idle: 0
  11. max-wait: 1000ms

RedissonConfig配置:

  1. @Configuration
  2. public class RedissonConfig {
  3. @Value("${spring.redis.host}")
  4. private String host;
  5. @Value("${spring.redis.port}")
  6. private String port;
  7. /**
  8. * RedissonClient,单机模式
  9. */
  10. @Bean
  11. public RedissonClient redisson() {
  12. Config config = new Config();
  13. SingleServerConfig singleServerConfig = config.useSingleServer();
  14. singleServerConfig.setAddress("redis://" + host + ":" + port);
  15. return Redisson.create(config);
  16. }
  17. }

利用Redisson分布式锁解决超卖问题,修改代码如下:

加锁 lock.lock()

阻塞等待,默认等待,加锁的默认时间都是30s,锁的自动续期,如果业务时间长,运行期间会自动给锁续上新的30s,不用担心业务时间长导致锁自动过期被删除,加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s后自动删除。

加锁 lock.lock(10,TimeUnit.SECONDS)

锁到期后,不会自动续期,如果传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间。如果未指定锁的超时时间,只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s就会自动进行续期。

  1. @RestController
  2. public class MyController {
  3. @Autowired
  4. StringRedisTemplate stringRedisTemplate;
  5. @Autowired
  6. RedissonClient redisson;
  7. @RequestMapping("/buy/{id}")
  8. public String buy(@PathVariable("id") Long id){
  9. String message="error";
  10. String lock_key="product_lock_"+id;
  11. String key="product_" + id;
  12. RLock lock = redisson.getLock(lock_key);
  13. //try catch 设计是为了防止在执行业务的时候出现异常导致redis锁一直无法释放
  14. try {
  15. lock.lock();
  16. int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(key));
  17. if (count > 0) {
  18. stringRedisTemplate.opsForValue().set(key, String.valueOf(--count));
  19. System.out.println(key + "商品购买成功,剩余库存" + count);
  20. message="success";
  21. }
  22. }catch (Throwable e){
  23. e.printStackTrace();
  24. }finally {
  25. lock.unlock();
  26. }
  27. if(message.equals("error"))
  28. System.out.println(key+"商品库存不足");
  29. return message;
  30. }
  31. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/330976
推荐阅读
相关标签
  

闽ICP备14008679号