赞
踩
从官网上可以看到zookeeper的数据模型图如下:
分布式锁是控制分布式系统同步访问共享资源的一种方式。
在分布式系统中,如果有两个系统需要对同一个资源进行操作的时候,就需要通过一些互斥的手段来防止其他系统的干扰,以保证一致性,此时就需要使用到分布式锁了。
常见的分布式锁实现有redis分布式锁,zookeeper分布式锁,下面介绍下zookeeper分布式锁的实现。
1、原理
使用zookeeper临时节点+watch机制实现。
由于zookeeper节点不可重名,所以对于同一个节点(/lock/createOrder),如果有多个系统同时想要创建时,只会有一个系统成功,成功的系统即为获取到锁,可以进行下一步的业务操作,没有获取到锁的系统,可以对该临时节点设置监听,以便再临时节点删除后,可以获取锁资源。
下图为第一种分布式锁的实现原理图:
具体代码如下:
package com.xiaohuihui.zookeeper; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; import java.io.UnsupportedEncodingException; /** * @Desription: 小灰灰的zk序列化 * @Author: yangchenhui */ public class XiaohuihuiZkSerializer implements ZkSerializer { String charset = "UTF-8"; @Override public byte[] serialize(Object data) throws ZkMarshallingError { try { return String.valueOf(data).getBytes(charset); } catch (UnsupportedEncodingException e) { e.printStackTrace(); throw new ZkMarshallingError(e); } } @Override public Object deserialize(byte[] bytes) throws ZkMarshallingError { try { return new String(bytes, charset); } catch (UnsupportedEncodingException e) { e.printStackTrace(); throw new ZkMarshallingError(e); } } } package com.xiaohuihui.zookeeper; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @Desription: zk分布式锁 * @Author: yangchenhui */ public class ZkDistributeLock implements Lock { /** * zk节点 */ private String lockPath; /** * zk客户端 */ private ZkClient zkClient; private final String connectString = "localhost:2181"; private final int sessionTimeout = 2000; private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>(); public ZkDistributeLock(String lockPath) { this.lockPath = lockPath; zkClient = new ZkClient(connectString, sessionTimeout); zkClient.setZkSerializer(new XiaohuihuiZkSerializer()); } public ZkDistributeLock(String lockPath, ZkClient zkClient) { this.lockPath = lockPath; this.zkClient = zkClient; } @Override public void lock() { boolean hasLock = tryLock(); if (!hasLock) { // 没有获取到锁,等待获取锁 waitForLock(); // 再次尝试获取锁 lock(); } } /** * 监听zk节点变化,等待获取锁 */ private void waitForLock() { CountDownLatch downLatch = new CountDownLatch(1); IZkDataListener zkDataListener = new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("=======> 有人修改了zk节点数据"); } @Override public void handleDataDeleted(String dataPath) throws Exception { downLatch.countDown(); System.out.println("=======> zk节点被删除,可以重新获取锁"); } }; zkClient.subscribeDataChanges(lockPath, zkDataListener); // 如果当前zk节点存在,阻塞线程 if (zkClient.exists(lockPath)) { try { downLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取消注册 zkClient.unsubscribeDataChanges(lockPath, zkDataListener); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { // 保证锁的可重入性(在一个业务的执行过程中,如果有多个地方需要获取到锁,需要实现锁的可重入性),通过本地线程变量实现 Integer count = this.reentrantCount.get(); if (count != null && count > 0) { // 表示当前线程已经获取到锁 this.reentrantCount.set(++count); return true; } // 当前线程没有获取到锁,尝试创建zk节点,创建成功即为获取到锁 try { zkClient.createEphemeral(lockPath); this.reentrantCount.set(1); return true; } catch (ZkNodeExistsException e) { return false; } } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { // 判断当前线程是否持有锁 Integer count = this.reentrantCount.get(); if (null != count && count > 1) { // 当前线程有超过1处获取锁 this.reentrantCount.set(--count); return; } else if (null != count && count.equals(1)) { // 只有一处获取过锁 this.reentrantCount.set(null); } // 删除zk节点 zkClient.delete(lockPath); } @Override public Condition newCondition() { return null; } } package com.xiaohuihui.zookeeper; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * @Desription: zk分布式锁测试 * @Author: yangchenhui */ public class ZkDistributeLockTest { public static void main(String[] args) { // 并发数 int currency = 50; // 循环屏障 CyclicBarrier cb = new CyclicBarrier(currency); // 多线程模拟高并发 for (int i = 0; i < currency; i++) { new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " ==============> 准备好"); // 等待一起出发 try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } ZkDistributeLock lock = new ZkDistributeLock("/xiaohuihui zkDistributeLock"); // ZkDistributeLock2 lock = new ZkDistributeLock2("/xiaohuihui zkDistributeLock"); try { lock.lock(); System.out.println(Thread.currentThread().getName() + " 获得锁!"); } finally { lock.unlock(); } } }).start(); } } }
创建锁的时候需要考虑到可重入性,否则在同一个线程中,如果有多个地方需要获取到锁,获取不到的地方会产生阻塞,所以需要实现可重入性,使用ThreadLocal可以实现。
运行效果如下:
这种情况下,只要节点删除,所有设置监听的客户端都会收到通知,产生惊群效应,极大的浪费系统资源,需要进行改进,实现思路如下图:
这样只会最小号收到通知,具体实现如下:
package com.xiaohuihui.zookeeper; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @Desription: 使用zk临时顺序节点实现分布式锁,避免惊群效应 * @Author: yangchenhui */ public class ZkDistributeLock2 implements Lock { /** * zk节点 */ private String lockPath; /** * zk客户端 */ private ZkClient zkClient; private final String connectString = "localhost:2181"; private final int sessionTimeout = 2000; /** * 重入锁计数 */ private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>(); /** * 当前节点路径 */ private ThreadLocal<String> currentPath = new ThreadLocal<>(); /** * 上个节点路径 */ private ThreadLocal<String> beforePath = new ThreadLocal<>(); public ZkDistributeLock2(String lockPath) { this.lockPath = lockPath; zkClient = new ZkClient(connectString, sessionTimeout); zkClient.setZkSerializer(new XiaohuihuiZkSerializer()); if (!this.zkClient.exists(lockPath)) { try { this.zkClient.createPersistent(lockPath); } catch (ZkNodeExistsException e) { } } } @Override public void lock() { // 获取锁 boolean hasLock = tryLock(); if (!hasLock) { // 没有获取到锁,阻塞监听 waitLock(); // 再次尝试获取锁 lock(); } } /** * 阻塞监听,等待获取zk锁 */ private void waitLock() { CountDownLatch downLatch = new CountDownLatch(1); IZkDataListener zkDataListener = new IZkDataListener() { @Override public void handleDataDeleted(String dataPath) throws Exception { downLatch.countDown(); System.out.println("=======> zk节点被删除,可以重新获取锁"); } @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("=======> 有人修改了zk节点数据"); } }; // 此时只需要监听上一个zk节点,不会引起惊群效应 zkClient.subscribeDataChanges(this.beforePath.get(), zkDataListener); // 如果前一个zk节点存在,阻塞线程 if (zkClient.exists(this.beforePath.get())) { try { downLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取消注册 zkClient.unsubscribeDataChanges(this.beforePath.get(), zkDataListener); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { // 保证锁的可重入性(在一个业务的执行过程中,如果有多个地方需要获取到锁,需要实现锁的可重入性),通过本地线程变量实现 Integer count = this.reentrantCount.get(); if (count != null && count > 0) { // 表示当前线程已经获取到锁 this.reentrantCount.set(++count); return true; } // 如果当前节点不存在,创建临时顺序节点 if (this.currentPath.get() == null) { this.currentPath.set(zkClient.createEphemeralSequential(lockPath + "/", "xiaohuihui testlock2")); } // 获取所有子节点,判断当前节点是不是最小的节点 List<String> children = zkClient.getChildren(lockPath); Collections.sort(children); if (this.currentPath.get().equals(lockPath + "/" + children.get(0))) { // 当前节点即为最小节点 this.reentrantCount.set(1); return true; } else { // 获取到当前节点的前一个节点,设置beforePath int currentPathIndex = children.indexOf(this.currentPath.get().substring(lockPath.length() + 1)); this.beforePath.set(lockPath + "/" + children.get(currentPathIndex - 1)); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { // 判断当前线程是否持有锁 Integer count = this.reentrantCount.get(); if (count != null) { // 当前线程已经在多处获取锁 if (count > 1) { this.reentrantCount.set(--count); return; } else { this.reentrantCount.set(null); } } // 删除当前zk节点 zkClient.delete(this.currentPath.get()); } @Override public Condition newCondition() { return null; } }
运行效果如下,已经避免惊群效应:
PS:未完待续
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。