当前位置:   article > 正文

zookeeper实用(2):实现分布式锁

zookeeper实用(2):实现分布式锁

场景:

分布式锁主要用于在分布式环境中,对跨进程、跨主机、跨网络的共享资源实现互斥访问,从而保证数据的一致性。

分布式锁的典型使用场景包括秒杀、分布式任务调度、限制访问(同一时间只能发起一次请求)等。在单机环境中不推荐使用分布式锁,因为分布式锁会带来额外的性能开销。

分布式锁的实现方式很多,比如基于数据库(DB)、Redis、ZooKeeper,或者自己实现锁服务。

分布式锁实现代码:

  1. package com.example.demo.util;
  2. import org.I0Itec.zkclient.ZkClient;
  3. import java.util.Comparator;
  4. import java.util.List;
  5. import java.util.concurrent.CountDownLatch;
  6. public final class DLock
  7. {
  8. private ZkClient zkClient;
  9. private String lockName;
  10. private String thisReadLock;
  11. private String thisWriteLock;
  12. /**
  13. * 分布式锁连接zookeeper 以及 初始化锁主节点
  14. *
  15. * @param hostUrl zookeeper 连接url
  16. * @param lockName 锁主节点
  17. */
  18. public void connect(String hostUrl , String lockName)
  19. {
  20. this.lockName = lockName;
  21. zkClient = new ZkClient(hostUrl);
  22. if (!zkClient.exists(lockName))
  23. zkClient.createPersistent(lockName);
  24. }
  25. /**
  26. * 获取读锁
  27. */
  28. public void lockRead()
  29. {
  30. CountDownLatch readLatch = new CountDownLatch(1);
  31. // 创建此临时节点, 获取带有顺序号的完整节点
  32. String thisLockNodeBuilder = lockName + "/" + LockType.READ + "-";
  33. thisReadLock = zkClient.createEphemeralSequential(thisLockNodeBuilder , "");
  34. // 找到此读锁前一个写锁。 找到节点的子节点
  35. List<String> tmp_nodes = zkClient.getChildren(lockName);
  36. sortNodes(tmp_nodes);
  37. tmp_nodes.forEach(System.out::println);
  38. int tmp_index = 0;
  39. // 倒序循环,直到读锁前面所有的写锁释放,读锁才执行
  40. for (int i = tmp_nodes.size() - 1; i >= 0; i--)
  41. {
  42. if (thisReadLock.equals(lockName + "/" + tmp_nodes.get(i)))
  43. {
  44. tmp_index = i;
  45. } else if (i < tmp_index && tmp_nodes.get(i).split("-")[0].equals(LockType.WRITE.toString()))
  46. {
  47. // 找到当前读锁之前的一个写锁
  48. // 先监听此写锁(监听该写锁是否释放),再阻塞当前读锁
  49. zkClient.subscribeChildChanges(lockName + "/" + tmp_nodes.get(i) , (parentPath , currentChilds) -> readLatch.countDown());
  50. try
  51. {
  52. readLatch.await();
  53. } catch (InterruptedException e)
  54. {
  55. e.printStackTrace();
  56. }
  57. break;
  58. }
  59. }
  60. }
  61. /**
  62. * 释放读锁
  63. */
  64. public void unLockRead()
  65. {
  66. if (this.thisReadLock != null)
  67. {
  68. zkClient.delete(thisReadLock);
  69. thisReadLock = null;
  70. }
  71. }
  72. /**
  73. * 获取写锁
  74. */
  75. public void lockWrite()
  76. {
  77. CountDownLatch writeLatch = new CountDownLatch(1);
  78. // 创建此临时节点, 获取带有顺序号的完整节点
  79. String thisLockNodeBuilder = lockName + "/" + LockType.WRITE + "-";
  80. thisWriteLock = zkClient.createEphemeralSequential(thisLockNodeBuilder , "");
  81. List<String> tmp_nodes = zkClient.getChildren(lockName);
  82. sortNodes(tmp_nodes);
  83. // 倒序循环,直到拿到最前面的写锁,它释放了,后面的写锁才能执行
  84. for (int i = tmp_nodes.size() - 1; i >= 0; i--)
  85. {
  86. if (thisWriteLock.equals(lockName + "/" + tmp_nodes.get(i)))
  87. {
  88. // 在锁列表中找到此写锁
  89. if (i > 0)
  90. {
  91. // 如果此写锁前面还有锁
  92. // 监听前面的锁(监听前面的锁是否释放), 然后阻塞当前写锁获取
  93. zkClient.subscribeChildChanges(lockName + "/" + tmp_nodes.get(i - 1) , (parentPath , currentChilds) -> writeLatch.countDown());
  94. try
  95. {
  96. // 阻塞当前写锁获取
  97. writeLatch.await();
  98. } catch (InterruptedException e)
  99. {
  100. e.printStackTrace();
  101. }
  102. break;
  103. }
  104. }
  105. }
  106. }
  107. /**
  108. * 释放写锁
  109. */
  110. public void unLockWrite()
  111. {
  112. if (thisWriteLock != null)
  113. {
  114. zkClient.delete(thisWriteLock);
  115. thisWriteLock = null;
  116. }
  117. }
  118. /**
  119. * 重新对节点进行排序,按照顺序号排序
  120. *
  121. * @param nodes 临时节点
  122. */
  123. private void sortNodes(List<String> nodes)
  124. {
  125. nodes.sort(Comparator.comparing(o -> o.split("-")[1]));
  126. }
  127. /**
  128. * 锁类型枚举
  129. */
  130. private enum LockType
  131. {
  132. READ,
  133. WRITE;
  134. }
  135. }

