当前位置:   article > 正文

分布式锁原理与实战四:ZooKeeper分布式锁Java代码实现_java zk分布式锁

java zk分布式锁

目录

ZooKeeper分布式锁的基本实现

实战:加锁的实现

lock()方法的实现代码

tryLock()尝试加锁

checkLocked()检查是否持有锁

可重入的实现代码

释放锁的实现

实战:分布式锁的使用

curator的InterProcessMutex 可重入锁

ZooKeeper分布式锁的优点和缺点


ZooKeeper分布式锁的基本实现

        接下来就是基于ZooKeeper,实现一下分布式锁。首先,定义了一个锁的接口Lock,很简单,仅仅两个抽象方法:一个加锁方法,一个解锁方法。Lock接口的代码如下:

  1. package com.crazymakercircle.zk.distributedLock;
  2. /**
  3. * create by 尼恩 @ 疯狂创客圈
  4. **/
  5. public interface Lock {
  6. /**
  7. * 加锁方法
  8. *
  9. * @return 是否成功加锁
  10. */
  11. boolean lock() throws Exception;
  12. /**
  13. * 解锁方法
  14. *
  15. * @return 是否成功解锁
  16. */
  17. boolean unlock();
  18. }
        使用 ZooKeeper  实现分布式锁的算法,有以下几个要点:
  1. 一把分布式锁通常使用一个 Znode 节点表示;如果锁对应的 Znode 节点不存在,首先创建Znode 节点。这里假设为 “/test/lock”,代表了一把需要创建的分布式锁。
  2. 抢占锁的所有客户端,使用锁的  Znode  节点的子节点列表来表示;如果某个客户端需要占用锁,则在 “/test/lock” 下创建一个临时有序的子节点。
    1. 这里,所有临时有序子节点,尽量共用一个有意义的子节点前缀。
    2. 比如,如果子节点的前缀为“/test/lock/seq-” ,则第一次抢锁对应的子节点为 “/test/lock/seq- 000000000” ,第二次抢锁对应的子节点为 “/test/lock/seq-000000001”,以此类推。
    3. 再比如,如果子节点前缀为“/test/lock/” ,则第一次抢锁对应的子节点  “/test/lock/000000000” ,第二次抢锁对应的子节点为 “/test/lock/000000001” ,以此类推,也非常直观。
  3. 如果判定客户端是否占有锁呢?
    1. 很简单,客户端创建子节点后,需要进行判断:自己创建的子节点,是否为当前子节点列表中序号最小的子节点。如果是,则认为加锁成功;如果不是,则监听前一个Znode子节点变更消息,等待前一个节点释放锁。
  4. 一旦队列中的后面的节点,获得前一个子节点变更通知,则开始进行判断,判断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;如果不是,则持续监听,一直到获得锁。
  5. 获取锁后,开始处理业务流程。完成业务流程后,删除自己的对应的子节点,完成释放锁的工
    作,以方面后继节点能捕获到节点变更通知,获得分布式锁。

实战:加锁的实现

        Lock接口中加锁的方法是 lock ()。 lock ()方法的大致流程是:首先尝试着去加锁,如果加锁失败就去等待,然后再重复。

lock()方法的实现代码

  1. package com.crazymakercircle.zk.distributedLock;
  2. import com.crazymakercircle.zk.ZKclient;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.curator.framework.CuratorFramework;
  5. import org.apache.zookeeper.WatchedEvent;
  6. import org.apache.zookeeper.Watcher;
  7. import java.util.Collections;
  8. import java.util.List;
  9. import java.util.concurrent.CountDownLatch;
  10. import java.util.concurrent.TimeUnit;
  11. import java.util.concurrent.atomic.AtomicInteger;
  12. @Slf4j
  13. public class ZkLock implements Lock {
  14. //ZkLock的节点链接
  15. private static final String ZK_PATH = "/test/lock";
  16. private static final String LOCK_PREFIX = ZK_PATH + "/";
  17. private static final long WAIT_TIME = 1000;
  18. //Zk客户端
  19. CuratorFramework client = null;
  20. private String locked_short_path = null;
  21. private String locked_path = null;
  22. private String prior_path = null;
  23. final AtomicInteger lockCount = new AtomicInteger(0);
  24. private Thread thread;
  25. public ZkLock() {
  26. ZKclient.instance.init();
  27. synchronized (ZKclient.instance) {
  28. if (!ZKclient.instance.isNodeExist(ZK_PATH)) {
  29. ZKclient.instance.createNode(ZK_PATH, null);
  30. }
  31. }
  32. client = ZKclient.instance.getClient();
  33. }
  34. @Override
  35. public boolean lock() {
  36. //可重入,确保同一线程,可以重复加锁
  37. synchronized (this) {
  38. if (lockCount.get() == 0) {
  39. thread = Thread.currentThread();
  40. lockCount.incrementAndGet();
  41. } else {
  42. if (!thread.equals(Thread.currentThread())) {
  43. return false;
  44. }
  45. lockCount.incrementAndGet();
  46. return true;
  47. }
  48. }
  49. try {
  50. boolean locked = false;
  51. //首先尝试着去加锁
  52. locked = tryLock();
  53. if (locked) {
  54. return true;
  55. }
  56. //如果加锁失败就去等待
  57. while (!locked) {
  58. await();
  59. //获取等待的子节点列表
  60. List<String> waiters = getWaiters();
  61. //判断,是否加锁成功
  62. if (checkLocked(waiters)) {
  63. locked = true;
  64. }
  65. }
  66. return true;
  67. } catch (Exception e) {
  68. e.printStackTrace();
  69. unlock();
  70. }
  71. return false;
  72. }
  73. //...省略其他的方法
  74. }

