当前位置:   article > 正文

Java分布式锁的三种实现方案_java分布式锁的三种实现方式

java分布式锁的三种实现方式

转载:https://m.jb51.net/article/103617.htm

方案一:数据库乐观锁

乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonus),有一个字段(left_count)记录礼物的剩余个数,用户每领取一个奖品,对应的left_count减1,在并发的情况下如何要保证left_count不为负数,乐观锁的实现方式为在红包表上添加一个版本号字段(version),默认为0。

异常实现流程

  1. -- 可能会发生的异常情况
  2. -- 线程1查询,当前left_count为1,则有记录
  3. select * from t_bonus where id = 10001 and left_count > 0
  4. -- 线程2查询,当前left_count为1,也有记录
  5. select * from t_bonus where id = 10001 and left_count > 0
  6. -- 线程1完成领取记录,修改left_count为0,
  7. update t_bonus set left_count = left_count - 1 where id = 10001
  8. -- 线程2完成领取记录,修改left_count为-1,产生脏数据
  9. update t_bonus set left_count = left_count - 1 where id = 10001

通过乐观锁实现

  1. -- 添加版本号控制字段
  2. ALTER TABLE table ADD COLUMN version INT DEFAULT '0' NOT NULL AFTER t_bonus;
  3. -- 线程1查询,当前left_count为1,则有记录,当前版本号为1234
  4. select left_count, version from t_bonus where id = 10001 and left_count > 0
  5. -- 线程2查询,当前left_count为1,有记录,当前版本号为1234
  6. select left_count, version from t_bonus where id = 10001 and left_count > 0
  7. -- 线程1,更新完成后当前的version为1235,update状态为1,更新成功
  8. update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234
  9. -- 线程2,更新由于当前的version为1235,udpate状态为0,更新失败,再针对相关业务做异常处理
  10. update t_bonus set version = 1235, left_count = left_count-1 where id = 10001 and version = 1234

方案二:基于Redis的分布式锁

SETNX命令(SET if Not eXists)\
语法:SETNX key value\
功能:原子性操作,当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。\
Expire命令\
语法:expire(key, expireTime)\
功能:key设置过期时间\
GETSET命令\
语法:GETSET key value\
功能:将给定 key 的值设为 value ,并返回 key 的旧值 (old value),当 key 存在但不是字符串类型时,返回一个错误,当key不存在时,返回nil。\
GET命令\
语法:GET key\
功能:返回 key 所关联的字符串值,如果 key 不存在那么返回特殊值 nil 。\
DEL命令\
语法:DEL key [KEY …]\
功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。

第一种:使用redis的setnx()、expire()方法,用于分布式锁

  • setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
  • expire()命令对lockkey设置超时时间,为的是避免死锁问题。
  • 执行完业务代码后,可以通过delete命令删除key。

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题

