当前位置:   article > 正文

Curator学习笔记(一)- 读写锁

curator读写操作

Curator Recipes是netfix开源的zookeeper客户端框架,因为zookeeper客户端在使用上很不方便,因此curator recipes对其进行了封装,并提供了十分丰富的功能。如下图所示。

基本涵盖了常用的分布式调度功能。那么这块他们是怎么做的。考虑到好的代码肯定做了很多优化,里边会有很多设计模式。但是作者目前还达不到那种一眼就看出其代码的精髓锁着,因此这块作者还是按照老样子。小了解其大概得轮廓。以后再复习设计模式的时候。再去思考这些能够真正提升自身功力的东西。

首先要使用curator提供的功能,需要导入相关的包

  1. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
  2. <dependency>
  3. <groupId>org.apache.curator</groupId>
  4. <artifactId>curator-framework</artifactId>
  5. <version>5.1.0</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
  8. <dependency>
  9. <groupId>org.apache.curator</groupId>
  10. <artifactId>curator-recipes</artifactId>
  11. <version>5.1.0</version>
  12. </dependency>

这里我们将zookeeper客户端交给spring进行管理。

  1. @Configuration
  2. public class TestCurd {
  3. @Bean
  4. public CuratorFramework main() {
  5. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  6. //创建连接对象
  7. CuratorFramework client = CuratorFrameworkFactory.builder()
  8. //IP地址端口号
  9. .connectString("127.0.0.1:2181")
  10. //客户端与服务器之间的会话超时时间
  11. .sessionTimeoutMs(1000000)
  12. //当客户端与服务器之间会话超时3s后,进行一次重连
  13. .retryPolicy(retryPolicy)
  14. //命名空间,当我们创建节点的时候,以/create为父节点
  15. .namespace("create")
  16. //构建连接对象
  17. .build();
  18. //打开连接
  19. client.start();
  20. //是否成功建立连接,true :建立, false:没有建立
  21. System.out.println(client.isStarted());
  22. return client;
  23. }
  24. }

编写相关测试方法

  1. @GetMapping(value = "/lock2")
  2. public void lock2() throws Exception {
  3. // 读写锁
  4. InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
  5. // 获取读锁对象
  6. InterProcessLock interProcessLock=interProcessReadWriteLock.readLock();
  7. System.out.println("等待获取锁对象!");
  8. // 获取锁
  9. interProcessLock.acquire();
  10. for (int i = 1; i <= 10; i++) {
  11. Thread.sleep(3000);
  12. System.out.println(i);
  13. }
  14. // 释放锁
  15. interProcessLock.release();
  16. System.out.println("等待释放锁!");
  17. }
  18. @GetMapping(value = "/lock3")
  19. public void lock3() throws Exception {
  20. // 读写锁
  21. InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");
  22. // 获取写锁对象
  23. InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock();
  24. System.out.println("等待获取锁对象!");
  25. // 获取锁
  26. interProcessLock.acquire();
  27. for (int i = 1; i <= 10; i++) {
  28. Thread.sleep(3000);
  29. System.out.println(i);
  30. }
  31. // 释放锁
  32. interProcessLock.release();
  33. System.out.println("等待释放锁!");
  34. }

 这块我们我们看到curator提供了读写锁。我们发现在初始化的时候。curator就已经将读锁和写锁进行了初始化。而我们真正在使用的时候也就是直接使用。

  1. public InterProcessReadWriteLock(CuratorFramework client, String basePath) {
  2. this(client, basePath, (byte[])null);
  3. }
  4. public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
  5.         lockData = lockData == null ? null : Arrays.copyOf(lockData, lockData.length);
  6.         //写锁
  7. this.writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__WRIT__", lockData, 1, new InterProcessReadWriteLock.SortingLockInternalsDriver() {
  8. public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {
  9. return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
  10. }
  11. });
  12. //读锁
  13. this.readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__READ__", lockData, 2147483647, new InterProcessReadWriteLock.SortingLockInternalsDriver() {
  14. public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {
  15. return InterProcessReadWriteLock.this.readLockPredicate(children, sequenceNodeName);
  16. }
  17. });
  18. }

