当前位置:   article > 正文

Zookeeper开源客户端curator 分布式锁_locked <0x00000000fc66e180> (a org.apache.curator.

locked <0x00000000fc66e180> (a org.apache.curator.framework.recipes.leader.l

目录

一、概述

二、InterProcessMutex分布式可重入排它锁

三、InterProcessReadWriteLock读写锁

四、总结


一、概述

前面我们已经介绍过zookeeper是分布式锁的实现方案之一,它是利用临时有序节点的特性来保证唯一性的。

Zookeeper分布式锁原理:

  • 获取锁

在zookeeper中,一个分布式锁对应到zookeeper中的一个文件夹,每个客户端线程请求获取分布式锁的时候,就需要在这个文件夹创建一个临时有序节点,有两种情况:

  • 1)、创建的临时顺序节点是文件夹下的第一个节点,则认为是获取分布式锁成功;
  • 2)、创建的临时顺序节点不是文件夹下的第一个节点,则认为当前锁已经被另一个客户端线程获取,此时当前线程进入阻塞等待,这个时候它会监听上一个节点【序号小于当前节点的节点】,等待前一个节点释放锁的时候唤醒当前线程;

阻塞-唤醒逻辑:把文件夹下的节点顺序排一下序,找到当前节点的前一个节点,使用watcher机制监听前面一个节点的变化,当前一个节点被删除时会触发Watch事件,进而唤醒当前阻塞的线程。

如果前一个节点对应的客户端崩溃了,则节点对应的Watch事件也会触发,也会唤醒后一个节点对应的客户端线程,此时仍需要判断当前节点是第一个节点之后才能获取锁,否则继续进入阻塞并监听前面一个节点。

  • 可重入性

当某个客户端线程第一次获取锁成功之后,在JVM内存中的一个ConcurrentMap中存储当前线程对应的锁路径及重入次数,后面同一个线程再次获取锁时,先检查该Map中当前锁是否已被当前线程占用即可,如果已占用,则只需要递增重入次数即可。

注意:只考虑同一个客户端、同一个线程获取同一个分布式锁的可重入性。

  • 释放锁

释放锁时,首先将锁的重入次数减一,然后判断重入次数是否已经为0:

  • 1)如果重入次数为0,则删除当前客户端线程对应的临时顺序节点,删除操作会触发此节点的Watch事件,如果有别的客户端线程正在阻塞等待,则会通过Watch机制唤醒;
  • 2)如果重入次数非0,则说明还未完全释放锁,直接返回即可;

二、InterProcessMutex分布式可重入排它锁

  1. package com.wsh.zookeeper.zookeeperapidemo.curator;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessLock;
  5. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. public class CuratorInterProcessMutexDemo {
  10. private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessMutexDemo.class);
  11. private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
  12. /**
  13. * Curator对象
  14. */
  15. private static CuratorFramework curatorFramework;
  16. static {
  17. //初始化curator对象
  18. curatorFramework = CuratorFrameworkFactory
  19. .builder()
  20. //会话超时时间
  21. .sessionTimeoutMs(5000)
  22. //服务器集群地址
  23. .connectString(ZOOKEEPER_SERVER_ADDRESS)
  24. //重试策略
  25. .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  26. .build();
  27. //开启客户端
  28. curatorFramework.start();
  29. }
  30. private static void getLock(InterProcessLock interProcessLock) {
  31. try {
  32. interProcessLock.acquire();
  33. logger.info("线程:" + Thread.currentThread().getName() + "获取锁成功......");
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. } finally {
  37. try {
  38. interProcessLock.release();
  39. logger.info("线程:" + Thread.currentThread().getName() + "释放锁成功......");
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }
  45. public static void main(String[] args) {
  46. //分布式可重入排它锁
  47. InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/distributeLock");
  48. new Thread(() -> getLock(interProcessMutex), "T1").start();
  49. new Thread(() -> getLock(interProcessMutex), "T2").start();
  50. new Thread(() -> getLock(interProcessMutex), "T3").start();
  51. new Thread(() -> getLock(interProcessMutex), "T4").start();
  52. }
  53. }

运行程序,观察后端日志:

  1. 14:41:37.532 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T2获取锁成功......
  2. 14:41:37.538 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T2释放锁成功......
  3. 14:41:37.543 [T4] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T4获取锁成功......
  4. 14:41:37.554 [T4] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T4释放锁成功......
  5. 14:41:37.555 [T3] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T3获取锁成功......
  6. 14:41:37.559 [T3] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T3释放锁成功......
  7. 14:41:37.560 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T1获取锁成功......
  8. 14:41:37.564 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T1释放锁成功......

如上可见,多个线程争夺一个共享锁的时候,相互之间是互斥的。流程分析如下图所示:

三、InterProcessReadWriteLock读写锁

curator实现了跨JVM的可重入读写互斥锁。读写锁包含了读锁、写锁两个,它们的互斥关系如下:

  • 1)读写互斥;
  • 2)写写互斥;
  • 3)读读不互斥;

