赞
踩
zookeeper的设计初衷,就是为了协调分布式服务,因此利用zookeeper来解决分布式锁的问题是一种最为简单的实现
利用zookeeper的顺序临时节点的特性来实现
/testLock
;客户端释放锁的两种情况:
任务执行完,客户端主动释放锁
当前获取锁的客户端C1在任务执行完成后,主动调用delete删除临时节点N1。
任务执行中,客户端崩溃被动解锁
当前获取锁的客户端C1在任务执行过程中崩溃,则会断开与zookeeper服务端的链接;因为是临时节点,所以与该客户端相关联的节点N1也会随之被自动删除;
由于C2注册有watcher一直监听着N1的存在,当N1节点被删除后,C2客户端会立刻收到通知;这时C2会再次查询父节点下面的所有节点并排序,确认自己当前的节点N2是不是最小的,如果是最小,则C2成功获取锁;
同理,之后排队等待的客户端也依次获取锁执行任务。
使用zookeeper的客户端api进行原始代码实现
public class ZKUtils { private static ZooKeeper zooKeeper; // 这里使用的zk集群,也可以使用单个,在连接后可添加一个路径作为父目录(该目录需要在测试之前手动在zk中创建) private static String address = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testLock"; // 默认的watcher private static DefaultWatch defaultWatch = new DefaultWatch(); // 线程锁(阻塞程序,连接成功后释放) private static CountDownLatch countDownLatch = new CountDownLatch(1); public static ZooKeeper getZooKeeper() { try { zooKeeper = new ZooKeeper(address, 1000, defaultWatch); // 传递countDownLatch到defaultWatch defaultWatch.setCountDownLatch(countDownLatch); // 堵塞等待,成功连接后释放 countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } return zooKeeper; } }
在zookeeper客户端建立连接时使用的默认监听,实现自Watcher接口
public class DefaultWatch implements Watcher { // 在zk连接下进行入参初始化countDownLatch private CountDownLatch countDownLatch; public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } // 监听 @Override public void process(WatchedEvent event) { System.out.println(event.toString()); switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: // 建立连接后释放 countDownLatch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } } }
加锁、解锁的具体操作实现,需要实现的回调接口:
public class LockWatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback { // 入参初始化 private ZooKeeper zooKeeper; // 入参初始化,当前操作的线程名 private String threadName; // 阻塞操作 private CountDownLatch countDownLatch = new CountDownLatch(1); // 创建的临时节点路径,类似于 /lock0000000001 private String pathName = ""; public String getPathName() { return pathName; } public void setPathName(String pathName) { this.pathName = pathName; } public String getThreadName() { return threadName; } public void setThreadName(String threadName) { this.threadName = threadName; } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // 获取锁操作tryLock public void tryLock() { // 判断:根数据==线程名,进行锁的重入(每重入一次,锁标志位加1) String[] str = getRootData().</
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。