tryLock()尝试加锁

        尝试加锁的tryLock 方法是关键,做了两件重要的事情:
        (1)创建临时顺序节点,并且保存自己的节点路径
        (2)判断是否是第一个,如果是第一个,则加锁成功。如果不是,就找到前一个Znode 节点,并且保存其路径到prior_path
        尝试加锁的tryLock 方法,其实现代码如下
  1. /**
  2. * 尝试加锁
  3. * @return 是否加锁成功
  4. * @throws Exception 异常
  5. */
  6. private boolean tryLock() throws Exception {
  7. //创建临时Znode
  8. locked_path = ZKclient.instance
  9. .createEphemeralSeqNode(LOCK_PREFIX);
  10. //然后获取所有节点
  11. List<String> waiters = getWaiters();
  12. if (null == locked_path) {
  13. throw new Exception("zk error");
  14. }
  15. //取得加锁的排队编号
  16. locked_short_path = getShortPath(locked_path);
  17. //获取等待的子节点列表,判断自己是否第一个
  18. if (checkLocked(waiters)) {
  19. return true;
  20. }
  21. // 判断自己排第几个
  22. int index = Collections.binarySearch(waiters, locked_short_path);
  23. if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了
  24. throw new Exception("节点没有找到: " + locked_short_path);
  25. }
  26. //如果自己没有获得锁,则要监听前一个节点
  27. prior_path = ZK_PATH + "/" + waiters.get(index - 1);
  28. return false;
  29. }
  30. private String getShortPath(String locked_path) {
  31. int index = locked_path.lastIndexOf(ZK_PATH + "/");
  32. if (index >= 0) {
  33. index += ZK_PATH.length() + 1;
  34. return index <= locked_path.length() ? locked_path.substring(index)
  35. : "";
  36. }
  37. return null;
  38. }
  • 创建临时顺序节点后,其完整路径存放在 locked_path 成员中;另外还截取了一个后缀路径,放在 locked_short_path 成员中,后缀路径是一个短路径,只有完整路径的最后一层。为什么要单独保存短路径呢?
  • 因为,在获取的远程子节点列表中的其他路径返回结果时,返回的都是短路径,都只有最后一层路径。所以为了方便后续进行比较,也把自己的短路径保存下来。
  • 创建了自己的临时节点后,调用 checkLocked 方法,判断是否是锁定成功。如果锁定成功,则返回 true;如果自己没有获得锁,则要监听前一个节点,此时需要找出前一个节点的路径,并保存在 prior_path 成员中,供后面的 await()等待方法去监听使用。在进入await()等待方法的介绍前,先说下 checkLocked 锁定判断方法。

checkLocked()检查是否持有锁

        在checkLocked ()方法中,判断是否可以持有锁。判断规则很简单:当前创建的节点,是否在上一步获取到的子节点列表的第一个位置:
        (1)如果是,说明可以持有锁,返回true ,表示加锁成功;
        (2)如果不是,说明有其他线程早已先持有了锁,返回false
        checkLocked()方法的代码如下:
  1. private boolean checkLocked(List<String> waiters) {
  2. //节点按照编号,升序排列
  3. Collections.sort(waiters);
  4. // 如果是第一个,代表自己已经获得了锁
  5. if (locked_short_path.equals(waiters.get(0))) {
  6. log.info("成功的获取分布式锁,节点为{}", locked_short_path);
  7. return true;
  8. }
  9. return false;
  10. }
  • checkLocked 方法比较简单,将参与排队的所有子节点列表,从小到大根据节点名称进行排序。排序主要依靠节点的编号,也就是后 Znode 路径的10位数字,因为前缀都是一样的。排序之后,做判断,如果自己的 locked_short_path 编号位置排在第一个,如果是,则代表自己已经获得了锁。如果不是,则会返回 false
  • 如果  checkLocked ()为  false ,外层的调用方法,一般来说会执行  await()等待方法,执行夺锁失败以后的等待逻辑。

