当前位置:   article > 正文

Zookeeper 分布式锁_zk 分布式锁代码实现

zk 分布式锁代码实现

优质博文:IT-BLOG-CN

一、简介

随着公司业务的发展,单机应用已经无法支撑现有的用户量,之前采用synchronizedLock锁已经无法满足分布式系统的要求。我们应用程序目前都会运行120台,节假日会扩容至240台,属于多JVM环境。所以需要搭建一套独立的zk集群或者Redisson集群提供分布式锁的功能。

二、ZK分布式锁实现

ZK的特点是:文件系统 + 通知机制

分布式的思想:客户端获取锁时会创建一个临时文件,使用完锁之后删除该临时文件。

客户端向ZK尝试获取锁,ZK会在/lock节点下创建一个临时顺序节点,假设有三个客户端向ZK获取锁,会同时在ZK/lock节点下根据请求的顺序创建3个临时顺序节点:/lock/xxx1/lock/xxx2/lock/xxx3,这个顺序节点由ZK内部自行维护一个节点序号。

案例

【1】第一个客户端向客户端向ZK尝试获取锁,ZK内部会创建一个顺序节点,我们通过Curator框架会得到如下一个临时文件:最后的数字1就是当前客户端请求的顺序。随后,客户端A会通过getChildern()方法查找lock下的所有节点,这个时候会拿到一个顺序集合,当客户端A发现自己创建的顺序节点排在第一位,返回true表示加锁成功,开始业务处理。


【2】第二个客户端B向ZK尝试获取锁,ZK内部同样会创建一个顺序节点xxx2,随后,客户端B通过getChildern()方法查找lock下的所有节点,这个时候会拿到一个顺序集合。

[
    "_d_0asdf9sd-3df6-ak84-blsc9832ld0x-lock-0000000001",
    "_d_0asdf9sd-3df6-ak84-blsc9832ld0x-lock-0000000002"
]
  • 1
  • 2
  • 3
  • 4

客户端B判断自己创建的临时顺序节点是不是最小序号节点,发现不是加锁失败。加锁失败后,客户端B就会通过watcher监控上一个顺序节点。


【3】客户端A处理完逻辑后释放锁或者session超时后会删除自己创建的临时节点ZK监听器会收到通知客户端B该节点已删除,也就是说客户端A已经释放了锁。客户端B会重新尝试获取锁,重新获取临时节点集合,并检查自己的临时文件是否为最小的序号。是则加锁成功,执行业务逻辑代码。

如果服务器宕机,ZK会自动删除对应的顺序节点

三、Curator框架

通过Curator框架获取和释放zk分布式锁的代码如下:

// 定义锁节点名称
InterProcessMutex lock = InterProcessMutex(client, "/lock")

// 加锁
lock.acquire();

// 释放锁
lock.release();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

源码分析

Curator中,尝试获取锁的具体实现在LockInternals.attemptLock方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone ) {
        isDone = true;

        try {
            // 创建临时顺序节点,并返回创建节点的路径
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 获取所有临时节点,并判断自己创建的临时节点是否为最小序号
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch ( KeeperException.NoNodeException e ) {
            // 在找不到锁节点时由StandardLockInternalsDriver引发 
            // 可能发生在会话过期等情况下。因此,如果重试允许,只需重试 
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }
 
    if ( hasTheLock ) {
        return ourPath;
    }
 
    return null;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

进入LockInternals.internalLockLoop方法,通过getChildren()获取到所有顺序节点,判断当前创建的节点是否是最小序号节点,如果是则获取锁成功,否则获取锁失败;如果获取锁失败,则通过watcher监听前一个顺序节点的节点变化,如果收到watcher监听回调,则再次进入循环,通过 getChildren()重新判断是否能够获取到锁:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean haveTheLock = false;
    boolean doDelete = false;
    try {
        ...
 
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
            // 得到排序好的临时顺序节点列表
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
 
            // 判断是否能够成功获取锁,在获取锁失败的情况下,会同时返回需要watch的前一个顺序节点路径
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() ) {
                haveTheLock = true;
            } else {
                // 获取锁失败,开始监听前一个顺序节点的节点变化,并等待超时或者watcher监听回调
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
 
                synchronized(this) {
                    try {
                        // 使用getData()而不是exists()可以避免留下不必要的观察程序,这是一种资源泄漏
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null ) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 ) {
                                doDelete = true;    // 超时-删除我们的节点
                                break;
                            }
 
                            // 等待超时或者收到watcher回调,如果收到回调,则会再次进入循环判断是否能够获取到锁
                            wait(millisToWait);
                        } else {
                            // 没有传递超时时间的情况下,会一直等待直到watcher回调或者触发异常
                            wait();
                        }
                    } catch ( KeeperException.NoNodeException e ) {
                        //  它已被删除(即锁定已释放)。尝试再次获取 
                    }
                }
            }
        }
    } catch ( Exception e ) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        if ( doDelete ) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

StandardLockInternalsDriver.getsTheLock()方法中判断是否能够获取到锁:

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    // 获取到当前客户端创建的节点在所有顺序节点中的index
    int ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
 
    // 可重入锁的场景下,maxLeases固定为1,所以只有当ourIndex==0时能够获取到锁(当前节点是第一个顺序节点)
    boolean getsTheLock = ourIndex < maxLeases;
    // 判断是否能获取到锁,如果获取不到,则取到前一个顺序节点的路径
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
 
    return new PredicateResults(pathToWatch, getsTheLock);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

watcher收到监听回调时,通过LockInternals.notifyFromWatcher方法唤醒正在wait的线程:

private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        notifyFromWatcher();
    }
};
...
private synchronized void notifyFromWatcher() {
    notifyAll();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/605871
推荐阅读
相关标签
  

闽ICP备14008679号