赞
踩
分布式锁来解决高并发
若执行业务出现异常,锁释放不走,产生死锁,解决方案给锁加上过期时间,即便业务出现异常,锁还是会释放
在分布式系统中,使用 Lua 脚本在 Redis 中释放锁是为了确保释放锁的操作是原子的。这很重要,因为在高并发的场景下,可能会有多个客户端尝试同时获取和释放同一个锁。如果不使用原子操作来释放锁,就可能会出现竞态条件(race condition),导致锁被错误地释放或者根本无法释放。
考虑以下场景:
检查锁的值和删除锁不是原子的:如果没有使用 Lua 脚本,释放锁通常涉及两个步骤:首先检查锁的值是否与我们设置的值相匹配,如果匹配,则删除锁。但是,如果这两个操作不是原子的,就有可能出现以下问题:
在我们检查锁的值之后,但在删除锁之前,另一个客户端可能已经获取了锁(即改变了锁的值)。在这种情况下,如果我们仍然删除锁,就会错误地释放了其他客户端持有的锁。
并发删除导致的问题:如果有多个客户端同时尝试释放同一个锁(可能是因为它们都认为自己是锁的持有者),没有原子性保证的删除操作可能会导致不可预知的行为。
使用 Lua 脚本可以确保这两个步骤(检查和删除)在一个原子操作中完成。Redis 保证 Lua 脚本在执行期间不会被其他 Redis 命令打断,这被称为脚本的原子性。因此,当 Lua 脚本在 Redis 中执行时,它会锁定 Redis,直到脚本执行完成。这确保了在脚本执行期间,不会有其他客户端的命令插入进来改变锁的状态。
在代码中,使用 Lua 脚本来释放锁可以确保只有当锁的值与预期的值(即当前客户端设置的值)匹配时,锁才会被删除。这避免了上述的竞态条件,并保证了锁的安全性。
使用Redis的INCR、DECR和EXPIRE命令来操作库存,并通过SETNX和DEL命令来管理锁。
这段代码是一个Spring Boot控制器中的方法,用于减少特定产品的库存,并使用Redis进行库存管理和并发控制。现在,我们从无String lockValue(即没有使用唯一标识符作为锁的值)和无Lua脚本两方面来分析这段代码可能存在的问题。
锁的碰撞问题:
在这段代码中,锁的值被硬编码为字符串"locked"。这意味着,如果两个不同的进程或线程尝试同时锁定同一个产品ID,它们都会检查同一个锁键是否存在。虽然这在一定程度上可以防止并发问题(a线程执行decrStock方法并上锁,未解锁就过期,b线程来执行decrStock方法并上锁,a已经执行完判断锁的逻辑,所以会在b上锁期间,释放b的锁),但如果有其他不相关的操作也使用了相同的锁值(“locked”),就可能导致不必要的锁碰撞,从而影响系统的并发性能。
锁的安全性:
使用固定的锁值可能会降低系统的安全性。如果攻击者知道这个固定的锁值,他们可能会尝试破坏锁机制,例如通过删除或篡改锁键来干扰正常的库存扣减流程。
锁的调试和监控:
当使用固定的锁值时,很难追踪和调试与锁相关的问题。如果使用唯一标识符(如UUID)作为锁的值,那么每个锁的实例都会有一个独特的标识符,这有助于在日志或监控系统中跟踪和识别特定的锁操作。
无Lua脚本
原子性缺失:
当前代码通过多个命令来实现库存扣减的逻辑,这不是原子操作。如果在get和decrement之间Redis状态发生改变,就可能导致数据不一致。在两个操作之间,库存值可能被其他并发请求修改,从而导致数据不一致或超卖的情况。使用Lua脚本可以将多个操作合并成一个原子操作,确保数据的一致性。
性能优化:
由于库存检查和扣减是两个独立的操作,它们需要两次与Redis服务器的网络交互。这增加了网络延迟和I/O开销,降低了系统的性能。
使用Lua脚本可以减少网络往返时间(RTT),因为所有的操作都在服务器端执行,不需要像当前代码那样在客户端和服务器之间多次通信。使用Lua脚本可以将这两个操作合并成一个原子操作,并通过一次网络交互完成,从而提高性能
错误处理和重试:
Lua脚本的执行是原子的,要么全部成功,要么全部失败,这简化了错误处理和重试的逻辑。在当前代码中,如果decrement操作失败,可能需要复杂的逻辑来决定是否重试以及如何重试。
锁的粒度:
使用Lua脚本还可以更精细地控制锁的粒度。例如,可以在脚本内部实现更复杂的逻辑,而无需长时间占用锁,从而减少锁的持有时间,提高系统的并发性能。
复杂性增加:
没有使用Lua脚本意味着开发者需要处理更多的并发控制和错误处理逻辑。例如,在库存不足时需要手动处理重试或返回错误信息。而使用Lua脚本可以简化这部分逻辑,因为脚本在服务器端执行,可以确保操作的原子性。
潜在的竞争条件:
由于检查和扣减操作不是原子的,因此在高并发场景下更容易出现竞争条件。这可能导致库存扣减的不准确或超卖现象。Lua脚本可以避免这种情况,因为它在服务器端以单个原子操作执行。
代码中设置了锁的过期时间(在这个例子中是10秒),这是为了避免死锁而采取的一种预防措施。如果设置了过期时间,即使系统出现异常,锁也会在指定的时间后自动释放,这样可以防止因为异常而导致的永久死锁。
然而,依赖锁的自动过期来解决死锁问题并不是完美的解决方案,因为它引入了新的问题:
锁的续期问题:如果操作需要的时间超过锁的过期时间,那么在操作完成之前锁可能已经过期,这会导致其他线程或进程能够获取到锁并执行操作,从而可能引发并发问题。
业务逻辑的中断:如果在锁过期后业务逻辑还没有执行完,那么其他线程获得锁后可能会打断当前线程的业务逻辑,导致数据状态不一致。
性能问题:如果设置的过期时间太长,虽然可以减少锁过期的风险,但在系统出现异常时,会导致更长时间的资源锁定,影响系统的并发性能。
因此,虽然设置锁的过期时间可以在一定程度上防止死锁,但在设计并发系统时还需要考虑更多因素,比如锁的粒度、锁的续期策略、异常处理机制等,以确保系统的稳定性和性能。
另外,对于重要的业务逻辑,通常建议采用更加健壮的锁机制,比如基于Redis的RedLock算法,或者使用数据库的事务机制来确保数据的一致性。同时,监控和告警系统也应该被建立起来,以便及时发现和处理潜在的锁问题。
package com.example.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; @RestController @RequestMapping("test") public class DecrProductStockController { @Autowired private RedisTemplate<String, Integer> redisTemplate; @GetMapping("decrStock/{proId}") public String decrStock(@PathVariable("proId") Integer proId) { String lockKey = "lock_pro_" + proId; String proKey = "pro_stock_" + proId; // 尝试获取锁 Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 10, TimeUnit.SECONDS); if (Boolean.TRUE.equals(isLocked)) { try { ValueOperations<String, Integer> ops = redisTemplate.opsForValue(); Integer stock = ops.get(proKey); if (stock != null && stock > 0) { ops.decrement(proKey); // 减库存 System.out.println(Thread.currentThread().getName() + " 用户已减库存----"); } else { System.out.println("库存不足了============"); } } finally { // 释放锁 redisTemplate.delete(lockKey); System.out.println(Thread.currentThread().getName() + " 用户已释放锁"); } } else { // 未能获取锁,可以重试或返回失败信息 try { Thread.sleep(100); // 简单的重试延迟 } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 } return decrStock(proId); // 重试减库存操作 } return "success"; } }
lockValue的使用是为了确保只有加锁的线程(或进程)能够解锁。每个尝试获取锁的线程都会生成一个独特的lockValue,通常是UUID。这个值被设置为锁的值,因此在解锁时,只有知道正确lockValue的线程才能成功执行解锁操作。
为什么要设置lockValue:
避免误解锁:在分布式系统中,可能有多个线程或进程尝试同时访问和修改同一资源。如果没有 lockValue,而只是简单地检查锁是否存在,然后删除它,那么一个线程可能会误删其他线程设置的锁,这会导致数据不一致和其他线程能够错误地获取到它们本不应该获取的资源。
提高安全性:通过为每个锁分配一个独特的值,我们可以确保只有设置该锁的线程才能删除它。这是通过比较当前锁的值与尝试解锁的线程提供的 lockValue 来实现的。
防止锁重入:如果一个线程已经持有一个锁,并且没有 lockValue 的检查,它可能会错误地重新获取同一个锁(如果它试图这样做的话)。虽然在这个特定的例子中可能不是主要问题,但在更复杂的系统中,这是一个重要的考虑因素。
处理锁的过期:锁通常会设置过期时间,以防线程在持有锁时崩溃,从而防止死锁。如果有 lockValue,即使锁由于超时而过期,其他线程也不能简单地删除它,除非它们知道正确的 lockValue。这增加了系统的健壮性。
在这个代码中,redisTemplate.execute(deleteLockLua, Arrays.asList(lockKey), lockValue); 这一行确保只有知道正确 lockValue 的线程才能执行解锁操作。这是通过使用 Lua 脚本来原子性地检查锁的值是否匹配提供的 lockValue,如果是,则删除锁。这保证了即使有多个线程尝试解锁,也只有设置锁的线程能够成功解锁。
所以,虽然一个线程在执行 decrStock() 方法并上锁时,其他线程不能直接执行解锁操作,但 lockValue 提供了一个额外的安全层,确保解锁操作的正确性和安全性。
==========
在Redis中,实现分布式锁的一种常见做法是使用SETNX(SET if Not eXists)命令或者Redis的Lua脚本来尝试获取锁,并设置一个过期时间,以防锁无限期地被占用。如果锁设置了过期时间,那么即使持有锁的客户端崩溃或由于某种原因无法释放锁,锁也会在过期时间到达后自动被Redis删除。
你提到的lockValue可以是一个随机生成的字符串,用作锁的值,以确保只有知道这个值的客户端才能释放锁。这是一种安全措施,用来防止一个客户端误删其他客户端设置的锁。
当锁由于超时而过期时,Redis会自动删除这个键(key),因此锁在Redis中将不再存在。这意味着,一旦锁的过期时间到达,任何客户端都可以尝试再次获取这个锁,因为旧的锁已经从Redis中删除了。
为了增加系统的健壮性,客户端在尝试释放锁时应该检查lockValue是否与它设置的值相匹配。如果不匹配,那么客户端就不应该删除这个锁,因为这可能是一个由其他客户端设置的锁。这可以通过使用Redis的Lua脚本来原子性地完成检查和删除操作来实现。
总的来说,当锁在Redis中由于超时而过期时,它将不再存在于Redis中,任何知道如何正确设置lockValue的客户端都可以尝试获取这个锁。
为了增强锁的安全性,避免潜在的锁碰撞,并确保每个锁请求都是唯一的,可以在尝试获取锁时使用一个随机生成的唯一标识符作为锁的值。
redisTemplate的类型从RedisTemplate<String, Integer>更改为RedisTemplate<String, String>,因为现在要在Redis中存储UUID字符串作为锁的值。
在尝试获取锁时,使用lockValue作为锁的值,而不是硬编码的字符串。
在释放锁之前,检查当前锁的值是否与之前设置的lockValue相匹配。这确保了只有锁的原始设置者才能释放锁,从而避免了意外删除其他线程或进程设置的锁。
由于现在在Redis中存储的是字符串值,因此在处理库存数量时需要适当地转换数据类型。
这些更改提高了锁的安全性和可靠性,降低了锁碰撞和误删除锁的风险。
package com.example.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; import java.util.concurrent.TimeUnit; @RestController @RequestMapping("test") public class DecrProductStockController { @Autowired private RedisTemplate<String, String> redisTemplate; // 注意这里改为String, String类型 @GetMapping("decrStock/{proId}") public String decrStock(@PathVariable("proId") Integer proId) { String lockKey = "lock_pro_" + proId; String proKey = "pro_stock_" + proId; String lockValue = UUID.randomUUID().toString().replace("-", ""); // 生成唯一锁值 // 尝试获取锁 Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS); if (Boolean.TRUE.equals(isLocked)) { try { ValueOperations<String, Integer> ops = redisTemplate.opsForValue(); Integer stock = ops.get(proKey); if (stock != null && stock > 0) { ops.decrement(proKey); // 减库存 System.out.println(Thread.currentThread().getName() + " 用户已减库存----"); } else { System.out.println("库存不足了============"); } } finally { // 检查并释放锁,确保只有锁的持有者才能释放 if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) { redisTemplate.delete(lockKey); } System.out.println(Thread.currentThread().getName() + " 用户已释放锁"); } } else { // 未能获取锁,可以重试或返回失败信息 try { Thread.sleep(100); // 简单的重试延迟 } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 } return decrStock(proId); // 重试减库存操作 } return "success"; } }
为了使用 Lua 脚本来确保操作的原子性,需要对原代码进行一些修改,并将 Lua 脚本集成到 decrStock 方法中。以下是一个修改后的版本,包括 Lua 脚本的加载和执行
在这个修改后的版本中,添加了两个 Lua 脚本:一个用于减少库存,另一个用于释放锁。这两个脚本都通过 DefaultRedisScript 类加载,并在需要时通过 RedisTemplate 执行。这样可以确保减库存和释放锁的操作是原子的,从而避免了并发问题。注意,Lua 脚本中的 KEYS[1] 和 ARGV[1] 分别对应于执行脚本时传递的键和参数列表。在这个例子中,键是库存或锁的键,参数是锁的值。
package com.example.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.scripting.support.StaticScriptSource; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RequestMapping; import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; @RestController @RequestMapping("test") public class DecrProductStockController { @Autowired private RedisTemplate<String, String> redisTemplate; // Lua 脚本,用于减库存和释放锁 private static final String DECR_STOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) ~= false and tonumber(redis.call('get', KEYS[1])) > 0 then " + " redis.call('decrby', KEYS[1], 1) " + " return 1 " + "else " + " return -1 " + "end"; private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"; private DefaultRedisScript<Long> decrStockScript; private DefaultRedisScript<Long> releaseLockScript; @Autowired public DecrProductStockController(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; // 初始化 Lua 脚本 decrStockScript = new DefaultRedisScript<>(); decrStockScript.setScriptText(DECR_STOCK_LUA_SCRIPT); decrStockScript.setResultType(Long.class); releaseLockScript = new DefaultRedisScript<>(); releaseLockScript.setScriptText(RELEASE_LOCK_LUA_SCRIPT); releaseLockScript.setResultType(Long.class); } @GetMapping("decrStock/{proId}") public String decrStock(@PathVariable("proId") Integer proId) { String lockKey = "lock_pro_" + proId; String proKey = "pro_stock_" + proId; String lockValue = UUID.randomUUID().toString().replace("-", ""); // 尝试获取锁 Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS); if (Boolean.TRUE.equals(isLocked)) { try { // 执行 Lua 脚本来减库存 Long result = redisTemplate.execute(decrStockScript, Collections.singletonList(proKey)); if (result == 1) { System.out.println(Thread.currentThread().getName() + " 用户已减库存----"); } else if (result == -1) { System.out.println("库存不足了============"); } } finally { // 执行 Lua 脚本来释放锁 redisTemplate.execute(releaseLockScript, Collections.singletonList(lockKey), lockValue); System.out.println(Thread.currentThread().getName() + " 用户已释放锁"); } } else { // 重试逻辑 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return decrStock(proId); } return "success"; } }
锁的释放可能存在问题:
在releaseLockScript中,比较了KEYS[1]对应的值和ARGV[1]。但是,在Redis中,键对应的值可能是以字符串形式存储的,而UUID生成的lockValue在比较时应该确保类型一致。为了确保比较的正确性,应该在Lua脚本中将get得到的值转换为字符串后再进行比较。
redis.call(‘get’, KEYS[1])返回的是一个字符串(如果存在的话),不需要转换ARGV[1],因为它是作为Lua脚本的参数传入的,本身就是字符串。但为了保险起见,可以在Lua脚本中显式地将ARGV[1]转换为字符串,虽然这通常是不必要的。
另外,在decrStockScript中,当库存为0时,脚本直接返回-1。这可能会导致一些竞态条件,因为如果两个请求几乎同时到达,并且当前库存为1,那么两个请求都可能看到库存大于0,并都尝试减少库存。为了避免这种情况,我们可以使用Redis的事务功能,或者更简单地,在Lua脚本中添加一个额外的检查来确保减库存的操作是原子的。
// Lua 脚本,用于释放锁
private static final String RELEASE_LOCK_LUA_SCRIPT =
"if redis.call('get', KEYS[1]) == tostring(ARGV[1]) then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
// ... 其他代码保持不变 ...
// 在 decrStock 方法中,不需要做任何额外的修改,因为 Lua 脚本已经处理了字符串转换。
关于decrStockScript,为了确保减库存的原子性,可以在减少库存之前再次检查库存是否大于0。这样,即使有多个请求几乎同时到达,也只有一个请求能够成功减少库存。
// Lua 脚本,用于减库存
private static final String DECR_STOCK_LUA_SCRIPT =
"local stock = tonumber(redis.call('get', KEYS[1])) " +
"if stock and stock > 0 then " +
" redis.call('decrby', KEYS[1], 1) " +
" return 1 " +
"else " +
" return -1 " +
"end";
// ... 其他代码保持不变 ...
锁的过期时间可能过短:
锁设置了10秒的过期时间,这在某些情况下可能过短,尤其是在高并发或有网络延迟的环境中。建议根据实际情况调整锁的过期时间。
重试逻辑可能导致大量递归调用:
如果获取锁失败,代码会等待100毫秒后递归调用decrStock方法。这种重试机制没有最大重试次数限制,可能导致无限递归和栈溢出错误。应该加入一个重试次数的限制。
异常处理不完善:
在执行Lua脚本时,如果Redis服务器出现问题或者网络问题导致脚本执行失败,应该有相应的异常处理逻辑。
代码冗余和可读性:
部分代码可以简化和重构以提高可读性。
// ...(其他部分保持不变) @RestController @RequestMapping("test") public class DecrProductStockController { // ...(其他部分保持不变) // 修改后的减库存方法 @GetMapping("decrStock/{proId}") public String decrStock(@PathVariable("proId") Integer proId) { String lockKey = "lock_pro_" + proId; String proKey = "pro_stock_" + proId; String lockValue = UUID.randomUUID().toString().replace("-", ""); int maxRetries = 5; // 最大重试次数 int retries = 0; // 当前重试次数 while (retries < maxRetries) { // 尝试获取锁 Boolean isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS); // 锁的过期时间调整为30秒 if (Boolean.TRUE.equals(isLocked)) { try { // 执行Lua脚本来减库存 Long result = redisTemplate.execute(decrStockScript, Collections.singletonList(proKey)); if (result == 1) { System.out.println(Thread.currentThread().getName() + " 用户已减库存----"); return "success"; // 减库存成功,直接返回成功信息 } else if (result == -1) { System.out.println("库存不足了============"); return "stock_not_enough"; // 库存不足,返回错误信息 } } finally { // 执行Lua脚本来释放锁,修改Lua脚本以确保类型一致 redisTemplate.execute(releaseLockScript, Collections.singletonList(lockKey), lockValue); System.out.println(Thread.currentThread().getName() + " 用户已释放锁"); } } else { retries++; // 增加重试次数 try { // 等待一段时间后重试,这里使用200毫秒作为等待时间,可根据实际情况调整 Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 处理中断异常,根据实际情况决定是否需要抛出或记录日志等操作 } } } // 重试次数达到上限仍未成功获取锁,返回错误信息或进行其他处理 return "failed_to_acquire_lock"; // 无法获取锁,返回错误信息 } } // ...(其他部分保持不变)
decrProductStock.lua
if redis.call('get', KEYS[1]) ~='0'
then
return redis.call('decrby',KEYS[1],ARGV[1])
else
return -1;
end
deleteLock.lua
if redis.call('get', KEYS[1]) == ARGV[1]
then
return redis.call('del', KEYS[1])
else
return 0
end
package com.example.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * 测试高并发减库存 */ @RestController @RequestMapping("test") public class DecrProductStockController { @Autowired private RedisTemplate redisTemplate; private DefaultRedisScript<Long> decrStockLua; private DefaultRedisScript<Long> deleteLockLua; //在本类di注入之后,在初始化的时候,就要加载lua脚本 @PostConstruct public void loadLuaScript(){ System.out.println("加载lua脚本"); decrStockLua = new DefaultRedisScript<>(); decrStockLua.setResultType(Long.class); decrStockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("decrProductStock.lua"))); deleteLockLua = new DefaultRedisScript<>(); deleteLockLua.setResultType(Long.class); deleteLockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("deleteLock.lua"))); } /** * 1.redis:(1)商品 (2)锁key-(lock_pro_101) value(UUID) * 2.能否获得锁 setnx key value ex 10 * 3.减库存:(1)获得库存get number != null >0 (2)decr number ===>原子性lus脚本 * 4释放锁(1) if get key == uuid (2) del key ===>原子性lua脚本 * * @param proId * @return */ @GetMapping("decrStock/{proId}") public String decrStock(@PathVariable("proId") Integer proId){ System.out.println(Thread.currentThread().getName() + "用户开始抢购商品" + proId); //锁的key-value String lockKey = "lock_pro_"+proId; String lockValue = UUID.randomUUID().toString().replace("-",""); boolean isGetLock = redisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,10, TimeUnit.SECONDS); //商品的key:pro_::101 -product //商品库存key: pro_stock::101 -5 String proKey = "pro_stock_"+proId; //获得到锁 if(isGetLock){ System.out.println(Thread.currentThread().getName() + "用户获得到了商品的锁"+lockKey); // System.out.println(Thread.currentThread().getName()+",获得到了商品的锁"+lockKey); // Integer stock = (Integer)redisTemplate.opsForValue().get(proKey); // if(stock != null && stock >0){ // redisTemplate.opsForValue().decrement(proKey); // } //减库存----LUA Long result = (long) redisTemplate.execute(decrStockLua,Arrays.asList(proKey),1); if (result == -1){ System.out.println("库存不足了============"); }else{ System.out.println(Thread.currentThread().getName() + "用户已减库存----"); } //释放锁 // if(redisTemplate.opsForValue().get(lockKey)==lockValue){ // redisTemplate.delete(lockKey) // } // System.out.println(Thread.currentThread().getName()+"已释放锁"); // System.out.println(Thread.currentThread().getName()+"目前的库存>>>"); //释放锁 ====LUA redisTemplate.execute(deleteLockLua,Arrays.asList(lockKey),lockValue); System.out.println(Thread.currentThread().getName()+"用户已释放锁"); System.out.println("****************"); }else { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } decrStock(proId); } return "success"; } }
package com.example.controller; import com.example.entity.Product; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * 测试高并发减库存 */ @RestController @RequestMapping("test") public class DecrProductStockController { @Autowired private RedisTemplate redisTemplate; private DefaultRedisScript<Long> decrStockLua; private DefaultRedisScript<Long> deleteLockLua; // 在本类DI注入之后,在初始化的时候,就要加载lua脚本 @PostConstruct public void loadLuaScript(){ System.out.println("加载lua脚本。。。。。。。。。。。。。。"); decrStockLua = new DefaultRedisScript<>(); decrStockLua.setResultType(Long.class); decrStockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("decrProductStock.lua"))); deleteLockLua = new DefaultRedisScript<>(); deleteLockLua.setResultType(Long.class); deleteLockLua.setScriptSource(new ResourceScriptSource(new ClassPathResource("deleteLock.lua"))); } /** * 1. redis : (1)商品 (2)锁 key- (lock_pro_101) value (UUID) * 2. 能否获得锁 : setnx key value ex 10 * 3. 减库存: (1)获得库存get number !=null >0 (2)decr number ====> 原子性 LUA 脚本 * 4. 释放锁 (1)if get key == uuid (2) del key ====> 原子性 LUA 脚本 */ @GetMapping("decrStock/{proId}") public String decrStock(@PathVariable("proId") Integer proId){ System.out.println(Thread.currentThread().getName() + "用户开始抢购商品"+ proId); // 锁的key-value String lockKey = "lock_pro_"+proId; String lockValue = UUID.randomUUID().toString().replace("-",""); boolean isGetLock = redisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,10, TimeUnit.SECONDS); //商品的key : pro_::101 - product //商品库存的key : pro_stock::101 - 5 String proKey = "pro_stock_"+proId; //获得到锁 if(isGetLock){ System.out.println(Thread.currentThread().getName() + "用户获得到了商品的锁"+ lockKey); // 减库存====LUA Long result = (Long) redisTemplate.execute(decrStockLua,Arrays.asList(proKey),1); if(result == -1){ System.out.println("库存不足了=================================="); }else{ System.out.println(Thread.currentThread().getName() + "用户已减库存-------"); } // 释放锁 ====LUA redisTemplate.execute(deleteLockLua, Arrays.asList(lockKey),lockValue); System.out.println(Thread.currentThread().getName() + "用户已释放锁 "); System.out.println("********************************************************"); // System.out.println(Thread.currentThread().getName() +"目前的库存是>>>>>>>>>>>>>>>" + redisTemplate.opsForValue().get(proKey)); }else{ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } decrStock(proId); } return "success"; } }
package com.example.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheConfiguration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.*; import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** * 配置redistemplate序列化 */ @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { //过期时间-1天 private Duration timeToLive = Duration.ofDays(-1); /** * RedisTemplate 先关配置 * * @param factory * @return */ @Bean @SuppressWarnings("all") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); //LocalDatetime序列化 JavaTimeModule timeModule = new JavaTimeModule(); timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); om.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); om.registerModule(timeModule); jackson2JsonRedisSerializer.setObjectMapper(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key采用String的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash的value序列化方式采用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } @Bean public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) { //默认1 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(timeToLive) .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(keySerializer())) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(valueSerializer())) .disableCachingNullValues(); RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory) .cacheDefaults(config) .transactionAware() .build(); return redisCacheManager; } @Bean RedisMessageListenerContainer listenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); listenerContainer.setConnectionFactory(connectionFactory); return listenerContainer; } /** * key 类型 * @return */ private RedisSerializer<String> keySerializer() { return new StringRedisSerializer(); } /** * 值采用JSON序列化 * @return */ private RedisSerializer<Object> valueSerializer() { return new GenericJackson2JsonRedisSerializer(); } }
<artifactId>springboot_redis_demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>springboot_redis_demo</name>
<description>springboot_redis_demo</description>
在这个Java类中,Lua脚本被用于两个关键的操作:减少商品库存和删除锁。这种做法主要是为了确保在高并发环境下,这两个操作能够以原子性的方式执行,防止数据不一致。
减少商品库存 (decrStockLua):
Lua脚本被用于减少库存是为了确保操作的原子性。在高并发场景下,如果没有使用Lua脚本来确保原子性,那么在检查库存和减少库存之间可能会出现竞态条件(race condition)。比如,两个请求几乎同时检查库存,发现都还有库存,然后都进行减少操作,这可能导致超卖。
使用Lua脚本,可以将检查和减少库存这两个操作合并成一个原子操作,确保在检查库存后立即进行减少操作,而不会被其他请求打断。如果库存不足,Lua脚本可以返回一个特定的值(在这个例子中是-1),Java代码可以根据这个返回值来判断库存是否足够。
删除锁 (deleteLockLua):
同样地,删除锁的操作也使用了Lua脚本来确保原子性。在分布式系统中,为了避免多个进程或线程同时修改同一资源,通常会使用锁来确保同一时间只有一个进程/线程可以操作资源。在这个例子中,锁是通过Redis的键来实现的,每个商品都有一个对应的锁。
当某个线程获得了锁,并完成了库存减少的操作后,它需要释放这个锁,以便其他线程可以获得锁并继续操作。使用Lua脚本来删除锁可以确保在检查锁的值和删除锁这两个操作之间是原子性的。这是非常重要的,因为在高并发环境下,如果不使用原子操作,可能会出现一个线程正在释放锁的同时,另一个线程错误地获得了这个锁。
总的来说,Lua脚本在这里的作用是确保关键操作的原子性,从而防止在高并发环境下出现数据不一致的情况。
检查与删除之间的竞态条件:如果没有原子性保证,一个线程可能在检查锁之后、删除锁之前被其他线程抢占,导致错误的线程释放了锁。
误删其他线程的锁:如果两个线程几乎同时尝试释放同一个锁,没有原子性操作可能会导致一个线程的删除操作覆盖了另一个线程的删除操作,从而造成数据的不一致。
为了避免这些问题,可以使用一个Lua脚本来确保这两个步骤(验证和删除)在一个原子操作中完成。Lua脚本在Redis中执行时,会阻塞其他命令的执行,直到脚本执行完成,从而保证了脚本内的命令序列是原子的。
在Redis中设置过期时间的分布式锁确实有一个潜在的问题:如果锁的持有者在锁过期之前未能释放锁,那么其他客户端将能够获取到这个锁,这可能导致并发问题。然而,这并不意味着分布式锁的设置没有意义。
首先,分布式锁的主要目的是在并发环境中保护共享资源,防止多个客户端同时对其进行修改,从而导致数据不一致。通过为锁设置过期时间,可以确保即使锁的持有者崩溃或由于某种原因无法释放锁,锁也不会永久地被占用,其他客户端在锁过期后仍然有机会获取锁并继续操作。
其次,虽然存在锁过期的风险,但在实际应用中,可以通过合理的锁超时时间和重试机制来降低这种风险。例如,锁的持有者可以在获取锁后立即开始处理任务,并尽量在锁过期之前完成任务并释放锁。如果任务需要更长时间来完成,可以考虑将任务拆分成多个较小的部分,并在每个部分之间重新获取锁。
此外,Redis的分布式锁通常与其他机制(如消息队列、事务等)结合使用,以确保数据的一致性和完整性。因此,尽管存在锁过期的风险,但分布式锁仍然是并发环境中保护共享资源的一种有效手段。
使用了Redis的Lua脚本来确保减库存和删除锁的操作是原子的。这是一个很好的做法,因为它可以防止在多个命令之间出现竞态条件。然而,仍然需要注意锁的过期时间和重试机制的设计,以确保系统的稳定性和可靠性。
最后,需要强调的是,分布式锁并不是解决所有并发问题的银弹。在使用分布式锁之前,应该仔细评估问题的性质和需求,以确定是否需要使用锁以及如何使用锁来解决问题。在某些情况下,可能还有其他更适合的解决方案(如使用乐观锁、分布式事务等)。
在这段代码中,使用了Redis的setIfAbsent命令来获取锁,这并不是一个原子操作,因此在高并发场景下,仍然有可能存在竞争条件。不过,这个问题可以通过使用Redis的RedLock算法或者其他分布式锁的解决方案来进一步改善。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。