赞
踩
实现一把分布式锁通常有很多方法,比较常见的有 Redis 和 Zookeeper。
Redis分布式锁可参考之前的文章:
Zookeeper能实现分布式锁,是因为它有一个特性,就是多个线程去 Zookeeper 里面去创建同一个节点的时候,只会有一个线程执行成功。
锁可理解为 ZooKeeper 上的一个节点,
Zookeeper 的分布式锁实现: 实现有两种方案:
一般我们认为,ZooKeeper 的分布式锁是基于临时顺序节点,然后通过监听机制来实现的
。即方案2。
锁节点:即锁对象
获取锁:锁请求者加锁,则创建锁节点。
注意:
这里锁请求者监听的都是 锁节点的删除操作。释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。
羊群效应:如果所有的锁请求者都来监听锁持有者,当锁持有者的节点被删除以后,所有的锁请求者都会通知到,即都会同时去竞争锁,但是只有一个锁请求者能拿到锁。这就是羊群效应。
一般不推荐使用。这里我们自己简单实现一下。使用的是 ZkClient客户端。
public class ZkClientConnectUtils {
private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开
/**
* 使用匿名监听类
* @throws Exception
*/
public static ZkClient zKClientConnnect() throws Exception {
ZkClient zkClient = new ZkClient(CONNECT_STR, 3000,60000);
TimeUnit.SECONDS.sleep(3);
return zkClient;
}
}
/**
* 自定义zk分布式锁:定义通用锁接口
*/
public interface ZKLock {
/**
* 加锁
*/
void lock();
/**
* 释放锁
*/
void unlock();
}
/** * 抽象锁对象:用到了模板方法设计模式,具体抽象方法由子类实现 */ public abstract class AbstractZKLock implements ZKLock { protected static String path = "/lock"; protected ZkClient zkClient = null; public AbstractZKLock() { initClient(); } public void initClient(){ try { zkClient = createClient(); } catch (Exception e) { e.printStackTrace(); System.out.println("初始化 zk客户端连接失败,errorMessage=" + e.getMessage()); } } /** * 交给子类创建 * @return */ protected abstract ZkClient createClient() throws Exception; /** * lock方法(模板方法设计模式):获取锁的方法 * 1.如果锁获取成功,那么业务代码继续往下走。 * 2.如果锁获取失败,lock方法需要等待重新获取锁 * 2.1等待到了前面那个获取锁的客户端释放锁以后(zk监听机制), * 2.2然后再去重新获取锁 */ @Override public void lock() { // 尝试去获取锁 if(tryLock()){ System.out.println(Thread.currentThread().getName() + "--->获取锁成功!"); }else { // 获取失败,在这里等待 waitforlock(); // 重新获取锁 lock(); } } @Override public void unlock() { // 因为加锁创建的是临时节点,所以会话关闭,临时节点就删除了,即释放了锁 zkClient.close(); } /** * 获取锁 * @return */ protected abstract boolean tryLock(); /** * 获取失败,等待其他释放锁,重新获取锁 */ protected abstract void waitforlock(); }
/** * 基于ZkClient客户端实现锁: */ public class ZkClientLock extends AbstractZKLock { private CountDownLatch cdl = null; @Override protected ZkClient createClient() throws Exception{ return ZkClientConnectUtils.zKClientConnnect(); } /** * 尝试获取锁 * @return */ @Override protected boolean tryLock() { try { // 加锁:创建临时节点,创建成功表示加锁成功,否则加锁失败。zookeeper 的特性,节点名不能重复,否则创建失败。 if(zkClient.exists(path)) { zkClient.delete(path); } //创建临时节点 zkClient.createEphemeral(path); return true; } catch (RuntimeException e) { return false; } } /** * 等待获取锁: * 等前面那个获取锁成功的客户端释放锁 * 没有获取到锁的客户端都会走到这里 * 1、没有获取到锁的要注册对 path节点的watcher * 2、这个方法需要等待 */ @Override protected void waitforlock() { IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { } // 一旦 path节点被删除(释放锁)以后,就会触发这个方法 @Override public void handleDataDeleted(String dataPath) throws Exception { // 让等待的代码不再等待了 // 即 waitforlock方法执行结束,重新去获取锁 if (cdl != null) { cdl.countDown(); } } }; // 注册对 path节点的watcher zkClient.subscribeDataChanges(path, iZkDataListener); // 等待 if (zkClient.exists(path)) { cdl = new CountDownLatch(1); try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 取消该客户端的订阅关系 zkClient.unsubscribeDataChanges(path, iZkDataListener); } }
public class OrderService { private OrderNumGenereteUtils orderNumFactory = new OrderNumGenereteUtils(); // ZkClient分布式锁 private ZKLock lock = new ZkClientLock(); /** * 创建订单号,模拟业务 */ public void createOrderNum() { lock.lock(); String orderNum = generateOrderNum(); System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]"); lock.unlock(); } /** * 生成时间格式的订单编号 * @return */ private static int orderNum = 0; public String generateOrderNum() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"); return simpleDateFormat.format(new Date()) + ++orderNum; } }
public class ZkClientLockTest { private static Integer count = 50; private static CountDownLatch cdl = new CountDownLatch(count); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < count; i++) { new Thread(new Runnable() { @Override public void run() { // 模拟50个并发同时去创建订单号。 OrderService orderService = new OrderService(); try { //线程运行起来时先等待。 cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } orderService.createOrderNum(); } }).start(); cdl.countDown(); } TimeUnit.MINUTES.sleep(3); } }
并发模拟 50个线程同时去创建订单号。运行ok。
锁节点:即锁对象
获取锁:锁请求者加锁,则创建锁节点。
注意:
这里锁请求者监听的是它上一个节点的删除操作。释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。
在实际的开发中,建议直接使用 Curator客户端中的各种官方实现的分布式锁。我们没必要“重复造轮子“”。
Curator客户端提供的 几种锁方案:
下面我们以 InterProcessMutex为例。
前面文章有介绍过 Curator客户端的使用。
public class CuratorClientConnectUtils { private static String CONNECT_STR = "192.168.198.110:2181"; //集群模式下用逗号隔开 private static class InnerClass{ private static CuratorFramework client = clientConnect2(); } public static CuratorFramework getInstance(){ return InnerClass.client; } public static void main(String[] args){ //CuratorFramework client = clientConnect1(); CuratorFramework client = clientConnect2(); //启动客户端,必须要有 client.start(); System.out.println("==CuratorFramework==" + client); } /** * 使用工厂类CuratorFrameworkFactory的静态newClient()方法。 * @throws Exception */ public static CuratorFramework clientConnect1() { // 重试策略, 失败重连3次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3); //创建客户端实例 CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy); return client; } /** * 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。 * @throws Exception */ public static CuratorFramework clientConnect2() { // 重试策略, 失败重连3次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(CONNECT_STR) .sessionTimeoutMs(3000) // 会话超时时间 .connectionTimeoutMs(50000) // 连接超时时间 .retryPolicy(retryPolicy) .namespace("Curator_Workspace") // 指定隔离名称,表示所有节点的操作都会在该工作空间下进行。不指定时,使用自定义的节点path .build(); return client; } public static void closeZKClient() { if (InnerClass.client != null) { InnerClass.client.close(); } } }
public class OrderService { /** * 分布式可重入排它锁 */ public InterProcessMutex interProcessMutex; // 简单点,构造方法注入锁实例 public OrderService(InterProcessMutex interProcessMutex) { this.interProcessMutex = interProcessMutex; } /** * 创建订单号,模拟业务 */ public void createOrderNum() throws Exception { //加锁 interProcessMutex.acquire(); String orderNum = generateOrderNum(); System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]"); //释放锁 interProcessMutex.release(); } /** * 生成时间格式的订单编号 * @return */ private static int orderNum = 0; public String generateOrderNum() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-"); return simpleDateFormat.format(new Date()) + ++orderNum; } }
public class InterProcessMutexTest { private static Integer count = 50; private static CountDownLatch cdl = new CountDownLatch(count); public static void main(String[] args) { CuratorFramework client = CuratorClientConnectUtils.getInstance(); client.start(); InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/curator/lock"); for (int i = 0; i < count; i++) { new Thread(new Runnable() { @Override public void run() { // 模拟50个并发同时去创建订单号。 OrderService orderService = new OrderService(interProcessMutex); try { //线程运行起来时先等待。 cdl.await(); orderService.createOrderNum(); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }).start(); cdl.countDown(); } } }
并发模拟 50个线程同时去创建订单号。运行ok。
注意:
我们可以通过锁的构造方法传入path。不传时底层path会有默认值。
下面简单看一下 Curator客户端实现分布式锁的源码。
3.1 查看acquire() -> internalLock()方法
3.2 查看 attemptLock()方法
上面 createsTheLock()方法和 internalLockLoop()方法是重点。底层都是 Curator客户端的API使用。
3.2.1 createsTheLock()方法:
级联创建临时有序节点,即获取锁逻辑。返回节点路径。
3.2.2 internalLockLoop()方法
这个方法实现了 监听的机制,自行查看。
注意:
usingWatcher是一次性监听。
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; }
4.1 查看 release()方法
4.2 查看releaseLock()方法
– 求知若饥,虚心若愚。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。