赞
踩
目录
三、InterProcessReadWriteLock读写锁
前面我们已经介绍过zookeeper是分布式锁的实现方案之一,它是利用临时有序节点的特性来保证唯一性的。
Zookeeper分布式锁原理:
在zookeeper中,一个分布式锁对应到zookeeper中的一个文件夹,每个客户端线程请求获取分布式锁的时候,就需要在这个文件夹创建一个临时有序节点,有两种情况:
阻塞-唤醒逻辑:把文件夹下的节点顺序排一下序,找到当前节点的前一个节点,使用watcher机制监听前面一个节点的变化,当前一个节点被删除时会触发Watch事件,进而唤醒当前阻塞的线程。
如果前一个节点对应的客户端崩溃了,则节点对应的Watch事件也会触发,也会唤醒后一个节点对应的客户端线程,此时仍需要判断当前节点是第一个节点之后才能获取锁,否则继续进入阻塞并监听前面一个节点。
当某个客户端线程第一次获取锁成功之后,在JVM内存中的一个ConcurrentMap中存储当前线程对应的锁路径及重入次数,后面同一个线程再次获取锁时,先检查该Map中当前锁是否已被当前线程占用即可,如果已占用,则只需要递增重入次数即可。
注意:只考虑同一个客户端、同一个线程获取同一个分布式锁的可重入性。
释放锁时,首先将锁的重入次数减一,然后判断重入次数是否已经为0:
- package com.wsh.zookeeper.zookeeperapidemo.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessLock;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class CuratorInterProcessMutexDemo {
-
- private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessMutexDemo.class);
- private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
-
- /**
- * Curator对象
- */
- private static CuratorFramework curatorFramework;
-
- static {
- //初始化curator对象
- curatorFramework = CuratorFrameworkFactory
- .builder()
- //会话超时时间
- .sessionTimeoutMs(5000)
- //服务器集群地址
- .connectString(ZOOKEEPER_SERVER_ADDRESS)
- //重试策略
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
- //开启客户端
- curatorFramework.start();
- }
-
-
- private static void getLock(InterProcessLock interProcessLock) {
- try {
- interProcessLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取锁成功......");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- interProcessLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放锁成功......");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String[] args) {
- //分布式可重入排它锁
- InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/distributeLock");
- new Thread(() -> getLock(interProcessMutex), "T1").start();
- new Thread(() -> getLock(interProcessMutex), "T2").start();
- new Thread(() -> getLock(interProcessMutex), "T3").start();
- new Thread(() -> getLock(interProcessMutex), "T4").start();
- }
-
- }

