当前位置:   article > 正文

zookeeper实现分布式锁

zookeeper实现分布式锁

前言

zookeeper的设计初衷,就是为了协调分布式服务,因此利用zookeeper来解决分布式锁的问题是一种最为简单的实现

1 原理

利用zookeeper的顺序临时节点的特性来实现

1.1 获取锁

  • 首先,在zookeeper当中创建一个父节点 /testLock
  • 当第一个客户端C1想要获取锁时,会先在父节点下创建一个临时顺序节点N1; 之后,C1会查找父节点下的所有的临时顺序节点并排序,判断自己所创建的节点N1是不是最小的(最靠前的); 如果是,则成功获得锁;
  • 这时候,如果再有一个客户端C2前来获取锁,也会在父节点下再创建一个临时顺序节点N2; 之后,C2也会查找父节点下面所有的临时顺序节点并排序,判断自己所创建的节点N2是不是最小的; 结果发现节点N2不是最小的。 于是,C2向排序仅在它前面的一个节点N1注册一个watcher,用于监听N1节点是否存在; C2抢锁失败,进入了等待监听状态;
  • 这时候,如果再有客户端C3前来获取锁,也会在父节点下再创建一个临时顺序节点N3; 之后,C3也会查找父节点下面所有的临时顺序节点并排序,判断自己所创建的节点N3是不是最小的; 结果发现节点N3不是最小的。 于是,C3向排序仅在它前面的一个节点N2注册一个watcher,用于监听N2节点是否存在; C3抢锁失败,进入了等待监听状态;
  • 最终各个节点形成一个类似于队列的模型来有序地监听并获取锁。

1.2 释放锁

客户端释放锁的两种情况:

任务执行完,客户端主动释放锁

当前获取锁的客户端C1在任务执行完成后,主动调用delete删除临时节点N1。

任务执行中,客户端崩溃被动解锁

当前获取锁的客户端C1在任务执行过程中崩溃,则会断开与zookeeper服务端的链接;因为是临时节点,所以与该客户端相关联的节点N1也会随之被自动删除;
由于C2注册有watcher一直监听着N1的存在,当N1节点被删除后,C2客户端会立刻收到通知;这时C2会再次查询父节点下面的所有节点并排序,确认自己当前的节点N2是不是最小的,如果是最小,则C2成功获取锁;
同理,之后排队等待的客户端也依次获取锁执行任务。

2 原生API实现

使用zookeeper的客户端api进行原始代码实现

2.1 zk连接工具类

public class ZKUtils {
   
    private static ZooKeeper zooKeeper;
    // 这里使用的zk集群,也可以使用单个,在连接后可添加一个路径作为父目录(该目录需要在测试之前手动在zk中创建)
    private static String address = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock";
    // 默认的watcher
    private static DefaultWatch defaultWatch = new DefaultWatch();
    // 线程锁(阻塞程序,连接成功后释放)
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() {
   
        try {
   
            zooKeeper = new ZooKeeper(address, 1000, defaultWatch);
            // 传递countDownLatch到defaultWatch
            defaultWatch.setCountDownLatch(countDownLatch);
            // 堵塞等待,成功连接后释放
            countDownLatch.await();
        } catch (Exception e) {
   
            e.printStackTrace();
        }
        return zooKeeper;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2.2 默认监听watcher

在zookeeper客户端建立连接时使用的默认监听,实现自Watcher接口

public class DefaultWatch implements Watcher {
   
	// 在zk连接下进行入参初始化countDownLatch
    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch) {
   
        this.countDownLatch = countDownLatch;
    }
    // 监听
    @Override
    public void process(WatchedEvent event) {
   
        System.out.println(event.toString());
        switch (event.getState()) {
   
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                // 建立连接后释放
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2.3 锁具体实现

加锁、解锁的具体操作实现,需要实现的回调接口:

  • Watcher:用于节点间的监听,监听事件类型NodeDeleted;
  • AsyncCallback.StringCallback:创建节点时的异步回调create;
  • AsyncCallback.Children2Callback:获取子节点时的异步回调getChildren;
public class LockWatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
   
	// 入参初始化
    private ZooKeeper zooKeeper;
    // 入参初始化,当前操作的线程名
    private String threadName;
    // 阻塞操作
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    // 创建的临时节点路径,类似于  /lock0000000001
    private String pathName = "";

    public String getPathName() {
   
        return pathName;
    }

    public void setPathName(String pathName) {
   
        this.pathName = pathName;
    }

    public String getThreadName() {
   
        return threadName;
    }

    public void setThreadName(String threadName) {
   
        this.threadName = threadName;
    }

    public ZooKeeper getZooKeeper() {
   
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
   
        this.zooKeeper = zooKeeper;
    }

    // 获取锁操作tryLock
    public void tryLock() {
   
        // 判断:根数据==线程名,进行锁的重入(每重入一次,锁标志位加1)
        String[] str = getRootData().</
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/634965
推荐阅读
相关标签
  

闽ICP备14008679号