赞
踩
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.7</version>
- </dependency>
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- public class DistributeLock {
- private final String connectString = "127.0.0.1:2181";
- private final int sessionTimeout = 2000;
- private final ZooKeeper zk;
-
- private CountDownLatch connectLatch = new CountDownLatch(1);
- private CountDownLatch waitLatch = new CountDownLatch(1);
-
- private String waitPath;
- private String currentNode;
-
- public DistributeLock() throws IOException, InterruptedException, KeeperException {
- //获取连接
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- //connectLatch 如果连接上zk,释放
- if(event.getState() == Event.KeeperState.SyncConnected) {
- connectLatch.countDown();
- }
-
- // waitLatch 前一个节点释放锁删除后,释放
- if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
- waitLatch.countDown();
- }
- }
- });
-
- //等待zk正常连接后,再往下执行程序
- connectLatch.await();
-
- // 判断根节点(/locks)是否存在
- Stat stat = zk.exists("/locks", false);
- if (null == stat) {
- //创建根节点
- zk.create("/locks", "locking".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- //加锁
- public void zkLock() {
- //创建临时节点
- try {
- currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- //判断创建的节点是否是最小的序号节点,如果是则获取到锁,如果不是则监听它序号前一个节点
- List<String> children = zk.getChildren("/locks", false);
- //如果children只有一个值,那就直接获取锁;如果有多个节点,需要判断,谁最小
- if(children.size() == 1) {
- return;
- } else {
- Collections.sort(children);
-
- //获得节点名称 seq-00000000
- String thisNode = currentNode.substring("/locks/".length());
- int index = children.indexOf(thisNode);
-
- if(index == -1) {
- System.out.println("数据异常");
- } else if (index == 0) {
- //就一个节点,直接获取锁
- return;
- } else {
- //需要监听它前一个节点变化
- waitPath = "/locks/" + children.get(index - 1);
- zk.getData(waitPath, true, null);
- //等待监听
- waitLatch.await();
- }
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- //解锁
- public void unZkLock() {
- try {
- zk.delete(currentNode, -1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- }
- import org.apache.zookeeper.KeeperException;
-
- import java.io.IOException;
- import java.util.concurrent.TimeUnit;
-
- public class DistributeLockTest {
- public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
- final DistributeLock lock1 = new DistributeLock();
- final DistributeLock lock2 = new DistributeLock();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock1.zkLock();
- System.out.println(Thread.currentThread().getName()+"==>获得锁");
-
- TimeUnit.SECONDS.sleep(10);
-
- lock1.unZkLock();
- System.out.println(Thread.currentThread().getName()+"==>释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "线程1").start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.zkLock();
- System.out.println(Thread.currentThread().getName()+"==>获得锁");
-
- TimeUnit.SECONDS.sleep(10);
-
- lock2.unZkLock();
- System.out.println(Thread.currentThread().getName()+"==>释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "线程2").start();
- }
- }
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>4.3.0</version>
- </dependency>
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- import java.util.concurrent.TimeUnit;
-
- public class CuratorLock {
- public static void main(String[] args) {
- //创建分布式锁1
- final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- //创建分布式锁2
- final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock1.acquire();
- System.out.println(Thread.currentThread().getName()+"==>获得锁");
-
- lock1.acquire();
- System.out.println(Thread.currentThread().getName()+"==>再次获得锁");
-
- TimeUnit.SECONDS.sleep(10);
-
- lock1.release();
- System.out.println(Thread.currentThread().getName()+"==>释放锁");
-
- lock1.release();
- System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "线程1").start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.acquire();
- System.out.println(Thread.currentThread().getName()+"==>获得锁");
-
- lock2.acquire();
- System.out.println(Thread.currentThread().getName()+"==>再次获得锁");
-
- TimeUnit.SECONDS.sleep(8);
-
- lock2.release();
- System.out.println(Thread.currentThread().getName()+"==>释放锁");
-
- lock2.release();
- System.out.println(Thread.currentThread().getName()+"==>再次释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "线程2").start();
-
- }
-
- private static CuratorFramework getCuratorFramework() {
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 3);
-
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
- .connectionTimeoutMs(2000)
- .sessionTimeoutMs(2000)
- .retryPolicy(retryPolicy).build();
-
- //启动客户端
- client.start();
-
- System.out.println("zookeeper启动成功");
-
- return client;
-
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。