运行程序,观察后端日志:
- 14:41:37.532 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T2获取锁成功......
- 14:41:37.538 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T2释放锁成功......
-
- 14:41:37.543 [T4] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T4获取锁成功......
- 14:41:37.554 [T4] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T4释放锁成功......
-
- 14:41:37.555 [T3] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T3获取锁成功......
- 14:41:37.559 [T3] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T3释放锁成功......
-
- 14:41:37.560 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T1获取锁成功......
- 14:41:37.564 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessMutexDemo - 线程:T1释放锁成功......
如上可见,多个线程争夺一个共享锁的时候,相互之间是互斥的。流程分析如下图所示:
curator实现了跨JVM的可重入读写互斥锁。读写锁包含了读锁、写锁两个,它们的互斥关系如下:
案例:读写互斥
- package com.wsh.zookeeper.zookeeperapidemo.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class CuratorInterProcessReadWriteLockDemo {
-
- private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessReadWriteLockDemo.class);
- private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
-
- /**
- * Curator对象
- */
- private static CuratorFramework curatorFramework;
- /**
- * curator读锁对象
- */
- private static InterProcessMutex readLock;
- /**
- * curator写锁对象
- */
- private static InterProcessMutex writeLock;
-
- static {
- //初始化curator对象
- curatorFramework = CuratorFrameworkFactory
- .builder()
- //会话超时时间
- .sessionTimeoutMs(5000)
- //服务器集群地址
- .connectString(ZOOKEEPER_SERVER_ADDRESS)
- //重试策略
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
- //开启客户端
- curatorFramework.start();
- InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/distribute-lock");
- readLock = interProcessReadWriteLock.readLock();
- writeLock = interProcessReadWriteLock.writeLock();
- }
-
- public static void main(String[] args) {
- new Thread(() -> {
- try {
- readLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取读锁...");
- Thread.sleep(3000);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- readLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放读锁...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "T1").start();
-
- new Thread(() -> {
- try {
- Thread.sleep(2000);
- writeLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取写锁...");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- writeLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放写锁...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "T2").start();
- }
-
- }

- 15:22:11.460 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1获取读锁...
- 15:22:14.468 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1释放读锁...
-
- 15:22:14.470 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2获取写锁...
- 15:22:14.473 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2释放写锁...
由上日志可见,读锁先获取到锁资源后,写锁必须等待锁释放之后,才能获取锁。
案例:读读不互斥
- package com.wsh.zookeeper.zookeeperapidemo.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class CuratorInterProcessReadWriteLockDemo {
-
- private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessReadWriteLockDemo.class);
- private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
-
- /**
- * Curator对象
- */
- private static CuratorFramework curatorFramework;
- /**
- * curator读锁对象
- */
- private static InterProcessMutex readLock;
- /**
- * curator写锁对象
- */
- private static InterProcessMutex writeLock;
-
- static {
- //初始化curator对象
- curatorFramework = CuratorFrameworkFactory
- .builder()
- //会话超时时间
- .sessionTimeoutMs(5000)
- //服务器集群地址
- .connectString(ZOOKEEPER_SERVER_ADDRESS)
- //重试策略
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
- //开启客户端
- curatorFramework.start();
- InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/distribute-lock");
- readLock = interProcessReadWriteLock.readLock();
- writeLock = interProcessReadWriteLock.writeLock();
- }
-
- public static void main(String[] args) {
- new Thread(() -> {
- try {
- readLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取读锁...");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- readLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放读锁...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "T1").start();
-
- new Thread(() -> {
- try {
- readLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取读锁...");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- readLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放读锁...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "T2").start();
- }
-
- }

- 15:32:30.347 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2获取读锁...
- 15:32:30.347 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1获取读锁...
-
- 15:32:30.354 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1释放读锁...
- 15:32:30.354 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2释放读锁...
由上面的日志可见,两个读线程之间获取锁资源的话,是可以同时获取到锁的,并不互斥。
案例:写写互斥
- package com.wsh.zookeeper.zookeeperapidemo.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class CuratorInterProcessReadWriteLockDemo {
-
- private static final Logger logger = LoggerFactory.getLogger(CuratorInterProcessReadWriteLockDemo.class);
- private static final String ZOOKEEPER_SERVER_ADDRESS = "192.168.179.128:2181,192.168.179.129:2181,192.168.179.133:2181";
-
- /**
- * Curator对象
- */
- private static CuratorFramework curatorFramework;
- /**
- * curator读锁对象
- */
- private static InterProcessMutex readLock;
- /**
- * curator写锁对象
- */
- private static InterProcessMutex writeLock;
-
- static {
- //初始化curator对象
- curatorFramework = CuratorFrameworkFactory
- .builder()
- //会话超时时间
- .sessionTimeoutMs(5000)
- //服务器集群地址
- .connectString(ZOOKEEPER_SERVER_ADDRESS)
- //重试策略
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .build();
- //开启客户端
- curatorFramework.start();
- InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, "/distribute-lock");
- readLock = interProcessReadWriteLock.readLock();
- writeLock = interProcessReadWriteLock.writeLock();
- }
-
- public static void main(String[] args) {
- new Thread(() -> {
- try {
- writeLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取写锁...");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- writeLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放写锁...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "T1").start();
-
- new Thread(() -> {
- try {
- writeLock.acquire();
- logger.info("线程:" + Thread.currentThread().getName() + "获取写锁...");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- writeLock.release();
- logger.info("线程:" + Thread.currentThread().getName() + "释放写锁...");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }, "T2").start();
- }
-
- }

- 15:33:28.701 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1获取写锁...
- 15:33:28.706 [T1] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T1释放写锁...
-
- 15:33:28.711 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2获取写锁...
- 15:33:28.717 [T2] INFO com.wsh.zookeeper.zookeeperapidemo.curator.CuratorInterProcessReadWriteLockDemo - 线程:T2释放写锁...
如上日志可见,两个线程同时尝试获取锁资源的时候,某个时刻只会有一个线程能获取成功,其他线程必须等待释放锁后才能尝试去获取锁。
本文主要总结了curator分布式锁的实现原理,以及通过案例详解介绍了curator的可重入排它锁以及读写锁的使用,在分布式系统开发中,实现分布式锁的方式有很多种,如数据库方式、redis、zookeeper等,具体根据业务场景选择合适的即可。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。