赞
踩
比如说:“进程一” 在使用该资源的时候,会先去获得锁,“进程一”获得锁之后会对资源保持独占,这样其他进程就无法访问该资源,”进程一“用完该资源之后就会将锁释放,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。我们把在分布式环境下的锁称之为
分布式锁
为什么要借助 zookeeper 实现分布式锁,就是利用 zookeeper的原理,zookeeper 提供了
监听
存储在zk内部数据的功能,从而可以达到基于数据的集群管理。同时 zk 可以创建临时带有序号
节点,且 zk 会保证节点的全局有序性。利用以上两个特性,可以利用 zk 实现分布式锁。
注意:这里的每一个节点都是监听它的上一个节点,而不是监听序号最小的节点。因为临时节点是带有序号的,而且序号不会回退,所以只需要监听比其小 1 的节点。只要比他小 1 的节点被删除,则它就可以拿到锁去操作资源
分布式锁实现
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; /** * zookeeper 实现分布式锁 */ public class DistributedLock { private int sessionTimeout = 2000; private String connectString = "node1:2181,node2:2181,node3:2181"; private ZooKeeper zooKeeper ; private String waitPath ; private CountDownLatch countDownLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String currentNode; public DistributedLock() throws IOException, InterruptedException, KeeperException { //1、建立连接 zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher -> { //监听节点 //释放 countDownLatch 当zookeeper连接成功后,需要释放 if (watcher.getState() == Watcher.Event.KeeperState.SyncConnected){ countDownLatch.countDown(); } //释放 waitLatch if (watcher.getType() == Watcher.Event.EventType.NodeDeleted && watcher.getPath().equals(waitPath)){ waitLatch.countDown(); } }); //等待 zookeeper 连接建立之后进行后续操作 countDownLatch.await(); //2、判断 /locks 节点是否存在 不存在创建节点为持久性节点 Stat exists = zooKeeper.exists("/locks", false); if (exists == null){ zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } /** * 加锁 */ public void zkLock() { //3、创建临时有序节点 try { currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //4、判断该节点序号是否是最小的,如果是就获取锁,不是就监听它的前一个节点 List<String> nodes = zooKeeper.getChildren("/locks", false); if (nodes.size() == 1){ //说明只有一个节点,就直接返回 return; }else { //对集合进行排序 Collections.sort(nodes); // 获取节点名称 String thisNode = currentNode.substring("/locks/".length()); // 通过节点名称获取该节点在 nodes 集合中的位置 int index = nodes.indexOf(thisNode); // 判断 if (index == -1){ System.out.println("数据异常"); }else if (index == 0){ //说明就只有一个节点 return; }else { // 获取当前节点的前一个节点的路径 waitPath = "/locks/"+nodes.get(index - 1); // 监听前一个节点 zooKeeper.getData(waitPath, true, null); waitLatch.await(); return; } } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } /** * 解锁 */ public void unZkLock() throws InterruptedException, KeeperException { //5、删除节点 zooKeeper.delete(currentNode,-1); } }
分布式锁测试
package com.ausware.springbootzookeeper.com.ausware.distributed; import org.apache.zookeeper.KeeperException; import java.io.IOException; import java.util.concurrent.TimeUnit; /** * 分布式锁测试 */ public class DistributedLockTest { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { DistributedLock lock1 = new DistributedLock(); DistributedLock lock2 = new DistributedLock(); new Thread(()->{ try { lock1.zkLock(); System.out.println("线程一启动,获取到锁"); TimeUnit.SECONDS.sleep(5); lock1.unZkLock(); System.out.println("线程一释放锁"); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } },"线程一").start(); new Thread(()->{ try { lock2.zkLock(); System.out.println("线程二启动,获取到锁"); TimeUnit.SECONDS.sleep(5); lock2.unZkLock(); System.out.println("线程二释放锁"); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } },"线程一").start(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。