当前位置:   article > 正文

【Redis(9)】Spring Boot整合Redis,实现分布式锁,保证分布式系统中节点操作一致性_springboot 分布式 数据一致性

springboot 分布式 数据一致性

在上一篇系列文章中,咱们利用Redis解决了缓存穿透、缓存击穿、缓存雪崩等缓存问题,Redis除了解决缓存问题,还能干什么呢?这是今天咱们要接着探讨的问题。

在分布式系统中,为了保证在多个节点间操作的一致性,引入了分布式锁的概念。那么什么是分布式锁?为什么要用分布式锁?怎么实现分布式锁?带着这些问题,接下来本文将带你一起实现一个基于Redis的分布式锁?

什么是分布式锁?

分布式锁是一种在分布式系统中用来保证同一时间只有一个进程能操作共享资源的机制。它类似于我们熟知的单机环境下的锁,但分布式锁跨越了单机的界限,作用于多台机器之间,确保了在多个节点上的协调一致。

为什么要使用分布式锁?

在没有分布式锁的情况下,多个节点可能会同时修改共享资源,导致数据不一致甚至丢失。例如,在电子商务平台的库存管理中,如果多个用户同时下单购买同一件商品,而系统没有正确地管理库存,就可能出现超卖的情况。

如何实现分布式锁?

实现分布式锁有多种方式,以下是一些常见的实现策略:

基于数据库的锁:使用数据库的排他锁(如SQL中的SELECT ... FOR UPDATE)可以实现简单的分布式锁。

基于缓存的锁:使用分布式缓存系统(如Redis)提供的原子命令(如SETNX)来实现锁的功能。

基于ZooKeeper的锁:ZooKeeper的临时有序节点可以用来实现分布式锁,通过节点的创建和监听来实现锁的获取和释放。

基于etcd的锁:etcd是一个分布式键值存储,也常被用来实现分布式锁,它提供了可靠的键值存储和原子操作。