模拟多客户端请求(注意:下面的 main 方法在同一线程中依次执行四个客户端,并非真正的并发):

  1. import com.example.demo.util.DLock;
  2. import org.springframework.beans.factory.annotation.Value;
  3. public class TestDLock {
  4. private static String hostUrl = "127.0.0.1:2181";
  5. public static void main(String[] args) {
  6. dLockClient1();
  7. dLockClient2();
  8. dLockClient3();
  9. dLockClient4();
  10. }
  11. public static void dLockClient1(){
  12. // 客户端1
  13. DLock lock=new DLock();
  14. lock.connect(hostUrl, "/lock");
  15. lock.lockRead();
  16. System.out.println("I am testNode 1");
  17. try
  18. {
  19. System.out.println("I am TestNode 1");
  20. System.out.println("睡眠10s 之后释放分布式读锁, 开始倒计时");
  21. for (int i = 0; i < 10; i++)
  22. {
  23. System.out.println(10-i);
  24. Thread.sleep(1000);
  25. }
  26. } catch (InterruptedException e)
  27. {
  28. e.printStackTrace();
  29. }
  30. lock.unLockRead();
  31. }
  32. public static void dLockClient2(){
  33. // 客户端2
  34. DLock lock=new DLock();
  35. lock.connect(hostUrl, "/lock");
  36. lock.lockRead();
  37. System.out.println("I am TestNode 2");
  38. lock.unLockRead();
  39. }
  40. public static void dLockClient3(){
  41. // 客户端3
  42. DLock lock=new DLock();
  43. lock.connect(hostUrl, "/lock");
  44. lock.lockWrite();
  45. System.out.println("I am testNode 3");
  46. System.out.println("睡眠10s 之后释放分布式写锁, 开始倒计时");
  47. for (int i = 0; i < 10; i++)
  48. {
  49. System.out.println(10-i);
  50. try
  51. {
  52. Thread.sleep(1000);
  53. } catch (InterruptedException e)
  54. {
  55. e.printStackTrace();
  56. }
  57. }
  58. lock.unLockWrite();
  59. }
  60. public static void dLockClient4(){
  61. // 客户端4
  62. DLock lock=new DLock();
  63. lock.connect(hostUrl, "/lock");
  64. lock.lockRead();
  65. System.out.println("I am TestNode 4");
  66. lock.unLockRead();
  67. }
  68. }