第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题

  • setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。
  • get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。
  • 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
  • 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。
  • 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。
  1. package com.jd.ejshop.common.pub;
  2. import com.jd.jim.cli.Cluster;
  3. import javax.annotation.Resource;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * Created by IDEA
  7. * @author cdboxuefeng
  8. * Date: 2016-08-16 14:01
  9. * Desc: redis分布式锁
  10. */
  11. public class RedisLockUtil {
  12. /**
  13. * Redis cluster
  14. */
  15. @Resource
  16. private Cluster cluster;
  17. /***************第一种:使用redis的setnx()、expire()方法,用于分布式锁******************************************/
  18. /**
  19. * 加锁
  20. * @param key redis key
  21. * @param expire 过期时间,单位秒
  22. * @return true:加锁成功,false,加锁失败
  23. */
  24. public boolean lock1(String key, int expire) {
  25. //"1"为任意值,无实际意义
  26. boolean status = cluster.setNX(key, "1");
  27. if(status) {
  28. cluster.expire(key, expire,TimeUnit.SECONDS);
  29. return true;
  30. }
  31. return false;
  32. }
  33. /**
  34. * 解锁
  35. * 执行完业务代码后,可以通过delete命令删除key
  36. * @param key
  37. */
  38. public void unLock1(String key) {
  39. cluster.del(key);
  40. }
  41. /*************第二种:使用redis的setnx()、get()、getset()方法,用于分布式锁,解决死锁问题*******************************************/
  42. /**
  43. * 加锁
  44. * @param key redis key
  45. * @param expire 过期时间,单位秒
  46. * @return true:加锁成功,false,加锁失败
  47. */
  48. public boolean lock2(String key,int expire) {
  49. //value 过期时间节点
  50. long value = System.currentTimeMillis() + expire;
  51. boolean status = cluster.setNX(key, String.valueOf(value));
  52. if(status) {
  53. //key不存在,加锁成功,直接返回
  54. return true;
  55. }
  56. long oldExpireTime = Long.parseLong(cluster.get(key));
  57. if(oldExpireTime < System.currentTimeMillis()) {
  58. //超时
  59. long newExpireTime = System.currentTimeMillis() + expire;
  60. long currentExpireTime = Long.parseLong(cluster.getSet(key, String.valueOf(newExpireTime)));
  61. if(currentExpireTime == oldExpireTime) {
  62. //版本一致加锁成功,原理类似于CAS
  63. return true;
  64. }
  65. }
  66. return false;
  67. }
  68. /**
  69. * 解锁
  70. * 当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,
  71. * 如果小于锁设置的超时时间,则直接执行delete释放锁;
  72. * 如果大于锁设置的超时时间,则不需要再锁进行处理
  73. * @param key
  74. */
  75. public void unLock2(String key) {
  76. long oldExpireTime = Long.parseLong(cluster.get(key));
  77. if(oldExpireTime > System.currentTimeMillis()) {
  78. cluster.del(key);
  79. }
  80. }
  81. /**
  82. * 使用redis实现分布式锁的应用场景
  83. * @param userId
  84. */
  85. public void drawRedPacket(long userId) {
  86. String key = "draw.redpacket.userid:" + userId;
  87. boolean lock = this.lock2(key, 60);
  88. if(lock) {
  89. try {
  90. //领取操作
  91. } finally {
  92. //释放锁
  93. this.unLock2(key);
  94. }
  95. } else {
  96. new RuntimeException("重复领取奖励");
  97. }
  98. }
  99. }

 

 Spring AOP基于注解方式和SpEL实现开箱即用的redis分布式锁策略

  1. package com.jd.ejshop.common;
  2. import java.lang.annotation.ElementType;
  3. import java.lang.annotation.Retention;
  4. import java.lang.annotation.RetentionPolicy;
  5. import java.lang.annotation.Target;
  6. /**
  7. * RUNTIME
  8. * 定义注解
  9. * 编译器将把注释记录在类文件中,在运行时 VM 将保留注释,因此可以反射性地读取。
  10. *
  11. * @author shma1664
  12. */
  13. @Retention(RetentionPolicy.RUNTIME)
  14. @Target(ElementType.METHOD)
  15. public @interface RedisLockable {
  16. String[] key() default "";
  17. long expiration() default 60;
  18. }

 

  1. package com.jd.ejshop.common;
  2. import javax.annotation.Resource;
  3. import java.lang.reflect.Method;
  4. import com.autohome.api.dealer.util.cache.RedisClient;
  5. import com.google.common.base.Joiner;
  6. import org.aspectj.lang.ProceedingJoinPoint;
  7. import org.aspectj.lang.Signature;
  8. import org.aspectj.lang.annotation.Around;
  9. import org.aspectj.lang.annotation.Aspect;
  10. import org.aspectj.lang.annotation.Pointcut;
  11. import org.aspectj.lang.reflect.MethodSignature;
  12. import org.springframework.expression.EvaluationContext;
  13. import org.springframework.expression.Expression;
  14. import org.springframework.expression.ExpressionParser;
  15. import org.springframework.expression.spel.standard.SpelExpressionParser;
  16. import org.springframework.expression.spel.support.StandardEvaluationContext;
  17. import org.springframework.stereotype.Component;
  18. /**
  19. * Created by IDEA
  20. * User: mashaohua
  21. * Date: 2016-09-28 18:08
  22. * Desc:
  23. */
  24. @Aspect
  25. @Component
  26. public class RedisLockAop {
  27. @Resource
  28. private RedisClient redisClient;
  29. @Pointcut("execution(* com.autohome.api.dealer.tuan.service.*.*(..))")
  30. public void pointcut() {
  31. }
  32. @Around("pointcut()")
  33. public Object doAround(ProceedingJoinPoint point) throws Throwable {
  34. Signature signature = point.getSignature();
  35. MethodSignature methodSignature = (MethodSignature) signature;
  36. Method method = methodSignature.getMethod();
  37. String targetName = point.getTarget().getClass().getName();
  38. String methodName = point.getSignature().getName();
  39. Object[] arguments = point.getArgs();
  40. if (method != null && method.isAnnotationPresent(RedisLockable.class)) {
  41. RedisLockable redisLock = method.getAnnotation(RedisLockable.class);
  42. long expire = redisLock.expiration();
  43. String redisKey = getLockKey(targetName, methodName, redisLock.key(), arguments);
  44. boolean isLock = RedisLockUtil.lock2(redisKey, expire);
  45. if (!isLock) {
  46. try {
  47. return point.proceed();
  48. } finally {
  49. unLock2(redisKey);
  50. }
  51. } else {
  52. throw new RuntimeException("您的操作太频繁,请稍后再试");
  53. }
  54. }
  55. return point.proceed();
  56. }
  57. private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {
  58. StringBuilder sb = new StringBuilder();
  59. sb.append("lock.").append(targetName).append(".").append(methodName);
  60. if (keys != null) {
  61. String keyStr = Joiner.on(".").skipNulls().join(keys);
  62. String[] parameters = ReflectParamNames.getNames(targetName, methodName);
  63. ExpressionParser parser = new SpelExpressionParser();
  64. Expression expression = parser.parseExpression(keyStr);
  65. EvaluationContext context = new StandardEvaluationContext();
  66. int length = parameters.length;
  67. if (length > 0) {
  68. for (int i = 0; i < length; i++) {
  69. context.setVariable(parameters[i], arguments[i]);
  70. }
  71. }
  72. String keysValue = expression.getValue(context, String.class);
  73. sb.append("#").append(keysValue);
  74. }
  75. return sb.toString();
  76. }
  77. }

 

  1. <!-- https://mvnrepository.com/artifact/javassist/javassist -->
  2. <dependency>
  3. <groupId>org.javassist</groupId>
  4. <artifactId>javassist</artifactId>
  5. <version>3.18.1-GA</version>
  6. </dependency>

 

  1. package com.jd.ejshop.common;
  2. import javassist.*;
  3. import javassist.bytecode.CodeAttribute;
  4. import javassist.bytecode.LocalVariableAttribute;
  5. import javassist.bytecode.MethodInfo;
  6. import org.apache.log4j.Logger;
  7. /**
  8. * Created by IDEA
  9. * User: mashaohua
  10. * Date: 2016-09-28 18:39
  11. * Desc:
  12. */
  13. public class ReflectParamNames {
  14. private static Logger log = Logger.getLogger(ReflectParamNames.class);
  15. private static ClassPool pool = ClassPool.getDefault();
  16. static {
  17. ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);
  18. pool.insertClassPath(classPath);
  19. }
  20. public static String[] getNames(String className, String methodName) {
  21. CtClass cc = null;
  22. try {
  23. cc = pool.get(className);
  24. CtMethod cm = cc.getDeclaredMethod(methodName);
  25. // 使用javaassist的反射方法获取方法的参数名
  26. MethodInfo methodInfo = cm.getMethodInfo();
  27. CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
  28. LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
  29. if (attr == null) return new String[0];
  30. int begin = 0;
  31. String[] paramNames = new String[cm.getParameterTypes().length];
  32. int count = 0;
  33. int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;
  34. for (int i = 0; i < attr.tableLength(); i++) {
  35. // 为什么 加这个判断,发现在windows 跟linux执行时,参数顺序不一致,通过观察,实际的参数是从this后面开始的
  36. if (attr.variableName(i).equals("this")) {
  37. begin = i;
  38. break;
  39. }
  40. }
  41. for (int i = begin + 1; i <= begin + paramNames.length; i++) {
  42. paramNames[count] = attr.variableName(i);
  43. count++;
  44. }
  45. return paramNames;
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. try {
  50. if (cc != null) cc.detach();
  51. } catch (Exception e2) {
  52. log.error(e2.getMessage());
  53. }
  54. }
  55. return new String[0];
  56. }
  57. }

