当前位置:   article > 正文

ZooKeeper实现分布式锁

ZooKeeper实现分布式锁

1、基于ZooKeeper基本API实现

pom.xml
  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.5.7</version>
  5. </dependency>
  1. import org.apache.zookeeper.*;
  2. import org.apache.zookeeper.data.Stat;
  3. import java.io.IOException;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import java.util.concurrent.CountDownLatch;
  7. public class DistributeLock {
  8. private final String connectString = "127.0.0.1:2181";
  9. private final int sessionTimeout = 2000;
  10. private final ZooKeeper zk;
  11. private CountDownLatch connectLatch = new CountDownLatch(1);
  12. private CountDownLatch waitLatch = new CountDownLatch(1);
  13. private String waitPath;
  14. private String currentNode;
  15. public DistributeLock() throws IOException, InterruptedException, KeeperException {
  16. //获取连接
  17. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  18. @Override
  19. public void process(WatchedEvent event) {
  20. //connectLatch 如果连接上zk,释放
  21. if(event.getState() == Event.KeeperState.SyncConnected) {
  22. connectLatch.countDown();
  23. }
  24. // waitLatch 前一个节点释放锁删除后,释放
  25. if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
  26. waitLatch.countDown();
  27. }
  28. }
  29. });
  30. //等待zk正常连接后,再往下执行程序
  31. connectLatch.await();
  32. // 判断根节点(/locks)是否存在
  33. Stat stat = zk.exists("/locks", false);
  34. if (null == stat) {
  35. //创建根节点
  36. zk.create("/locks", "locking".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  37. }
  38. }
  39. //加锁
  40. public void zkLock() {
  41. //创建临时节点
  42. try {
  43. currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  44. //判断创建的节点是否是最小的序号节点,如果是则获取到锁,如果不是则监听它序号前一个节点
  45. List<String> children = zk.getChildren("/locks", false);
  46. //如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小
  47. if(children.size() == 1) {
  48. return;
  49. } else {
  50. Collections.sort(children);
  51. //获得节点名称 seq-00000000
  52. String thisNode = currentNode.substring("/locks/".length());
  53. int index = children.indexOf(thisNode);
  54. if(index == -1) {
  55. System.out.println("数据异常");
  56. } else if (index == 0) {
  57. //就一个节点,直接获取锁
  58. return;
  59. } else {
  60. //需要监听它前一个节点变化
  61. waitPath = "/locks/" + children.get(index - 1);
  62. zk.getData(waitPath, true, null);
  63. //等待监听
  64. waitLatch.await();
  65. }
  66. }
  67. } catch (KeeperException e) {
  68. e.printStackTrace();
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. //解锁
  74. public void unZkLock() {
  75. try {
  76. zk.delete(currentNode, -1);
  77. } catch (InterruptedException e) {
  78. e.printStackTrace();
  79. } catch (KeeperException e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. }
测试:
  1. import org.apache.zookeeper.KeeperException;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeUnit;
  4. public class DistributeLockTest {
  5. public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
  6. final DistributeLock lock1 = new DistributeLock();
  7. final DistributeLock lock2 = new DistributeLock();
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. lock1.zkLock();
  13. System.out.println(Thread.currentThread().getName()+"==>获得锁");
  14. TimeUnit.SECONDS.sleep(10);
  15. lock1.unZkLock();
  16. System.out.println(Thread.currentThread().getName()+"==>释放锁");
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }, "线程1").start();
  22. new Thread(new Runnable() {
  23. @Override
  24. public void run() {
  25. try {
  26. lock2.zkLock();
  27. System.out.println(Thread.currentThread().getName()+"==>获得锁");
  28. TimeUnit.SECONDS.sleep(10);
  29. lock2.unZkLock();
  30. System.out.println(Thread.currentThread().getName()+"==>释放锁");
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }, "线程2").start();
  36. }
  37. }

2、基于Curator框架实现

pom.xml
  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-framework</artifactId>
  4. <version>4.3.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-recipes</artifactId>
  9. <version>4.3.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.curator</groupId>
  13. <artifactId>curator-client</artifactId>
  14. <version>4.3.0</version>
  15. </dependency>
  1. import org.apache.curator.framework.CuratorFramework;
  2. import org.apache.curator.framework.CuratorFrameworkFactory;
  3. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  4. import org.apache.curator.retry.ExponentialBackoffRetry;
  5. import java.util.concurrent.TimeUnit;
  6. public class CuratorLock {
  7. public static void main(String[] args) {
  8. //创建分布式锁1
  9. final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
  10. //创建分布式锁2
  11. final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. lock1.acquire();
  17. System.out.println(Thread.currentThread().getName()+"==>获得锁");
  18. lock1.acquire();
  19. System.out.println(Thread.currentThread().getName()+"==>再次获得锁");
  20. TimeUnit.SECONDS.sleep(10);
  21. lock1.release();
  22. System.out.println(Thread.currentThread().getName()+"==>释放锁");
  23. lock1.release();
  24. System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }, "线程1").start();
  30. new Thread(new Runnable() {
  31. @Override
  32. public void run() {
  33. try {
  34. lock2.acquire();
  35. System.out.println(Thread.currentThread().getName()+"==>获得锁");
  36. lock2.acquire();
  37. System.out.println(Thread.currentThread().getName()+"==>再次获得锁");
  38. TimeUnit.SECONDS.sleep(8);
  39. lock2.release();
  40. System.out.println(Thread.currentThread().getName()+"==>释放锁");
  41. lock2.release();
  42. System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }, "线程2").start();
  48. }
  49. private static CuratorFramework getCuratorFramework() {
  50. ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);
  51. CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
  52. .connectionTimeoutMs(2000)
  53. .sessionTimeoutMs(2000)
  54. .retryPolicy(retryPolicy).build();
  55. //启动客户端
  56. client.start();
  57. System.out.println("zookeeper启动成功");
  58. return client;
  59. }
  60. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/997523
推荐阅读
相关标签
  

闽ICP备14008679号