运行结果:

  1. READ-0000000000
  2. I am testNode 1
  3. I am TestNode 1
  4. 睡眠10s 之后释放分布式读锁, 开始倒计时
  5. 10
  6. 9
  7. 8
  8. 7
  9. 6
  10. 5
  11. 4
  12. 3
  13. 2
  14. 1
  15. 17:44:29.643 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980000 after 12ms
  16. 17:44:29.804 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980000, packet:: clientPath:null serverPath:null finished:false header:: 5,2 replyHeader:: 5,30064771079,0 request:: '/lock/READ-0000000000,-1 response:: null
  17. 17:44:29.805 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to 127.0.0.1:2181.
  18. 17:44:29.805 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@1e643faf
  19. 17:44:29.805 [ZkClient-EventThread-15-127.0.0.1:2181] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
  20. 17:44:29.809 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
  21. 17:44:29.809 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
  22. 17:44:29.810 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
  23. 17:44:29.812 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
  24. 17:44:29.812 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on 127.0.0.1/127.0.0.1:2181
  25. 17:44:29.827 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x100026588980001, negotiated timeout = 30000
  26. 17:44:29.827 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
  27. 17:44:29.827 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
  28. 17:44:29.828 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
  29. 17:44:29.828 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
  30. 17:44:29.838 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,30064771080,0 request:: '/lock,F response:: s{30064771077,30064771077,1607247859574,1607247859574,0,2,0,0,0,0,30064771079}
  31. 17:44:29.847 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 2,1 replyHeader:: 2,30064771081,0 request:: '/lock/READ-,#ffffffacffffffed057400,v{s{31,s{'world,'anyone}}},3 response:: '/lock/READ-0000000001
  32. 17:44:29.851 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 3,8 replyHeader:: 3,30064771081,0 request:: '/lock,F response:: v{'READ-0000000001}
  33. READ-0000000001
  34. I am TestNode 2
  35. 17:44:29.859 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980001, packet:: clientPath:null serverPath:null finished:false header:: 4,2 replyHeader:: 4,30064771082,0 request:: '/lock/READ-0000000001,-1 response:: null
  36. 17:44:29.860 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to 127.0.0.1:2181.
  37. 17:44:29.860 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6e8dacdf
  38. 17:44:29.860 [ZkClient-EventThread-18-127.0.0.1:2181] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
  39. 17:44:29.862 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
  40. 17:44:29.862 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
  41. 17:44:29.862 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
  42. 17:44:29.863 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
  43. 17:44:29.863 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on 127.0.0.1/127.0.0.1:2181
  44. 17:44:29.870 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x100026588980002, negotiated timeout = 30000
  45. 17:44:29.870 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
  46. 17:44:29.870 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
  47. 17:44:29.871 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
  48. 17:44:29.871 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
  49. 17:44:29.875 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,30064771083,0 request:: '/lock,F response:: s{30064771077,30064771077,1607247859574,1607247859574,0,4,0,0,0,0,30064771082}
  50. 17:44:29.882 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 2,1 replyHeader:: 2,30064771084,0 request:: '/lock/WRITE-,#ffffffacffffffed057400,v{s{31,s{'world,'anyone}}},3 response:: '/lock/WRITE-0000000002
  51. 17:44:29.885 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 3,8 replyHeader:: 3,30064771084,0 request:: '/lock,F response:: v{'WRITE-0000000002}
  52. I am testNode 3
  53. 睡眠10s 之后释放分布式写锁, 开始倒计时
  54. 10
  55. 9
  56. 8
  57. 7
  58. 6
  59. 5
  60. 4
  61. 3
  62. 2
  63. 1
  64. 17:44:39.790 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980000 after 5ms
  65. 17:44:39.864 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980001 after 3ms
  66. 17:44:39.898 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x100026588980002 after 5ms
  67. 17:44:39.992 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980002, packet:: clientPath:null serverPath:null finished:false header:: 4,2 replyHeader:: 4,30064771085,0 request:: '/lock/WRITE-0000000002,-1 response:: null
  68. 17:44:39.993 [main] DEBUG org.I0Itec.zkclient.ZkConnection - Creating new ZookKeeper instance to connect to 127.0.0.1:2181.
  69. 17:44:39.994 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@7a79be86
  70. 17:44:39.994 [ZkClient-EventThread-21-127.0.0.1:2181] INFO org.I0Itec.zkclient.ZkEventThread - Starting ZkClient event thread.
  71. 17:44:40.002 [main] DEBUG org.I0Itec.zkclient.ZkClient - Awaiting connection to Zookeeper server
  72. 17:44:40.002 [main] INFO org.I0Itec.zkclient.ZkClient - Waiting for keeper state SyncConnected
  73. 17:44:40.003 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
  74. 17:44:40.005 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
  75. 17:44:40.005 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on 127.0.0.1/127.0.0.1:2181
  76. 17:44:40.021 [main-SendThread(127.0.0.1:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x100026588980003, negotiated timeout = 30000
  77. 17:44:40.021 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Received event: WatchedEvent state:SyncConnected type:None path:null
  78. 17:44:40.021 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient - zookeeper state changed (SyncConnected)
  79. 17:44:40.021 [main-EventThread] DEBUG org.I0Itec.zkclient.ZkClient - Leaving process event
  80. 17:44:40.022 [main] DEBUG org.I0Itec.zkclient.ZkClient - State is SyncConnected
  81. 17:44:40.032 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 1,3 replyHeader:: 1,30064771086,0 request:: '/lock,F response:: s{30064771077,30064771077,1607247859574,1607247859574,0,6,0,0,0,0,30064771085}
  82. 17:44:40.042 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 2,1 replyHeader:: 2,30064771087,0 request:: '/lock/READ-,#ffffffacffffffed057400,v{s{31,s{'world,'anyone}}},3 response:: '/lock/READ-0000000003
  83. 17:44:40.046 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 3,8 replyHeader:: 3,30064771087,0 request:: '/lock,F response:: v{'READ-0000000003}
  84. READ-0000000003
  85. I am TestNode 4
  86. 17:44:40.052 [main-SendThread(127.0.0.1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x100026588980003, packet:: clientPath:null serverPath:null finished:false header:: 4,2 replyHeader:: 4,30064771088,0 request:: '/lock/READ-0000000003,-1 response:: null
  87. Process finished with exit code 0

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/965761
推荐阅读
相关标签
  

闽ICP备14008679号