在需要使用分布式锁的地方添加注解

  1. /**
  2. * 抽奖接口
  3. * 添加redis分布式锁保证一个订单只有一个请求处理,防止用户刷礼物,支持SpEL表达式
  4. * redisLockKey:lock.com.autohome.api.dealer.tuan.service.impl.drawBonus#orderId
  5. *
  6. * @param orderId 订单id
  7. * @return 抽中的奖品信息
  8. */
  9. @RedisLockable(key = {"#orderId"}, expiration = 120)
  10. @Override
  11. public BonusConvertBean drawBonus(Integer orderId) throws BonusException {
  12. // 业务逻辑
  13. }

第三种方案:基于Zookeeper的分布式锁

在描述算法流程之前,先看下zookeeper中几个关于节点的有趣的性质:

  • 有序节点:假如当前有一个父节点为/lock,我们可以在这个父节点下面创建子节点;zookeeper提供了一个可选的有序特性,例如我们可以创建子节点“/lock/node-”并且指明有序,那么zookeeper在生成子节点时会根据当前的子节点数量自动添加整数序号,也就是说如果是第一个创建的子节点,那么生成的子节点为/lock/node-0000000000,下一个节点则为/lock/node-0000000001,依次类推。

  • 临时节点:客户端可以建立一个临时节点,在会话结束或者会话超时后,zookeeper会自动删除该节点。

  • 事件监听:在读取数据时,我们可以同时对节点设置事件监听,当节点数据或结构变化时,zookeeper会通知客户端。当前zookeeper有如下四种事件:1)节点创建;2)节点删除;3)节点数据修改;4)子节点变更。