Redis实现分布式锁

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.core.io.Resource;
  4. import org.springframework.core.io.ResourceLoader;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import org.springframework.data.redis.core.script.DefaultRedisScript;
  7. import org.springframework.data.redis.core.script.RedisScript;
  8. import org.springframework.scheduling.annotation.Scheduled;
  9. import org.springframework.stereotype.Component;
  10. import java.io.IOException;
  11. import java.nio.charset.StandardCharsets;
  12. import java.util.Collections;
  13. import java.util.Map;
  14. import java.util.concurrent.ConcurrentHashMap;
  15. import java.util.concurrent.TimeUnit;
  16. import java.util.concurrent.locks.ReentrantLock;
  17. /**
  18. * 分布式锁实现,使用Redis作为后端存储。
  19. */
  20. @Component
  21. public class RedisDistributedLock {
  22. // 从配置文件中读取锁的默认有效期(毫秒)
  23. @Value("${lock.leaseTime:30000}")
  24. private long leaseTime;
  25. private final StringRedisTemplate stringRedisTemplate;
  26. private final RedisScript lockScript;
  27. private final RedisScript unlockScript;
  28. private final ReentrantLock lockReentrantLock = new ReentrantLock();
  29. // 存储锁的持有者信息
  30. private final Map<String, String> locks = new ConcurrentHashMap<>();
  31. // 存储锁的过期时间
  32. private final Map<String, Long> lockExpirationTimes = new ConcurrentHashMap<>();
  33. // 锁重试间隔时间
  34. private static final long LOCK_RETRY_INTERVAL_MS = 100L;
  35. // 锁最大重试次数
  36. private static final long LOCK_MAX_RETRY_TIMES = 10L;
  37. @Autowired
  38. public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, ResourceLoader resourceLoader) throws IOException {
  39. this.stringRedisTemplate = stringRedisTemplate;
  40. // 加载和编译Lua锁脚本
  41. this.lockScript = loadScript(resourceLoader, "lock.lua");
  42. this.unlockScript = loadScript(resourceLoader, "unlock.lua");
  43. }
  44. /**
  45. * 从指定路径加载Lua脚本。
  46. */
  47. private RedisScript loadScript(ResourceLoader resourceLoader, String scriptPath) throws IOException {
  48. Resource resource = resourceLoader.getResource("classpath:" + scriptPath);
  49. String script = new String(resource.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
  50. return stringRedisTemplate.getConnectionFactory().getConnection().scriptLoad(script);
  51. }
  52. /**
  53. * 尝试获取锁。
  54. *
  55. * @param lockKey 锁的键。
  56. * @param waitTime 锁的最长等待时间。
  57. * @return 如果获取到锁,返回true;否则返回false。
  58. */
  59. public boolean tryLock(String lockKey, long waitTime) {
  60. String requestId = UUID.randomUUID().toString();
  61. long endTime = System.currentTimeMillis() + waitTime;
  62. int attempts = 0;
  63. while (System.currentTimeMillis() < endTime && attempts < LOCK_MAX_RETRY_TIMES) {
  64. if (tryLockInner(lockKey, requestId)) {
  65. lockReentrantLock.lock();
  66. try {
  67. // 记录锁信息和过期时间
  68. locks.put(lockKey, requestId);
  69. lockExpirationTimes.put(lockKey, System.currentTimeMillis() + leaseTime);
  70. return true;
  71. } finally {
  72. lockReentrantLock.unlock();
  73. }
  74. }
  75. attempts++;
  76. try {
  77. Thread.sleep(LOCK_RETRY_INTERVAL_MS);
  78. } catch (InterruptedException e) {
  79. Thread.currentThread().interrupt();
  80. // 如果线程被中断,返回false
  81. return false;
  82. }
  83. }
  84. return false;
  85. }
  86. /**
  87. * 尝试获取锁的内部方法,使用Redis Lua脚本以保证原子性。
  88. */
  89. private boolean tryLockInner(String lockKey, String requestId) {
  90. return (Boolean) stringRedisTemplate.execute(lockScript,
  91. Collections.singletonList(lockKey),
  92. requestId,
  93. TimeUnit.MILLISECONDS.toMillis(leaseTime));
  94. }
  95. /**
  96. * 释放锁。
  97. *
  98. * @param lockKey 锁的键。
  99. */
  100. public void unlock(String lockKey) {
  101. lockReentrantLock.lock();
  102. try {
  103. String requestId = locks.remove(lockKey);
  104. if (requestId != null) {
  105. unlockInner(lockKey, requestId);
  106. lockExpirationTimes.remove(lockKey);
  107. }
  108. } finally {
  109. lockReentrantLock.unlock();
  110. }
  111. }
  112. /**
  113. * 释放锁的内部方法,使用Redis Lua脚本以保证原子性。
  114. */
  115. private boolean unlockInner(String lockKey, String requestId) {
  116. return (Long) stringRedisTemplate.execute(unlockScript,
  117. Collections.singletonList(lockKey),
  118. requestId) == 1;
  119. }
  120. /**
  121. * 定时任务,用于续期已获取的锁。
  122. */
  123. @Scheduled(fixedRateString = "${lock.renewRate:1000}")
  124. public void renewLocks() {
  125. lockReentrantLock.lock();
  126. try {
  127. long now = System.currentTimeMillis();
  128. for (Map.Entry<String, Long> entry : lockExpirationTimes.entrySet()) {
  129. if (now > entry.getValue()) {
  130. // 如果锁已过期,释放锁
  131. unlock(entry.getKey());
  132. } else {
  133. // 续期锁
  134. boolean renewed = stringRedisTemplate.expire(entry.getKey(), leaseTime - (now - entry.getValue()), TimeUnit.MILLISECONDS);
  135. if (!renewed) {
  136. // 如果续期失败,释放锁
  137. unlock(entry.getKey());
  138. }
  139. }
  140. }
  141. } catch (Exception e) {
  142. // 记录异常日志
  143. } finally {
  144. lockReentrantLock.unlock();
  145. }
  146. }
  147. }

请注意以下几点:

  1. 为了加载Lua脚本,这里使用了Spring的ResourceLoader。需要在类路径下提供lock.luaunlock.lua文件。

  2. tryLock方法只接受锁的键和等待时间,leaseTime从配置文件中获取。

  3. unlock方法只负责解锁,不从等待队列中移除。

  4. renewLocks方法会检查每个锁是否过期,并相应地续期或释放。

  5. 使用了ConcurrentHashMap来存储锁信息和锁过期时间,以支持高并发。

  6. 添加了异常处理和中断处理,但没有实现日志记录。需要根据日志框架(如SLF4J、Log4J等)添加适当的日志记录。

  7. 请确保的配置文件(如application.properties)中设置了lock.leaseTimelock.renewRate属性。

lock.lua文件

  1. -- lock.lua
  2. -- 参数1: 锁的key
  3. -- 参数2: 请求ID
  4. -- 参数3: 锁的超时时间(毫秒)
  5. local lockKey = KEYS[1]
  6. local requestId = ARGV[1]
  7. local leaseTime = tonumber(ARGV[2])
  8. -- 检查锁是否存在,如果不存在则设置锁,并返回1
  9. if redis.call('set', lockKey, requestId, 'NX', 'PX', leaseTime) == 1 then
  10. return 1
  11. else
  12. -- 如果锁已经存在,则返回0
  13. return 0
  14. end