await()监听前一个节点释放锁

        await()也很简单,就是监听前一个ZNode节点(prior_path成员)的删除事件,代码如下:

  1. private void await() throws Exception {
  2. if (null == prior_path) {
  3. throw new Exception("prior_path error");
  4. }
  5. final CountDownLatch latch = new CountDownLatch(1);
  6. //订阅比自己次小顺序节点的删除事件
  7. Watcher w = new Watcher() {
  8. @Override
  9. public void process(WatchedEvent watchedEvent) {
  10. System.out.println("监听到的变化 watchedEvent = " + watchedEvent);
  11. log.info("[WatchedEvent]节点删除");
  12. latch.countDown();
  13. }
  14. };
  15. client.getData().usingWatcher(w).forPath(prior_path);
  16. /*
  17. //订阅比自己次小顺序节点的删除事件
  18. TreeCache treeCache = new TreeCache(client, prior_path);
  19. TreeCacheListener l = new TreeCacheListener() {
  20. @Override
  21. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  22. ChildData data = event.getData();
  23. if (data != null) {
  24. switch (event.getType()) {
  25. case NODE_REMOVED:
  26. log.debug("[TreeCache]节点删除, path={}, data={}",
  27. data.getPath(), data.getData());
  28. latch.countDown();
  29. break;
  30. default:
  31. break;
  32. }
  33. }
  34. }
  35. };
  36. treeCache.getListenable().addListener(l);
  37. treeCache.start();
  38. */
  39. latch.await(WAIT_TIME, TimeUnit.SECONDS);
  40. }
  • 首先添加一个 Watcher 监听,而监听的节点,正是前面所保存在 prior_path 成员的前一个节点的路径。这里,仅仅去监听自己前一个节点的变动,而不是其他节点的变动,提升效率。完成监听之后,调用 latch.await(),线程进入等待状态,一直到线程被监听回调代码中的latch.countDown() 所唤醒,或者等待超时。
  • 上面的代码中,监听前一个节点的删除,可以使用两种监听方式:
    • Watcher 订阅;
    • TreeCache 订阅。
  • 两种方式的效果,都差不多。但是这里的删除事件,只需要监听一次即可,不需要反复监听,所以使用的是 Watcher 一次性订阅。而 TreeCache 订阅的代码在源码工程中已经被注释,仅仅供大家参考。
  • 一旦前一个节点 prior_path 节点被删除,那么就将线程从等待状态唤醒,重新一轮的锁的争夺,直到获取锁,并且完成业务处理。
  • 至此,分布式 Lock 加锁的算法,还差一点就介绍完成。这一点,就是实现锁的可重入。

可重入的实现代码

        什么是可重入呢?只需要保障同一个线程进入加锁的代码,可以重复加锁成功即可。
修改前面的 lock 方法,在前面加上可重入的判断逻辑。代码如下:
  1. @Override
  2. public boolean lock() {
  3. //可重入的判断
  4. synchronized (this) {
  5. if (lockCount.get() == 0) {
  6. thread = Thread.currentThread();
  7. lockCount.incrementAndGet();
  8. } else {
  9. if (!thread.equals(Thread.currentThread())) {
  10. return false;
  11. }
  12. lockCount.incrementAndGet();
  13. return true;
  14. }
  15. }
  16. //....
  17. }
        为了变成可重入,在代码中增加了一个加锁的计数器 lockCount ,计算重复加锁的次数。如果是同一个线程加锁,只需要增加次数,直接返回,表示加锁成功。
        至此,lock ()方法已经介绍完成,接下来,就是去释放锁。