利用节点名称的唯一性来实现独占锁

  1. 另一篇文章写道
  2. https://blog.csdn.net/xiaoliuliu2050/article/details/51226237
  3. ZooKeeper抽象出来的节点结构是一个和unix文件系统类似的小型的树状的目录结构。ZooKeeper机制规定:同一个目录下只能有一个唯一的文件名。例如:我们在Zookeeper目录/test目录下创建,两个客户端创建一个名为Lock节点,只有一个能够成功。
  4. 算法思路: 利用名称唯一性,加锁操作时,只需要所有客户端一起创建/test/Lock节点,只有一个创建成功,成功者获得锁。解锁时,只需删除/test/Lock节点,其余客户端再次进入竞争创建节点,直到所有客户端都获得锁。
  5. 该共享锁实现很符合我们通常多个线程去竞争锁的概念,利用节点名称唯一性的做法简明、可靠。
  6. 由上述算法容易看出,由于客户端会同时收到/test/Lock被删除的通知,重新进入竞争创建节点,故存在"惊群现象"
  7. 总结 这种方案的正确性和可靠性是ZooKeeper机制保证的,实现简单。缺点是会产生“惊群”效应,假如许多客户端在等待一把锁,当锁释放时候所有客户端都被唤醒,仅仅有一个客户端得到锁。

