赞
踩
SETNX lock.foo < current Unix time + lock timeout + 1>
(1)出现死锁的原因
(2)死锁的解决方法
public class Setexnx { private static Logger logger = Logger.getLogger(Setexnx.class.getName()); //最长时间锁为1小时 private final static int maxExpireTime = 1 * 60 * 60; //系统时间偏移量15秒,服务器间的系统时间差不可以超过15秒,避免由于时间差造成错误的解锁 private final static int offsetTime = 15; //分布式锁的实现 public static boolean Lock(String key, String value, int waitTime, int expire) { long start = System.currentTimeMillis(); String lock_key = key + "_lock"; logger.info("开始获取分布式锁 key:" + key + " lock_key:" + lock_key + " value:" + value); do { try { Thread.sleep(1); long ret = CacheUtils.Setnx(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key, System.currentTimeMillis() + "$T$" + value, (expire > maxExpireTime) ? maxExpireTime : expire); if (ret == 1) { logger.info("成功获得分布式锁 key:" + key + " value:" + value); return Boolean.TRUE; } else { // 存在锁,并对死锁进行修复 String desc = CacheUtils.GSetnx(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key); // 首次锁检测 if (desc.indexOf("$T$") > 0) { // 上次锁时间 long lastLockTime = NumberUtils.toLong(desc.split("[$T$]")[0]); // 明确死锁,利用Setex复写,再次设定一个合理的解锁时间让系统正常解锁 if (System.currentTimeMillis() - lastLockTime > (expire + offsetTime) * 1000) { // 原子操作,只需要一次,会发生小概率事件,多个服务同时发现死锁同时执行此行代码(并发), // 为什么设置解锁时间为expire(而不是更小的时间),防止在解锁发送错乱造成新锁解锁 CacheUtils.Setex(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key, value, expire); logger.warn("发现死锁【" + expire + "秒后解锁】key:" + key + " desc:" + desc); } else { logger.info("当前锁key:" + key + " desc:" + desc); } } else { logger.warn("死锁解锁中key:" + key + " desc:" + desc); } } if (waitTime == 0) { break; } Thread.sleep(500); } catch (Exception ex) { logger.error(Trace.GetTraceStackDetails("获取锁失败", ex)); } } while ((System.currentTimeMillis() - start) < waitTime * 1000); logger.warn("获取分布式锁失败 key:" + key + " value:" + value); return Boolean.FALSE; } //解锁 public static boolean UnLock(String key) { String lock_key = key + "_lock"; try { CacheUtils.Del(CacheSpacePrefixEnum.TOOLBAR_SYS.name(), lock_key); } catch (Exception ex) { logger.error(Trace.GetTraceStackDetails("解锁锁失败key:" + key + " lock_key:" + lock_key, ex)); } return Boolean.FALSE; } public Long Setnx(String key, String value, int expireTime) throws Exception { ShardedJedis jedis = null; try { jedis = pool.getResource(); Long ret = jedis.setnx(key, value); if (ret == 1 && expireTime > 0) { jedis.expire(key, expireTime); } return ret; } catch (Exception e) { throw e; } finally { if (pool != null && jedis != null) { pool.returnResourceObject(jedis); } } } public String GSetnx(String key) throws Exception { ShardedJedis jedis = null; try { jedis = pool.getResource(); return jedis.get(key); } catch (Exception e) { throw e; } finally { if (pool != null && jedis != null) { pool.returnResourceObject(jedis); } } } }
存在的问题
public class RedisLockSetNx { private static Logger logger = Logger.getLogger(RedisLockSetNx.class.getName()); private RedisTemplate redisTemplate; private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100; private String lockKey; //锁超时时间,防止线程在入锁以后,无限的执行等待 private int expireMsecs = 60 * 1000; //锁等待时间,防止线程饥饿 private int timeoutMsecs = 10 * 1000; private volatile boolean locked = false; //Detailed constructor with default acquire timeout 10000 msecs and lock expiration of 60000 msecs. public RedisLock(RedisTemplate redisTemplate, String lockKey) { this.redisTemplate = redisTemplate; this.lockKey = lockKey + "_lock"; } //Detailed constructor with default lock expiration of 60000 msecs. public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs) { this(redisTemplate, lockKey); this.timeoutMsecs = timeoutMsecs; } //Detailed constructor. public RedisLock(RedisTemplate redisTemplate, String lockKey, int timeoutMsecs, int expireMsecs) { this(redisTemplate, lockKey, timeoutMsecs); this.expireMsecs = expireMsecs; } //获取lockkey public String getLockKey() { return lockKey; } //获取键的值 private String get(final String key) { Object obj = null; try { obj = redisTemplate.execute(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisSerializer serializer = new StringRedisSerializer(); byte[] data = connection.get(serializer.serialize(key)); connection.close(); if (data == null) { return null; } return serializer.deserialize(data); } }); } catch (Exception e) { logger.error("get redis error, key : {}", key); } return obj != null ? obj.toString() : null; } //设置键的值 private boolean setNX(final String key, final String value) { Object obj = null; try { obj = redisTemplate.execute(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisSerializer serializer = new StringRedisSerializer(); Boolean success = connection.setNX(serializer.serialize(key), serializer.serialize(value)); connection.close(); return success; } }); } catch (Exception e) { logger.error("setNX redis error, key : {}", key); } return obj != null ? (Boolean) obj : false; } //获取并设置键的值,并返回旧值 private String getSet(final String key, final String value) { Object obj = null; try { obj = redisTemplate.execute(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { StringRedisSerializer serializer = new StringRedisSerializer(); byte[] ret = connection.getSet(serializer.serialize(key), serializer.serialize(value)); connection.close(); return serializer.deserialize(ret); } }); } catch (Exception e) { logger.error("setNX redis error, key : {}", key); } return obj != null ? (String) obj : null; } //实现分布式锁 public synchronized boolean lock() throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; String expiresStr = String.valueOf(expires); //锁到期时间 if (this.setNX(lockKey, expiresStr)) { // 获取锁 locked = true; return true; } //redis系统的时间 String currentValueStr = this.get(lockKey); //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { //获取上一个锁到期时间,并设置现在的锁到期时间, //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的 String oldValueStr = this.getSet(lockKey, expiresStr); //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { //分布式的情况下:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁 // 获取锁 locked = true; return true; } } timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS; //延迟100 毫秒, Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS); } return false; } //释放锁 public static void wrongReleaseLock(Jedis jedis, String lockKey, String requestId) { // 判断加锁与解锁是不是同一个客户端 if (requestId.equals(jedis.get(lockKey))) { // 若在此时,这把锁突然不是这个客户端的,则会误解锁 jedis.del(lockKey); } } }
存在的问题
public class RedisTool { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; /** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); return LOCK_SUCCESS.equals(result); } /** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return RELEASE_SUCCESS.equals(result); } }
分析:
存在的问题:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } // 加锁成功进入该方法,即将进入看门狗逻辑 protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } }
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } if (res) { // reschedule itself renewExpiration(); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); ee.setTimeout(task); }
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 取消看门狗定时任务(响应式编程) cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; }
参考链接:
1.分布式系统互斥性与幂等性问题的分析与解决-美团技术博客
2. 分布式锁的几种实现方式
3. Redis锁从面试连环炮聊到神仙打架
4. Redisson看门狗+时间轮
5. 分布式锁的实现之 redis 篇
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。