案例:读写互斥

  1. package com.wsh.zookeeper.zookeeperapidemo.curator;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. public class CuratorInterProcessReadWriteLockDemo {
  10. private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessReadWriteLockDemo.class);
  11. private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
  12. /**
  13. * Curator对象
  14. */
  15. private static CuratorFramework curatorFramework;
  16. /**
  17. * curator读锁对象
  18. */
  19. private static InterProcessMutex readLock;
  20. /**
  21. * curator写锁对象
  22. */
  23. private static InterProcessMutex writeLock;
  24. static {
  25. //初始化curator对象
  26. curatorFramework = CuratorFrameworkFactory
  27. .builder()
  28. //会话超时时间
  29. .sessionTimeoutMs(5000)
  30. //服务器集群地址
  31. .connectString(ZOOKEEPER_SERVER_ADDRESS)
  32. //重试策略
  33. .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  34. .build();
  35. //开启客户端
  36. curatorFramework.start();
  37. InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/distribute-lock");
  38. readLock = interProcessReadWriteLock.readLock();
  39. writeLock = interProcessReadWriteLock.writeLock();
  40. }
  41. public static void main(String[] args) {
  42. new Thread(() -> {
  43. try {
  44. readLock.acquire();
  45. logger.info("线程:" + Thread.currentThread().getName() + "获取读锁...");
  46. Thread.sleep(3000);
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. } finally {
  50. try {
  51. readLock.release();
  52. logger.info("线程:" + Thread.currentThread().getName() + "释放读锁...");
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. }, "T1").start();
  58. new Thread(() -> {
  59. try {
  60. Thread.sleep(2000);
  61. writeLock.acquire();
  62. logger.info("线程:" + Thread.currentThread().getName() + "获取写锁...");
  63. } catch (Exception e) {
  64. e.printStackTrace();
  65. } finally {
  66. try {
  67. writeLock.release();
  68. logger.info("线程:" + Thread.currentThread().getName() + "释放写锁...");
  69. } catch (Exception e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. }, "T2").start();
  74. }
  75. }
  1. 15:22:11.460 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1获取读锁...
  2. 15:22:14.468 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1释放读锁...
  3. 15:22:14.470 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2获取写锁...
  4. 15:22:14.473 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2释放写锁...

由上日志可见,读锁先获取到锁资源后,写锁必须等待锁释放之后,才能获取锁。

案例:读读不互斥

  1. package com.wsh.zookeeper.zookeeperapidemo.curator;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. public class CuratorInterProcessReadWriteLockDemo {
  10. private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessReadWriteLockDemo.class);
  11. private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
  12. /**
  13. * Curator对象
  14. */
  15. private static CuratorFramework curatorFramework;
  16. /**
  17. * curator读锁对象
  18. */
  19. private static InterProcessMutex readLock;
  20. /**
  21. * curator写锁对象
  22. */
  23. private static InterProcessMutex writeLock;
  24. static {
  25. //初始化curator对象
  26. curatorFramework = CuratorFrameworkFactory
  27. .builder()
  28. //会话超时时间
  29. .sessionTimeoutMs(5000)
  30. //服务器集群地址
  31. .connectString(ZOOKEEPER_SERVER_ADDRESS)
  32. //重试策略
  33. .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  34. .build();
  35. //开启客户端
  36. curatorFramework.start();
  37. InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/distribute-lock");
  38. readLock = interProcessReadWriteLock.readLock();
  39. writeLock = interProcessReadWriteLock.writeLock();
  40. }
  41. public static void main(String[] args) {
  42. new Thread(() -> {
  43. try {
  44. readLock.acquire();
  45. logger.info("线程:" + Thread.currentThread().getName() + "获取读锁...");
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. try {
  50. readLock.release();
  51. logger.info("线程:" + Thread.currentThread().getName() + "释放读锁...");
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }, "T1").start();
  57. new Thread(() -> {
  58. try {
  59. readLock.acquire();
  60. logger.info("线程:" + Thread.currentThread().getName() + "获取读锁...");
  61. } catch (Exception e) {
  62. e.printStackTrace();
  63. } finally {
  64. try {
  65. readLock.release();
  66. logger.info("线程:" + Thread.currentThread().getName() + "释放读锁...");
  67. } catch (Exception e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }, "T2").start();
  72. }
  73. }
  1. 15:32:30.347 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2获取读锁...
  2. 15:32:30.347 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1获取读锁...
  3. 15:32:30.354 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1释放读锁...
  4. 15:32:30.354 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2释放读锁...

 由上面的日志可见,两个读线程之间获取锁资源的话,是可以同时获取到锁的,并不互斥。

案例:写写互斥

  1. package com.wsh.zookeeper.zookeeperapidemo.curator;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. public class CuratorInterProcessReadWriteLockDemo {
  10. private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessReadWriteLockDemo.class);
  11. private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
  12. /**
  13. * Curator对象
  14. */
  15. private static CuratorFramework curatorFramework;
  16. /**
  17. * curator读锁对象
  18. */
  19. private static InterProcessMutex readLock;
  20. /**
  21. * curator写锁对象
  22. */
  23. private static InterProcessMutex writeLock;
  24. static {
  25. //初始化curator对象
  26. curatorFramework = CuratorFrameworkFactory
  27. .builder()
  28. //会话超时时间
  29. .sessionTimeoutMs(5000)
  30. //服务器集群地址
  31. .connectString(ZOOKEEPER_SERVER_ADDRESS)
  32. //重试策略
  33. .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  34. .build();
  35. //开启客户端
  36. curatorFramework.start();
  37. InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/distribute-lock");
  38. readLock = interProcessReadWriteLock.readLock();
  39. writeLock = interProcessReadWriteLock.writeLock();
  40. }
  41. public static void main(String[] args) {
  42. new Thread(() -> {
  43. try {
  44. writeLock.acquire();
  45. logger.info("线程:" + Thread.currentThread().getName() + "获取写锁...");
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. } finally {
  49. try {
  50. writeLock.release();
  51. logger.info("线程:" + Thread.currentThread().getName() + "释放写锁...");
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }, "T1").start();
  57. new Thread(() -> {
  58. try {
  59. writeLock.acquire();
  60. logger.info("线程:" + Thread.currentThread().getName() + "获取写锁...");
  61. } catch (Exception e) {
  62. e.printStackTrace();
  63. } finally {
  64. try {
  65. writeLock.release();
  66. logger.info("线程:" + Thread.currentThread().getName() + "释放写锁...");
  67. } catch (Exception e) {
  68. e.printStackTrace();
  69. }
  70. }
  71. }, "T2").start();
  72. }
  73. }
  1. 15:33:28.701 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1获取写锁...
  2. 15:33:28.706 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1释放写锁...
  3. 15:33:28.711 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2获取写锁...
  4. 15:33:28.717 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2释放写锁...

如上日志可见,两个线程同时尝试获取锁资源的时候,某个时刻只会有一个线程能获取成功,其他线程必须等待释放锁后才能尝试去获取锁。

四、总结

本文主要总结了curator分布式锁的实现原理,以及通过案例详解介绍了curator的可重入排它锁以及读写锁的使用,在分布式系统开发中,实现分布式锁的方式有很多种,如数据库方式、redis、zookeeper等,具体根据业务场景选择合适的即可。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/965714
推荐阅读
相关标签
  

闽ICP备14008679号