ZooKeeper机制规定同一个目录下只能有一个唯一的文件名,zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/lock/${lock_name}_lock节点,最终成功创建的那个客户端也即拥有了这把锁,创建失败的可以选择监听继续等待,还是放弃抛出异常实现独占锁。
package com.shma.example.zookeeper.lock;

  1. package com.shma.example.zookeeper.lock;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.locks.Condition;
  9. import java.util.concurrent.locks.Lock;
  10. import org.apache.zookeeper.*;
  11. import org.apache.zookeeper.data.Stat;
  12. /**
  13. * Created by IDEA
  14. * User: mashaohua
  15. * Date: 2016-09-30 16:09
  16. * Desc:
  17. */
  18. public class ZookeeperLock implements Lock, Watcher {
  19. private ZooKeeper zk;
  20. private String root = "/locks";//根
  21. private String lockName;//竞争资源的标志
  22. private String myZnode;//当前锁
  23. private int sessionTimeout = 30000;
  24. private List<Exception> exception = new ArrayList<Exception>();
  25. /**
  26. * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
  27. *
  28. * @param config 127.0.0.1:2181
  29. * @param lockName 竞争资源标志,lockName中不能包含单词lock
  30. */
  31. public ZookeeperLock(String config, String lockName) {
  32. this.lockName = lockName;
  33. // 创建一个与服务器的连接
  34. try {
  35. zk = new ZooKeeper(config, sessionTimeout, this);
  36. Stat stat = zk.exists(root, false);
  37. if (stat == null) {
  38. // 创建根节点
  39. zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  40. }
  41. } catch (IOException e) {
  42. exception.add(e);
  43. } catch (KeeperException e) {
  44. exception.add(e);
  45. } catch (InterruptedException e) {
  46. exception.add(e);
  47. }
  48. }
  49. @Override
  50. public void lock() {
  51. if (exception.size() > 0) {
  52. throw new LockException(exception.get(0));
  53. }
  54. if (!tryLock()) {
  55. throw new LockException("您的操作太频繁,请稍后再试");
  56. }
  57. }
  58. @Override
  59. public void lockInterruptibly() throws InterruptedException {
  60. this.lock();
  61. }
  62. @Override
  63. public boolean tryLock() {
  64. try {
  65. myZnode = zk.create(root + "/" + lockName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  66. return true;
  67. } catch (KeeperException e) {
  68. e.printStackTrace();
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. return false;
  73. }
  74. @Override
  75. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  76. return tryLock();
  77. }
  78. @Override
  79. public void unlock() {
  80. try {
  81. zk.delete(myZnode, -1);
  82. myZnode = null;
  83. zk.close();
  84. } catch (InterruptedException e) {
  85. e.printStackTrace();
  86. } catch (KeeperException e) {
  87. e.printStackTrace();
  88. }
  89. }
  90. @Override
  91. public Condition newCondition() {
  92. return null;
  93. }
  94. @Override
  95. public void process(WatchedEvent watchedEvent) {
  96. //
  97. }
  98. }
  1. ZookeeperLock lock = null;
  2. try {
  3. lock = new ZookeeperLock("127.0.0.1:2182","test1");
  4. lock.lock();
  5. //业务逻辑处理
  6. } catch (LockException e) {
  7. throw e;
  8. } finally {
  9. if(lock != null)
  10. lock.unlock();
  11. }

 

利用临时顺序节点控制时序实现

  1. 另一篇文章对该实现原理分析:
  2. 下面描述使用zookeeper实现分布式锁的算法流程,假设锁空间的根节点为/lock:
  3. 客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推。
  4. 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听/lock的子节点变更消息,获得子节点变更通知后重复此步骤直至获得锁;
  5. 执行业务代码;
  6. 完成业务流程后,删除对应的子节点释放锁。
  7. 步骤1中创建的临时节点能够保证在故障的情况下锁也能被释放,考虑这么个场景:假如客户端a当前创建的子节点为序号最小的节点,获得锁之后客户端所在机器宕机了,客户端没有主动删除子节点;如果创建的是永久的节点,那么这个锁永远不会释放,导致死锁;由于创建的是临时节点,客户端宕机后,过了一定时间zookeeper没有收到客户端的心跳包判断会话失效,将临时节点删除从而释放锁。
  8. 另外细心的朋友可能会想到,在步骤2中获取子节点列表与设置监听这两步操作的原子性问题,考虑这么个场景:客户端a对应子节点为/lock/lock-0000000000,客户端b对应子节点为/lock/lock-0000000001,客户端b获取子节点列表时发现自己不是序号最小的,但是在设置监听器前客户端a完成业务流程删除了子节点/lock/lock-0000000000,客户端b设置的监听器岂不是丢失了这个事件从而导致永远等待了?这个问题不存在的。因为zookeeper提供的API中设置监听器的操作与读操作是原子执行的,也就是说在读子节点列表时同时设置监听器,保证不会丢失事件。
  9. 最后,对于这个算法有个极大的优化点:假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”;在这种羊群效应中,zookeeper需要通知1000个客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表为/lock/lock-0000000000、/lock/lock-0000000001、/lock/lock-0000000002,序号为1的客户端监听序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息
  10. 所以调整后的分布式锁算法流程如下:
  11. 客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/lock/lock-0000000000,第二个为/lock/lock-0000000001,以此类推;
  12. 客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;
  13. 执行业务代码;
  14. 完成业务流程后,删除对应的子节点释放锁。
  15. 总结 利用临时顺序节点来实现分布式锁机制其实就是一种按照创建顺序排队的实现。这种方案效率高,避免了“惊群”效应,多个客户端共同等待锁,当锁释放时只有一个客户端会被唤醒。

**********************************************************************

/lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,和选master一样,编号最小的获得锁,用完删除,依次方便。\

算法思路:对于加锁操作,可以让所有客户端都去/lock目录下创建临时顺序节点,如果创建的客户端发现自身创建节点序列号是/lock/目录下最小的节点,则获得锁。否则,监视比自己创建节点的序列号小的节点(比自己创建的节点小的最大节点),进入等待。

对于解锁操作,只需要将自身创建的节点删除即可。

  1. package com.shma.example.zookeeper.lock;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.locks.Condition;
  9. import java.util.concurrent.locks.Lock;
  10. import org.apache.zookeeper.CreateMode;
  11. import org.apache.zookeeper.KeeperException;
  12. import org.apache.zookeeper.WatchedEvent;
  13. import org.apache.zookeeper.Watcher;
  14. import org.apache.zookeeper.ZooDefs;
  15. import org.apache.zookeeper.ZooKeeper;
  16. import org.apache.zookeeper.data.Stat;
  17. /**
  18. * Created by IDEA
  19. * User: mashaohua
  20. * Date: 2016-09-30 16:09
  21. * Desc:
  22. */
  23. public class DistributedLock implements Lock, Watcher {
  24. private ZooKeeper zk;
  25. private String root = "/locks";//根
  26. private String lockName;//竞争资源的标志
  27. private String waitNode;//等待前一个锁
  28. private String myZnode;//当前锁
  29. private CountDownLatch latch;//计数器
  30. private int sessionTimeout = 30000;
  31. private List<Exception> exception = new ArrayList<Exception>();
  32. /**
  33. * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
  34. *
  35. * @param config 127.0.0.1:2181
  36. * @param lockName 竞争资源标志,lockName中不能包含单词lock
  37. */
  38. public DistributedLock(String config, String lockName) {
  39. this.lockName = lockName;
  40. // 创建一个与服务器的连接
  41. try {
  42. zk = new ZooKeeper(config, sessionTimeout, this);
  43. Stat stat = zk.exists(root, false);
  44. if (stat == null) {
  45. // 创建根节点
  46. zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  47. }
  48. } catch (IOException e) {
  49. exception.add(e);
  50. } catch (KeeperException e) {
  51. exception.add(e);
  52. } catch (InterruptedException e) {
  53. exception.add(e);
  54. }
  55. }
  56. /**
  57. * zookeeper节点的监视器
  58. */
  59. public void process(WatchedEvent event) {
  60. if (this.latch != null) {
  61. this.latch.countDown();
  62. }
  63. }
  64. public void lock() {
  65. if (exception.size() > 0) {
  66. throw new LockException(exception.get(0));
  67. }
  68. try {
  69. if (this.tryLock()) {
  70. System.out.println("Thread " + Thread.currentThread().getId() + " " + myZnode + " get lock true");
  71. return;
  72. } else {
  73. waitForLock(waitNode, sessionTimeout);//等待锁
  74. }
  75. } catch (KeeperException e) {
  76. throw new LockException(e);
  77. } catch (InterruptedException e) {
  78. throw new LockException(e);
  79. }
  80. }
  81. public boolean tryLock() {
  82. try {
  83. String splitStr = "_lock_";
  84. if (lockName.contains(splitStr))
  85. throw new LockException("lockName can not contains \\u000B");
  86. //创建临时子节点
  87. myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  88. System.out.println(myZnode + " is created ");
  89. //取出所有子节点
  90. List<String> subNodes = zk.getChildren(root, false);
  91. //取出所有lockName的锁
  92. List<String> lockObjNodes = new ArrayList<String>();
  93. for (String node : subNodes) {
  94. String _node = node.split(splitStr)[0];
  95. if (_node.equals(lockName)) {
  96. lockObjNodes.add(node);
  97. }
  98. }
  99. Collections.sort(lockObjNodes);
  100. System.out.println(myZnode + "==" + lockObjNodes.get(0));
  101. if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {
  102. //如果是最小的节点,则表示取得锁
  103. return true;
  104. }
  105. //如果不是最小的节点,找到比自己小1的节点
  106. String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
  107. waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
  108. } catch (KeeperException e) {
  109. throw new LockException(e);
  110. } catch (InterruptedException e) {
  111. throw new LockException(e);
  112. }
  113. return false;
  114. }
  115. public boolean tryLock(long time, TimeUnit unit) {
  116. try {
  117. if (this.tryLock()) {
  118. return true;
  119. }
  120. return waitForLock(waitNode, time);
  121. } catch (Exception e) {
  122. e.printStackTrace();
  123. }
  124. return false;
  125. }
  126. private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
  127. Stat stat = zk.exists(root + "/" + lower, true);
  128. //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
  129. if (stat != null) {
  130. System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
  131. this.latch = new CountDownLatch(1);
  132. this.latch.await(waitTime, TimeUnit.MILLISECONDS);
  133. this.latch = null;
  134. }
  135. return true;
  136. }
  137. public void unlock() {
  138. try {
  139. System.out.println("unlock " + myZnode);
  140. zk.delete(myZnode, -1);
  141. myZnode = null;
  142. zk.close();
  143. } catch (InterruptedException e) {
  144. e.printStackTrace();
  145. } catch (KeeperException e) {
  146. e.printStackTrace();
  147. }
  148. }
  149. public void lockInterruptibly() throws InterruptedException {
  150. this.lock();
  151. }
  152. public Condition newCondition() {
  153. return null;
  154. }
  155. public class LockException extends RuntimeException {
  156. private static final long serialVersionUID = 1L;
  157. public LockException(String e) {
  158. super(e);
  159. }
  160. public LockException(Exception e) {
  161. super(e);
  162. }
  163. }
  164. }

可能对Zookeeper实现分布式锁讲的不是很透彻,可以移步下面链接查看:

https://blog.csdn.net/qiangcuo6087/article/details/79067136

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号