释放锁的实现

        Lock接口中的 unLock ()方法,表示释放锁,释放锁主要有两个工作:
        (1)减少重入锁的计数,如果最终的值不是0 ,直接返回,表示成功的释放了一次;
        (2)如果计数器为0 ,移除 Watchers 监听器,并且删除创建的 Znode 临时节点。
        unLock()方法的代码如下:
  1. @Override
  2. public boolean unlock() {
  3. //只有加锁的线程,能够解锁
  4. if (!thread.equals(Thread.currentThread())) {
  5. return false;
  6. }
  7. //减少可重入的计数
  8. int newLockCount = lockCount.decrementAndGet();
  9. //计数不能小于0
  10. if (newLockCount < 0) {
  11. throw new IllegalMonitorStateException("Lock count has gone negative
  12. for lock: " + locked_path);
  13. }
  14. //如果计数不为0,直接返回
  15. if (newLockCount != 0) {
  16. return true;
  17. }
  18. //删除临时节点
  19. try {
  20. if (ZKclient.instance.isNodeExist(locked_path)) {
  21. client.delete().forPath(locked_path);
  22. }
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. return false;
  26. }
  27. return true;
  28. }
        这里,为了尽量保证线程安全,可重入计数器的类型,使用的不是int 类型,而是 Java 并发包中的原子类型——AtomicInteger

实战:分布式锁的使用

        写一个用例,测试一下ZLock的使用,代码如下:

  1. @Test
  2. public void testLock() throws InterruptedException {
  3. for (int i = 0; i < 10; i++) {
  4. FutureTaskScheduler.add(() -> {
  5. //创建锁
  6. ZkLock lock = new ZkLock();
  7. lock.lock();
  8. //每条线程,执行10次累加
  9. for (int j = 0; j < 10; j++) {
  10. //公共的资源变量累加
  11. count++;
  12. }
  13. try {
  14. Thread.sleep(1000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. log.info("count = " + count);
  19. //释放锁
  20. lock.unlock();
  21. });
  22. }
  23. Thread.sleep(Integer.MAX_VALUE);
  24. }
  • 以上代码是10个并发任务,每个任务累加10次,执行以上用例,会发现结果会是预期的和100,如果不使用锁,结果可能就不是100,因为上面的 count 是一个普通的变量,不是线程安全的。
  • 原理上一个  Zlock  实例代表一把锁,并需要占用一个  Znode  永久节点,如果需要很多分布式锁,则也需要很多的不同的Znode 节点。以上代码,如果要扩展为多个分布式锁的版本,还需要进行简单改造,这种改造留给各位自己去练习和实现吧。

curatorInterProcessMutex 可重入锁

        分布式锁 Zlock  自主实现主要的价值:学习一下分布式锁的原理和基础开发,仅此而已。实际的开发中,如果需要使用到分布式锁,并建议去自己造轮子,建议直接使用 Curator  客户端中的各种官方实现的分布式锁,比如其中的 InterProcessMutex 可重入锁。
        这里提供一个简单的InterProcessMutex 可重入锁的使用实例,代码如下:
  1. @Test
  2. public void testzkMutex() throws InterruptedException {
  3. CuratorFramework client = ZKclient.instance.getClient();
  4. final InterProcessMutex zkMutex =
  5. new InterProcessMutex(client, "/mutex");
  6. ;
  7. for (int i = 0; i < 10; i++) {
  8. FutureTaskScheduler.add(() -> {
  9. try {
  10. //获取互斥锁
  11. zkMutex.acquire();
  12. for (int j = 0; j < 10; j++) {
  13. //公共的资源变量累加
  14. count++;
  15. }
  16. try {
  17. Thread.sleep(1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. log.info("count = " + count);
  22. //释放互斥锁
  23. zkMutex.release();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. });
  28. }
  29. Thread.sleep(Integer.MAX_VALUE);
  30. }

ZooKeeper分布式锁的优点和缺点

  • 总结一下ZooKeeper分布式锁:
    • 优点:ZooKeeper分布式锁(如InterProcessMutex),能有效的解决分布式问题,不可重入问题,使用起来也较为简单。
    • 缺点:ZooKeeper实现的分布式锁,性能并不太高。为啥呢?
  • 因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。大家知道,ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的 Follower 机器上,这样频繁的网络通信,性能的短板是非常突出的。
  • 总之,在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper的高可用特性,所以在并发量不是太高的场景,推荐使用ZooKeeper的分布式锁。
  • 在目前分布式锁实现方案中,比较成熟、主流的方案有两种:
    • 基于 Redis 的分布式锁
    • 基于 ZooKeeper 的分布式锁
  • 两种锁,分别适用的场景为:
    • 基于 ZooKeeper 的分布式锁,适用于高可靠(高可用)而并发量不是太大的场景;
    • 基于 Redis 的分布式锁,适用于并发量很大、性能要求很高的、而可靠性问题可以通过其他方案去弥补的场景。
    • 总之,这里没有谁好谁坏的问题,而是谁更合适的问题。
        最后对本章的内容做个总结:在分布式系统中,ZooKeeper  是一个重要的协调工具
        本章介绍了分布式命名服务、分布式锁的原理以及基于 ZooKeeper 的参考实现。本章的那些实战案例,建议大家自己去动手掌握,无论是应用实际开始、还是大公司面试,都是非常有用的。
        另外,主流的分布式协调中间件,也不仅仅只有Zookeeper,还有非常著名的  Etcd  中间件。但是从学习的层面来说,二者之间的功能设计、核心原理都是差不多的,掌握了 Zookeeper Etcd  的上手使用也是很容易的。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/615380
推荐阅读
相关标签
  

闽ICP备14008679号