- package com.example.demo.util;
- import org.I0Itec.zkclient.ZkClient;
- import java.util.Comparator;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- public final class DLock
- {
- private ZkClient zkClient;
- private String lockName;
- private String thisReadLock;
- private String thisWriteLock;
- /**
- * 分布式锁连接zookeeper 以及 初始化锁主节点
- *
- * @param hostUrl zookeeper 连接url
- * @param lockName 锁主节点
- */
- public void connect(String hostUrl , String lockName)
- {
- this.lockName = lockName;
- zkClient = new ZkClient(hostUrl);
- if (!zkClient.exists(lockName))
- zkClient.createPersistent(lockName);
- }
- /**
- * 获取读锁
- */
- public void lockRead()
- {
- CountDownLatch readLatch = new CountDownLatch(1);
- // 创建此临时节点, 获取带有顺序号的完整节点
- String thisLockNodeBuilder = lockName + "/" + LockType.READ + "-";
- thisReadLock = zkClient.createEphemeralSequential(thisLockNodeBuilder , "");
- // 找到此读锁前一个写锁。 找到节点的子节点
- List<String> tmp_nodes = zkClient.getChildren(lockName);
- sortNodes(tmp_nodes);
- tmp_nodes.forEach(System.out::println);
- int tmp_index = 0;
- // 倒序循环,直到读锁前面所有的写锁释放,读锁才执行
- for (int i = tmp_nodes.size() - 1; i >= 0; i--)
- {
- if (thisReadLock.equals(lockName + "/" + tmp_nodes.get(i)))
- {
- tmp_index = i;
- } else if (i < tmp_index && tmp_nodes.get(i).split("-")[0].equals(LockType.WRITE.toString()))
- {
- // 找到当前读锁之前的一个写锁
- // 先监听此写锁(监听该写锁是否释放),再阻塞当前读锁
- zkClient.subscribeChildChanges(lockName + "/" + tmp_nodes.get(i) , (parentPath , currentChilds) -> readLatch.countDown());
- try
- {
- readLatch.await();
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- break;
- }
- }
- }
- /**
- * 释放读锁
- */
- public void unLockRead()
- {
- if (this.thisReadLock != null)
- {
- zkClient.delete(thisReadLock);
- thisReadLock = null;
- }
- }
- /**
- * 获取写锁
- */
- public void lockWrite()
- {
- CountDownLatch writeLatch = new CountDownLatch(1);
- // 创建此临时节点, 获取带有顺序号的完整节点
- String thisLockNodeBuilder = lockName + "/" + LockType.WRITE + "-";
- thisWriteLock = zkClient.createEphemeralSequential(thisLockNodeBuilder , "");
- List<String> tmp_nodes = zkClient.getChildren(lockName);
- sortNodes(tmp_nodes);
- // 倒序循环,直到拿到最前面的写锁,它释放了,后面的写锁才能执行
- for (int i = tmp_nodes.size() - 1; i >= 0; i--)
- {
- if (thisWriteLock.equals(lockName + "/" + tmp_nodes.get(i)))
- {
- // 在锁列表中找到此写锁
- if (i > 0)
- {
- // 如果此写锁前面还有锁
- // 监听前面的锁(监听前面的锁是否释放), 然后阻塞当前写锁获取
- zkClient.subscribeChildChanges(lockName + "/" + tmp_nodes.get(i - 1) , (parentPath , currentChilds) -> writeLatch.countDown());
- try
- {
- // 阻塞当前写锁获取
- writeLatch.await();
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- break;
- }
- }
- }
- }
- /**
- * 释放写锁
- */
- public void unLockWrite()
- {
- if (thisWriteLock != null)
- {
- zkClient.delete(thisWriteLock);
- thisWriteLock = null;
- }
- }
- /**
- * 重新对节点进行排序,按照顺序号排序
- *
- * @param nodes 临时节点
- */
- private void sortNodes(List<String> nodes)
- {
- nodes.sort(Comparator.comparing(o -> o.split("-")[1]));
- }
- /**
- * 锁类型枚举
- */
- private enum LockType
- {
- }
- }

- import com.example.demo.util.DLock;
- import org.springframework.beans.factory.annotation.Value;
- public class TestDLock {
- private static String hostUrl = "";
- public static void main(String[] args) {
- dLockClient1();
- dLockClient2();
- dLockClient3();
- dLockClient4();
- }
- public static void dLockClient1(){
- // 客户端1
- DLock lock=new DLock();
- lock.connect(hostUrl, "/lock");
- lock.lockRead();
- System.out.println("I am testNode 1");
- try
- {
- System.out.println("I am TestNode 1");
- System.out.println("睡眠10s 之后释放分布式读锁, 开始倒计时");
- for (int i = 0; i < 10; i++)
- {
- System.out.println(10-i);
- Thread.sleep(1000);
- }
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- lock.unLockRead();
- }
- public static void dLockClient2(){
- // 客户端2
- DLock lock=new DLock();
- lock.connect(hostUrl, "/lock");
- lock.lockRead();
- System.out.println("I am TestNode 2");
- lock.unLockRead();
- }
- public static void dLockClient3(){
- // 客户端3
- DLock lock=new DLock();
- lock.connect(hostUrl, "/lock");
- lock.lockWrite();
- System.out.println("I am testNode 3");
- System.out.println("睡眠10s 之后释放分布式写锁, 开始倒计时");
- for (int i = 0; i < 10; i++)
- {
- System.out.println(10-i);
- try
- {
- Thread.sleep(1000);
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- lock.unLockWrite();
- }
- public static void dLockClient4(){
- // 客户端4
- DLock lock=new DLock();
- lock.connect(hostUrl, "/lock");
- lock.lockRead();
- System.out.println("I am TestNode 4");
- lock.unLockRead();
- }
- }

- READ-0000000000
- I am testNode 1
- I am TestNode 1
- 睡眠10s 之后释放分布式读锁, 开始倒计时
- 10
- 9
- 8
- 7
- 6
- 5
- 4
- 3
- 2
- 1
- 17:44:29.643 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980000 after 12ms
- 17:44:29.804 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980000, packet:: clientPath:null serverPath:null finished:false header:: 5,2 replyHeader:: 5,30064771079,0 request:: '/lock/READ-0000000000,-1 response:: null
- 17:44:29.805 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to
- 17:44:29.805 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString= sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@1e643faf
- 17:44:29.805 [ZkClient-EventThread-15-] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
- 17:44:29.809 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
- 17:44:29.809 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
- 17:44:29.810 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server Will not attempt to authenticate using SASL (unknown error)
- 17:44:29.812 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to, initiating session
- 17:44:29.812 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on
- 17:44:29.827 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server, sessionid = 0x100026588980001, negotiated timeout = 30000
- 17:44:29.827 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
- 17:44:29.827 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
- 17:44:29.828 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
- 17:44:29.828 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
- 17:44:29.838 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,30064771080,0 request:: '/lock,F response:: s{30064771077,30064771077,1607247859574,1607247859574,0,2,0,0,0,0,30064771079}
- 17:44:29.847 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 2,1 replyHeader:: 2,30064771081,0 request:: '/lock/READ-,#ffffffacffffffed057400,v{s{31,s{'world,'anyone}}},3 response:: '/lock/READ-0000000001
- 17:44:29.851 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 3,8 replyHeader:: 3,30064771081,0 request:: '/lock,F response:: v{'READ-0000000001}
- READ-0000000001
- I am TestNode 2
- 17:44:29.859 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 4,2 replyHeader:: 4,30064771082,0 request:: '/lock/READ-0000000001,-1 response:: null
- 17:44:29.860 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to
- 17:44:29.860 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString= sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6e8dacdf
- 17:44:29.860 [ZkClient-EventThread-18-] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
- 17:44:29.862 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
- 17:44:29.862 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
- 17:44:29.862 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server Will not attempt to authenticate using SASL (unknown error)
- 17:44:29.863 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to, initiating session
- 17:44:29.863 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on
- 17:44:29.870 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server, sessionid = 0x100026588980002, negotiated timeout = 30000
- 17:44:29.870 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
- 17:44:29.870 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
- 17:44:29.871 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
- 17:44:29.871 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
- 17:44:29.875 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,30064771083,0 request:: '/lock,F response:: s{30064771077,30064771077,1607247859574,1607247859574,0,4,0,0,0,0,30064771082}
- 17:44:29.882 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 2,1 replyHeader:: 2,30064771084,0 request:: '/lock/WRITE-,#ffffffacffffffed057400,v{s{31,s{'world,'anyone}}},3 response:: '/lock/WRITE-0000000002
- 17:44:29.885 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 3,8 replyHeader:: 3,30064771084,0 request:: '/lock,F response:: v{'WRITE-0000000002}
- I am testNode 3
- 睡眠10s 之后释放分布式写锁, 开始倒计时
- 10
- 9
- 8
- 7
- 6
- 5
- 4
- 3
- 2
- 1
- 17:44:39.790 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980000 after 5ms
- 17:44:39.864 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980001 after 3ms
- 17:44:39.898 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980002 after 5ms
- 17:44:39.992 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 4,2 replyHeader:: 4,30064771085,0 request:: '/lock/WRITE-0000000002,-1 response:: null
- 17:44:39.993 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to
- 17:44:39.994 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString= sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@7a79be86
- 17:44:39.994 [ZkClient-EventThread-21-] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
- 17:44:40.002 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
- 17:44:40.002 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
- 17:44:40.003 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server Will not attempt to authenticate using SASL (unknown error)
- 17:44:40.005 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to, initiating session
- 17:44:40.005 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on
- 17:44:40.021 [main-SendThread(] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server, sessionid = 0x100026588980003, negotiated timeout = 30000
- 17:44:40.021 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
- 17:44:40.021 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
- 17:44:40.021 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
- 17:44:40.022 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
- 17:44:40.032 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,30064771086,0 request:: '/lock,F response:: s{30064771077,30064771077,1607247859574,1607247859574,0,6,0,0,0,0,30064771085}
- 17:44:40.042 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 2,1 replyHeader:: 2,30064771087,0 request:: '/lock/READ-,#ffffffacffffffed057400,v{s{31,s{'world,'anyone}}},3 response:: '/lock/READ-0000000003
- 17:44:40.046 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 3,8 replyHeader:: 3,30064771087,0 request:: '/lock,F response:: v{'READ-0000000003}
- READ-0000000003
- I am TestNode 4
- 17:44:40.052 [main-SendThread(] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 4,2 replyHeader:: 4,30064771088,0 request:: '/lock/READ-0000000003,-1 response:: null
- Process finished with exit code 0

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。