赞
踩
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心是实现多进程之间互斥,而满足条件的并且常见的有三种:mysql、redis、zookeeper
实现分布式锁时需要实现的两个基本方法:
获取锁:
127.0.0.1:6379[1]> setnx lock thread1
(integer) 1 # 现在成功获取锁了
127.0.0.1:6379[1]> setnx lock thread1
(integer) 0 # 重复获取锁,发现获取失败
释放锁:
127.0.0.1:6379[1]> del lock
(integer) 1 # 删除key,就是手动释放锁
127.0.0.1:6379[1]> KEYS *
(empty array) # 发现key,已经没有了
127.0.0.1:6379[1]> set lock thread ex 8 nx
OK
127.0.0.1:6379[1]> ttl lock
(integer) 6
127.0.0.1:6379[1]> ttl lock
(integer) 4
127.0.0.1:6379[1]> ttl lock
(integer) 1
127.0.0.1:6379[1]> ttl lock
(integer) -2
1、添加释放锁需要判断是否是当前线程,避免锁误删操作。
2、添加LUA脚本解决多条命令原子性问题
尝试获取锁:是因为采用的是非阻塞式。获取锁只是获取一次。要么成功要么失败。
public interface ILock { /** * 尝试获取锁 * @param timeoutSec = EX :锁持有的超时时间,过期后自动释放 * @return true代表获取锁成功; false代表获取锁失败 */ boolean tryLock(long timeoutSec); /** * 释放锁 */ void unlock(); }
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致。从而避免误删别人的锁。
public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) { this.name = name; this.stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:"; private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-"; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public boolean tryLock(long timeoutSec) { // 获取线程标示 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock() { // 调用lua脚本 stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); } /*@Override public void unlock() { // 获取线程标示 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标示 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标示是否一致 if(threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }*/ }
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0
Redisson是开源的框架,在redis基础上实现的分布式工具的集合。而分布式锁只是Redisson的一个子集。
每个Redis服务实例都能管理多达1TB的内存。
Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本
GitHub地址:https://github.com/redisson/redisson
下面这个图是来自官网,可以实现对锁的功能
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
@Configuration
public class RedissonConfig {
// redis的工厂类,可以从中拿到各种工具
@Bean
public RedissonClient redissonClient(){
// 配置类
Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
config.useSingleServer().setAddress("redis://192.168.75.111:6379").setPassword("123321");
// 创建RedissonClient对象。创建客户端
return Redisson.create(config);
}
}
添加配置可以使用yml文件,跟springBoot整合来实现,官网还提供了start。
因为会替代spring提供的redis的配置和实现。
建议使用Redisson时,自己进行配置bean,不和spring提供的redis配置进行掺和。
@Resource private RedissonClient redissonClient; @Test void testRedisson() throws InterruptedException { // 获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("anyLock"); /* 尝试获取锁,参数分别是: 参数一:获取锁的最大等待时间(期间会重试) 参数二:锁自动释放时间,时间单位 1. 无参模式:非阻塞式 等待时间为-1,就是不等待。如果获取失败立即结束。 自动释放为30秒钟,超时30秒后才会释放 */ boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS); // 判断释放获取成功 if(isLock){ try { System.out.println("执行业务"); }finally { // 释放锁 lock.unlock(); } } }
可重入锁:
缺点:redis宕机引起锁失效问题
例如方法A调用方法B,在方法A中先去获得锁,然后执行业务去调用B,而B又要获取同一把锁。
而例如set key value nx time 就是不可重入锁,就会出现死锁的状态。例如:如果A获得锁后,去执行B,B如果也想获得锁,但是A并没有释放锁,所以说就会出现死锁状态。
需要Hash类型
获取锁和释放锁的流程:
1、创建锁的对象
2、在方法A中,获取锁,tryLock时记录锁的线程标识和重试次数为1
3、在方法B中,获取锁。如果是锁已经存在,并且是同一线程时,只需要在重试次数中加1。代表是第二次获取同一个锁。
4、在方法B或者方法A中,执行完业务,释放锁的逻辑是:需要把重试次数减1,并判断是否为0,如果为0则删除锁。
@SpringBootTest class RedissonTest { @Resource private RedissonClient redissonClient; private RLock lock; @BeforeEach void setUp() { lock = redissonClient.getLock("order"); } @Test void method1() throws InterruptedException { // 尝试获取锁 boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS); if (!isLock) { log.error("获取锁失败 .... 1"); return; } try { log.info("获取锁成功 .... 1"); method2(); log.info("开始执行业务 ... 1"); } finally { log.warn("准备释放锁 .... 1"); lock.unlock(); } } void method2() { // 尝试获取锁 boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败 .... 2"); return; } try { log.info("获取锁成功 .... 2"); log.info("开始执行业务 ... 2"); } finally { log.warn("准备释放锁 .... 2"); lock.unlock(); } } }
获取锁和释放锁一定要采用Lua脚本,来确保获取和释放锁的原子性。
获取锁:
local key = KEYS[1]; -- 锁的key local threadId = ARGV[1]; -- 线程唯一标识 local releaseTime = ARGV[2]; -- 锁的自动释放时间 -- 判断是否存在 if(redis.call('exists', key) == 0) then -- 不存在, 获取锁 redis.call('hset', key, threadId, '1'); -- 设置有效期 redis.call('expire', key, releaseTime); return 1; -- 返回结果 end; -- 锁已经存在,判断threadId是否是自己 if(redis.call('hexists', key, threadId) == 1) then -- 不存在, 获取锁,重入次数+1 redis.call('hincrby', key, threadId, '1'); -- 设置有效期 redis.call('expire', key, releaseTime); return 1; -- 返回结果 end; return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
释放锁:
local key = KEYS[1]; -- 锁的key local threadId = ARGV[1]; -- 线程唯一标识 local releaseTime = ARGV[2]; -- 锁的自动释放时间 -- 判断当前锁是否还是被自己持有 if (redis.call('HEXISTS', key, threadId) == 0) then return nil; -- 如果已经不是自己,则直接返回 end; -- 是自己的锁,则重入次数-1 local count = redis.call('HINCRBY', key, threadId, -1); -- 判断是否重入次数是否已经为0 if (count > 0) then -- 大于0说明不能释放锁,重置有效期然后返回 redis.call('EXPIRE', key, releaseTime); return nil; else -- 等于0说明可以释放锁,直接删除 redis.call('DEL', key); return nil; end;
可重试:利用信号量和PubSub【发布订阅】功能实现等待、唤醒、获取锁失败的重试机制。
第一次尝试获取锁失败以后,并不是立即失败,而是利用了redis的PubSub的机制,做一个等待,等待释放锁的消息。
而获取锁成功的线程,在释放锁中会发送一条释放锁的消息。从而会被正在等待的线程通过订阅机制捕获到。
当等到释放锁的消息后,就会重试机制。
不可重试: 获取锁只尝试一次就返回false。
boolean isLock = lock.tryLock();
tryLock()的参数:
long waitTime:获取锁的最大等待时常。当第一次获取锁失败后,不会立即返回false,而是在规定的时间内进行重试,直到超时才会返回false。
long leaseTime:自动失效释放的时间
TimeUnit unit:时间单位
从获取锁这条命令开始往下执行:
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); //将传进来的时间,转换为毫秒 long current = System.currentTimeMillis();//得到当前时间 long threadId = Thread.currentThread().getId(); //得到线程的id,就是将来锁的标识 Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //尝试获取锁。 if (ttl == null) { return true; //如果获取成功,返回true } else { /* *获取失败,就要再次获取: */ time -= System.currentTimeMillis() - current; //判断是否超时重试时间 if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); return false; } else { current = System.currentTimeMillis(); // 再次得到当前时间 /* 并没有立即去尝试。而是订阅subscribe 其他人释放锁的信号 在释放锁时有这样的语句,用来发布信号:redis.call('piblis',KEY[2],ARGV[1]);" */ RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId); // 当且仅当future在指定的时间限制内完成时为True // 等待time(锁的剩余等待时间),如果等到锁的时间过期, // 还没有等到释放锁的信号,就会返回获取锁失败 if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) { // 等待锁的重试超时时间,就取消订阅 this.unsubscribe(subscribeFuture, threadId); } }); } this.acquireFailed(waitTime, unit, threadId); return false; try { //计算剩余等待时间 time -= System.currentTimeMillis() - current; // 如果剩余等待时间小于0 if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); boolean var20 = false; return var20; } else { boolean var16; // 如果剩余等待时间大于0 。 进入do while循环 do { //得到当前时间 long currentTime = System.currentTimeMillis(); // 第一次去重试 ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null) { var16 = true; return var16; } // 如果获取失败,则看一下剩余时间 time -= System.currentTimeMillis() - currentTime; if (time <= 0L) { this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } // 剩余时间如果还有 currentTime = System.currentTimeMillis(); // 采用信号量。在规定时间内,等待得到释放锁的信号量 // 如果ttl小于等待时间:说明在等待时锁就释放了,就等待ttl的时间 // 如果ttl大于等待时间:等待time的时间 if (ttl >= 0L && ttl < time) { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; // 如果等着ttl 到期后,time肯定还没有到期。那么就一直循环while,等待锁的释放信号 } while(time > 0L); this.acquireFailed(waitTime, unit, threadId); var16 = false; return var16; } } finally { this.unsubscribe(subscribeFuture, threadId); } } } } }
超时释放: 锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
超时续约: 利用watchDog看门狗机制,每隔一段时间(releseTime/3),重置超时时间
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { // if (leaseTime != -1L) { return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { /* getConnectionManager:看门狗的时间,默认是30秒,去获取锁 */ RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 当future完成以后(剩余有效期,和异常) ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e == null) { // 剩余有效期= null, 说明获取锁成功了 if (ttlRemaining == null) { // 任务调度 过期时间 : 自动续期功能 this.scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } } this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
如果redis提供了主从集群,主从同步存在延迟,挡住宕机后,如果从并同步中的锁数据,则会出现锁实现。
主节点:负责增删改
从节点:只负责读的问题
那么主节点会把数据同步到从节点中,但是同步时会存在延迟,即使延迟很短也是会存在。当获取锁后,主从数据还没有来及同步时,主节点宕机了。主备切换后,在新的master节点中,发现锁并不存在了。
原理:多个独立的redis节点,必须在所有节点都获取重入锁,才算获取锁成功
优点:所有锁中最安全的实现方法
缺点:运维成本高、实现复杂
既然主从关系是导致一致性问题的原因,那么Redisson取消主从,那么所有的节点都是独立的redisson节点,相互之间没有任何关系,都可以做读写操作。那么获取锁时,依次在多个节点中进行获取锁操作。
可用性问题: 即使某一个节点宕机后,那么其他节点都有锁的信息。
更高的可用性: 在每一个节点后面加入slave节点,做主从同步。
即使加入了主从同步,也不会出现安全问题。
假设某一台master宕机后,刚好并没有完成数据同步。那么slave变成了master主节点。没有锁标识。
有一个线程趁虚而入,想要获取锁,并不能获取成功。因为只有在每一个节点都拿到锁才能获取成功。
只要任意一个节点存活中,其他线程就不能拿到锁,就不会出现锁失效的问题。
优点:保留了主从 机制,确保了整个redis的高可用特性,避免了主从一致引发的锁失效问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。