赞
踩
优质博文:IT-BLOG-CN
随着公司业务的发展,单机应用已经无法支撑现有的用户量,之前采用synchronized
和Lock
锁已经无法满足分布式系统的要求。我们应用程序目前都会运行120台,节假日会扩容至240台,属于多JVM
环境。所以需要搭建一套独立的zk
集群或者Redisson
集群提供分布式锁的功能。
ZK
的特点是:文件系统 + 通知机制
分布式的思想:客户端获取锁时会创建一个临时文件,使用完锁之后删除该临时文件。
客户端向ZK
尝试获取锁,ZK
会在/lock
节点下创建一个临时顺序节点,假设有三个客户端向ZK
获取锁,会同时在ZK
的/lock
节点下根据请求的顺序创建3个临时顺序节点:/lock/xxx1
、/lock/xxx2
、/lock/xxx3
,这个顺序节点由ZK
内部自行维护一个节点序号。
【1】第一个客户端向客户端向ZK
尝试获取锁,ZK
内部会创建一个顺序节点,我们通过Curator
框架会得到如下一个临时文件:最后的数字1就是当前客户端请求的顺序。随后,客户端A会通过getChildern()
方法查找lock
下的所有节点,这个时候会拿到一个顺序集合,当客户端A发现自己创建的顺序节点排在第一位,返回true
表示加锁成功,开始业务处理。
【2】第二个客户端B向ZK
尝试获取锁,ZK
内部同样会创建一个顺序节点xxx2
,随后,客户端B通过getChildern()
方法查找lock
下的所有节点,这个时候会拿到一个顺序集合。
[
"_d_0asdf9sd-3df6-ak84-blsc9832ld0x-lock-0000000001",
"_d_0asdf9sd-3df6-ak84-blsc9832ld0x-lock-0000000002"
]
客户端B判断自己创建的临时顺序节点是不是最小序号节点,发现不是加锁失败。加锁失败后,客户端B就会通过watcher
监控上一个顺序节点。
【3】客户端A处理完逻辑后释放锁或者session
超时后会删除自己创建的临时节点。ZK
监听器会收到通知客户端B该节点已删除,也就是说客户端A已经释放了锁。客户端B会重新尝试获取锁,重新获取临时节点集合,并检查自己的临时文件是否为最小的序号。是则加锁成功,执行业务逻辑代码。
如果服务器宕机,ZK会自动删除对应的顺序节点
通过Curator
框架获取和释放zk
分布式锁的代码如下:
// 定义锁节点名称
InterProcessMutex lock = InterProcessMutex(client, "/lock")
// 加锁
lock.acquire();
// 释放锁
lock.release();
在Curator
中,尝试获取锁的具体实现在LockInternals.attemptLock
方法:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone ) {
isDone = true;
try {
// 创建临时顺序节点,并返回创建节点的路径
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 获取所有临时节点,并判断自己创建的临时节点是否为最小序号
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
} catch ( KeeperException.NoNodeException e ) {
// 在找不到锁节点时由StandardLockInternalsDriver引发
// 可能发生在会话过期等情况下。因此,如果重试允许,只需重试
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
isDone = false;
} else {
throw e;
}
}
}
if ( hasTheLock ) {
return ourPath;
}
return null;
}
进入LockInternals.internalLockLoop
方法,通过getChildren()
获取到所有顺序节点,判断当前创建的节点是否是最小序号节点,如果是则获取锁成功,否则获取锁失败;如果获取锁失败,则通过watcher
监听前一个顺序节点的节点变化,如果收到watcher
监听回调,则再次进入循环,通过 getChildren()
重新判断是否能够获取到锁:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
...
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
// 得到排序好的临时顺序节点列表
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// 判断是否能够成功获取锁,在获取锁失败的情况下,会同时返回需要watch的前一个顺序节点路径
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
haveTheLock = true;
} else {
// 获取锁失败,开始监听前一个顺序节点的节点变化,并等待超时或者watcher监听回调
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
// 使用getData()而不是exists()可以避免留下不必要的观察程序,这是一种资源泄漏
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ) {
doDelete = true; // 超时-删除我们的节点
break;
}
// 等待超时或者收到watcher回调,如果收到回调,则会再次进入循环判断是否能够获取到锁
wait(millisToWait);
} else {
// 没有传递超时时间的情况下,会一直等待直到watcher回调或者触发异常
wait();
}
} catch ( KeeperException.NoNodeException e ) {
// 它已被删除(即锁定已释放)。尝试再次获取
}
}
}
}
} catch ( Exception e ) {
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
} finally {
if ( doDelete ) {
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
在StandardLockInternalsDriver.getsTheLock()
方法中判断是否能够获取到锁:
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
// 获取到当前客户端创建的节点在所有顺序节点中的index
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
// 可重入锁的场景下,maxLeases固定为1,所以只有当ourIndex==0时能够获取到锁(当前节点是第一个顺序节点)
boolean getsTheLock = ourIndex < maxLeases;
// 判断是否能获取到锁,如果获取不到,则取到前一个顺序节点的路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
在watcher
收到监听回调时,通过LockInternals.notifyFromWatcher
方法唤醒正在wait
的线程:
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
...
private synchronized void notifyFromWatcher() {
notifyAll();
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。