获取读锁

  1. //获取锁
  2. public void acquire() throws Exception {
  3. if (!this.internalLock(-1L, (TimeUnit)null)) {
  4. throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
  5. }
  6. }
  7. //获取锁
  8. private boolean internalLock(long time, TimeUnit unit) throws Exception {
  9. Thread currentThread = Thread.currentThread();
  10. //通过绑定thread的方式对该线程重入的次数进行记录。
  11. InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
  12. if (lockData != null) {
  13. //如果发生了重入,那么这里就将重入的次数进行加一操作
  14. lockData.lockCount.incrementAndGet();
  15. //表示获取到锁
  16. return true;
  17. } else {
  18. //如果第一次加锁,或者中途获取锁失败。那么进行尝试
  19. String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
  20. if (lockPath != null) {
  21. InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
  22. this.threadData.put(currentThread, newLockData);
  23. return true;
  24. } else {
  25. return false;
  26. }
  27. }
  28. }

  1. String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
  2. long startMillis = System.currentTimeMillis();
  3. Long millisToWait = unit != null ? unit.toMillis(time) : null;
  4. byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
  5. int retryCount = 0;
  6. String ourPath = null;
  7. boolean hasTheLock = false;
  8. boolean isDone = false;
  9. while(!isDone) {
  10. isDone = true;
  11. try {
  12. //通过初始化的driver获取锁
  13. ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
  14. //判断是否拿到锁,这里对读锁和写锁进行兼容。
  15. hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
  16. } catch (NoNodeException var14) {
  17. if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
  18. throw var14;
  19. }
  20. isDone = false;
  21. }
  22. }
  23. return hasTheLock ? ourPath : null;
  24. }

获取锁

  1. public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
  2. String ourPath;
  3. //通过我们注入到spring ioc中的client操作zk,通过判断是否存在该路径进行加锁
  4. if (lockNodeBytes != null) {
  5. ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
  6. } else {
  7. ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
  8. }
  9. //拿到路径之后,就返回
  10. return ourPath;

判断是否拿到锁的根据是这里的maxLeasse,写锁这里为1,读锁为2147483647

  1. public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {
  2. int ourIndex = children.indexOf(sequenceNodeName);
  3. validateOurIndex(sequenceNodeName, ourIndex);
  4. boolean getsTheLock = ourIndex < maxLeases;
  5. String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
  6. return new PredicateResults(pathToWatch, getsTheLock);
  7. }

通过上述分析,我们大概了解了curator做分布式锁的基本过程,通过对path路径的是否存在进行加锁。锁的重入是针对于线程本身来说的。在单个jvm中线程的中断对其他线程的轮询没有任何影响。只有当当前线程运行完毕并删除zk中的节点,其他线程才可以进行加锁。相反在读锁中,通过与数字2147483647进行对比来判断是否可以加锁。这里的2147483647就是读锁的上线。

在锁释放的这个问题上。我们看到也是通过从lackdata中获取重入的次数,然后进行递减的。因为这个lockdata和线程进行绑定。所以在线程轮转中是没有数据消失的问题的。

  1. public void release() throws Exception {
  2. Thread currentThread = Thread.currentThread();
  3. //拿到当前线程的重入数据
  4. InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
  5. if (lockData == null) {
  6. throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
  7. } else {
  8. //进行锁的重入次数的释放
  9. int newLockCount = lockData.lockCount.decrementAndGet();
  10. if (newLockCount <= 0) {
  11. if (newLockCount < 0) {
  12. throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
  13. } else {
  14. try {
  15. //如果锁被释放完毕。那么就开始真正的释放
  16. this.internals.releaseLock(lockData.lockPath);
  17. } finally {
  18. this.threadData.remove(currentThread);
  19. }
  20. }
  21. }
  22. }
  23. }

锁的释放也很简单,直接删除

  1. final void releaseLock(String lockPath) throws Exception {
  2. this.client.removeWatchers();
  3. this.revocable.set((Object)null);
  4. this.deleteOurPath(lockPath);
  5. }

总结:通过分析,curator读写锁是通过对zk节点的存在与否进行判断的从而进行加锁的,对于读锁来说只有在不存在的时候线程才能加锁成功。通过将线程和重入次数的绑定,来实现的锁重入机制。当锁被释放之后,通过删除节点来通知其他线程进行加锁。对于读锁来说,单个线程最大或者的读锁数量也是有限制的。通过序列号的方式与写锁进行区别。读锁这块的详细实现作者还没有想明白,以后想明白了再补上。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/674449
推荐阅读
相关标签
  

闽ICP备14008679号