unlock.lua文件

  1. -- unlock.lua
  2. -- 参数1: 锁的key
  3. -- 参数2: 请求ID
  4. local lockKey = KEYS[1]
  5. local requestId = ARGV[1]
  6. -- 检查锁是否存在,并且锁的持有者ID与传入的请求ID匹配,如果匹配则删除锁
  7. if redis.call('get', lockKey) == requestId then
  8. return redis.call('del', lockKey)
  9. else
  10. -- 如果锁存在但请求ID不匹配,或者锁不存在,则返回0
  11. return 0
  12. end

这些Lua脚本通过使用Redis的原子命令来确保锁的获取和释放操作的原子性。在lock.lua脚本中,使用set命令尝试设置一个锁,如果锁不存在(NX),则设置成功并返回1,否则返回0。

unlock.lua脚本中,首先检查锁是否存在,并且当前请求者是否是锁的持有者,如果是,则删除锁。

请将这些脚本保存为.lua文件,并确保它们位于Spring Boot项目的classpath路径下,以便RedisDistributedLock类可以加载它们。同时,确保Redis服务器配置允许执行Lua脚本,并且没有禁用Lua脚本命令。

使用方式

首先,确保您的项目中已经包含了Spring框架和Spring Data Redis的相关依赖。然后,按照以下步骤使用上述代码:

  1. 配置Redis:在Spring配置文件中配置Redis连接信息。

  2. 注入依赖:在Spring组件中注入StringRedisTemplateResourceLoader

  3. 配置锁参数:在配置文件中设置锁的有效期(lock.leaseTime)和锁续期的时间间隔(lock.renewRate)。

  4. 使用锁:在需要同步的代码块前后,使用tryLock方法尝试获取锁,并在操作完成后调用unlock方法释放锁。

    1. @Autowired
    2. private RedisDistributedLock redisDistributedLock;
    3. public void criticalSection() {
    4. String lockKey = "some_resource_key";
    5. long waitTime = 10000; // 等待10秒获取锁
    6. if (redisDistributedLock.tryLock(lockKey, waitTime)) {
    7. try {
    8. // 临界区代码
    9. } finally {
    10. redisDistributedLock.unlock(lockKey);
    11. }
    12. }
    13. }

优缺点

优点

  • 线程安全:通过ReentrantLock确保了多线程环境下的线程安全。
  • 自动续期:通过定时任务自动续期,减少了锁提前释放的风险。
  • 高可用:Redis的分布式特性提供了高可用的锁机制。

缺点

  • 资源消耗:定时任务和锁重试机制可能会增加系统资源的消耗。
  • 复杂性:引入了额外的Lua脚本和锁管理逻辑,增加了系统的复杂性。

改进点

异常处理:增强异常处理,确保在出现异常时能够记录日志并采取适当的恢复措施。

性能监控:引入性能监控,以便及时发现并解决潜在的性能瓶颈。

锁优化:考虑使用更高效的锁重试策略,如指数退避,以减少资源消耗。

注意事项

版本兼容性:确保Redis和Spring Data Redis的版本兼容。

锁超时设置:合理设置锁的有效期,避免死锁或资源浪费。

资源释放:确保在操作完成后释放锁,避免资源长时间被占用。

使用场景案例

场景一:数据库记录更新

在多实例的微服务架构中,当需要更新共享数据库中的记录时,可以使用分布式锁来保证同一时间只有一个实例进行更新。

  1. if (redisDistributedLock.tryLock("db_record_123", 5000)) {
  2. try {
  3. // 更新数据库记录
  4. } finally {
  5. redisDistributedLock.unlock("db_record_123");
  6. }
  7. }

场景二:分布式任务调度

在分布式任务调度系统中,使用分布式锁可以避免同一个任务被多个实例重复执行。

  1. if (redisDistributedLock.tryLock("task_123", 5000)) {
  2. try {
  3. // 执行任务
  4. } finally {
  5. redisDistributedLock.unlock("task_123");
  6. }
  7. }

场景三:分布式缓存更新

当多个服务实例需要更新同一个缓存项时,使用分布式锁可以保证只有一个实例在任何给定时间更新缓存。

  1. if (redisDistributedLock.tryLock("cache_item_123", 5000)) {
  2. try {
  3. // 更新缓存项
  4. } finally {
  5. redisDistributedLock.unlock("cache_item_123");
  6. }
  7. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/793711
推荐阅读
相关标签
  

闽ICP备14008679号