赞
踩
什么叫做分布式锁呢?比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
代码中的注释我已经写的很详细了。
这其中用到了JUC中的CountDownLatch,可以参考:https://blog.csdn.net/weixin_43823808/article/details/120799251
- package com.szh.case2;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.Objects;
- import java.util.concurrent.CountDownLatch;
-
- /**
- *
- */
- public class DistributedZkLock {
-
- private final String connectString = "192.168.40.130:2181";
- private final int sessionTimeout = 30000;
- private final ZooKeeper zk;
-
- private CountDownLatch connectLatch = new CountDownLatch(1);
- private CountDownLatch waitLatch = new CountDownLatch(1);
-
- private String currentNode;
- private String waitPath;
-
- public DistributedZkLock() throws IOException, InterruptedException, KeeperException {
- //获取zk连接
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- //如果连接上zk,则connectLatch可以释放
- if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
- connectLatch.countDown();
- }
- //如果监听的节点已被删除,同时当前监听的节点路径与即将被监听节点的前一个节点路径相同,则waitLatch可以释放
- if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
- waitLatch.countDown();
- }
- }
- });
- //等待zk连接成功之后,程序则继续往下走,其他线程进入等待连接的状态
- connectLatch.await();
- //判断根节点 /locks 是否存在
- Stat stat = zk.exists("/locks", false);
- //如果根节点 /locks 不存在,则立马创建
- if (Objects.isNull(stat)) {
- //此 /locks 节点默认所有人均可访问,而且是永久性节点
- zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- //加锁
- public void zkLock() {
- try {
- //所谓加锁,就是在根节点/locks下创建对应的临时、带序号的节点
- currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
- //睡一会,让结果更清晰
- Thread.sleep(100);
- //判断创建的节点是否是序号最小的节点,如果是,则获取到锁;如果不是,则监听它序号的前一个节点
- List<String> children = zk.getChildren("/locks", false);
- if (children.size() == 1) { //只有一个节点,则直接获取锁
- return;
- } else { //如果有多个节点,则需要判断谁的序号最小
- //先对获取的节点的list集合排序,确保从小到大的顺序
- Collections.sort(children);
- //获取节点名称 seq-00000000
- String thisNode = currentNode.substring("/locks/".length());
- //通过节点名称获取到它在list集合中的位置
- int index = children.indexOf(thisNode);
- if (index == -1) { //没数据,无意义
- System.out.println("数据异常....");
- } else if (index == 0) { //说明此节点处于第一个位置,可以获取锁
- return;
- } else { //非第一个位置,需要监听前一个节点的变化
- //获取该节点序号的前一个节点
- waitPath = "/locks/" + children.get(index - 1);
- //监听,回调Watch的process方法
- zk.getData(waitPath, true, new Stat());
- //其他线程进入等待锁的状态
- waitLatch.await();
- return;
- }
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- //解锁
- public void zkUnLock() {
- //删除节点即解锁
- try {
- zk.delete(this.currentNode, -1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- }
下面是针对上面的一些方法的测试。
- package com.szh.case2;
-
- import org.apache.zookeeper.KeeperException;
-
- import java.io.IOException;
- import java.util.concurrent.TimeUnit;
-
- /**
- *
- */
- public class DistributedZkLockTest {
-
- public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
- //创建分布式锁1
- final DistributedZkLock lock1 = new DistributedZkLock();
- //创建分布式锁2
- final DistributedZkLock lock2 = new DistributedZkLock();
-
- //如下创建两个线程,模拟获取分布式锁的过程
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock1.zkLock();
- System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁....");
- TimeUnit.MILLISECONDS.sleep(3000);
- lock1.zkUnLock();
- System.out.println(Thread.currentThread().getName() + "已释放锁....");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.zkLock();
- System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁....");
- TimeUnit.MILLISECONDS.sleep(3000);
- lock2.zkUnLock();
- System.out.println(Thread.currentThread().getName() + "已释放锁....");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- }
那么上面是我们自己手写的加锁、解锁的一些方法,其中也存在着很多问题。
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch(2)Watch 需要重复注册,不然就不能生效(3)开发的复杂性还是比较高的(4)不支持多节点删除和创建。需要自己去递归所以就引出了下面的案例:声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/997522推荐阅读
相关标签
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。