当前位置:   article > 正文

zk实现分布式锁_zk wait lock for

zk wait lock for

一、为什么需要分布式锁

    如果服务运行在多台服务器上,或者运行在多个JVM上,对于一些公共资源,就需要有锁保证资源的独享性。比如,网上银行存钱的时候,客户端发出存钱的指令,被分发到多台服务器上,同时执行了存钱的服务,就可能会导致数据库中有两条相同的记录。所以需要保证同一时刻,只有一个客户端在执行这条指令。

二、分布式锁方案

1、每一次获取锁的时候,都在根节点下创建临时序列化节点,释放锁的时候删除该节点;

2、获取锁时,调用根节点的getChildren()方法,查看所有的子节点,如果发现自己创建的节点是子节点中序列号最小,就定义客户端获取到了锁;否则,找到比自己小的前一个节点,注册监听事件;如果比自己小的节点不存在,再次判断自己所创建的节点是否最小。

zk分布式锁的原理不做多述,直接上代码:

I. 分布式锁的接口

  1. public interface DistributedLock {
  2. /**
  3. * 获取分布式锁
  4. * @return
  5. */
  6. boolean dLock();
  7. /**
  8. * 在一定时间内获取分布式锁
  9. * @param time
  10. * @return
  11. */
  12. boolean dLock(long time);
  13. /**
  14. * 释放分布式锁
  15. */
  16. void unDLock();
  17. }

II. 分布式锁的实现

  1. public class DefaultDistributedLock implements DistributedLock, Watcher {
  2. private ZooKeeper zkClient;
  3. // 分布式锁持久化节点名称
  4. private static String LOCK_PERSIST= "/DIS_LOCK";
  5. // 临时节点前缀
  6. private static String LOCK_ELEPHANT_PREFIX = LOCK_PERSIST+"/dis_";
  7. // zk连接的ip
  8. private String ips;
  9. // session过期时间
  10. private static int sessionTimeout = 300000;
  11. // 主线程等待连接建立好后才启动
  12. private CountDownLatch connectedSemaphore = new CountDownLatch(1);
  13. // 当前线程创建临时节点后返回的路径
  14. private String selfPath;
  15. // 等锁路径
  16. private String waitPath;
  17. private String lockName;
  18. private CountDownLatch latch;
  19. public DefaultDistributedLock(String ips, String lockName) {
  20. this(ips,sessionTimeout, lockName);
  21. }
  22. public DefaultDistributedLock(String ips, int sessionTimeout, String lockName) {
  23. this.ips = ips;
  24. this.sessionTimeout = sessionTimeout;
  25. this.lockName = lockName;
  26. createRootNode(LOCK_PERSIST,"根节点");
  27. }
  28. public boolean dLock() {
  29. try {
  30. selfPath = zkClient.create(LOCK_ELEPHANT_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  31. CreateMode.EPHEMERAL_SEQUENTIAL);
  32. System.out.println(lockName+" 创建临时节点路径"+selfPath);
  33. return checkMinPath(selfPath);
  34. } catch (Exception e) {
  35. e.printStackTrace();
  36. return false;
  37. }
  38. }
  39. public boolean dLock(long time){
  40. try {
  41. if(dLock()){
  42. return true;
  43. }
  44. return waitForLock(time);
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. return false;
  49. }
  50. public void unDLock() {
  51. System.out.println(lockName+"删除本节点:" + selfPath);
  52. try {
  53. zkClient.delete(selfPath, -1);
  54. selfPath = null;
  55. releaseConnection();
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. } catch (KeeperException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. public void releaseConnection() {
  63. if (this.zkClient != null) {
  64. try {
  65. this.zkClient.close();
  66. } catch (InterruptedException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. System.out.println(lockName+"释放连接");
  71. }
  72. // 判断比自己小的一个节点是否存在,如果不存在,直接返回,无需等待
  73. private boolean waitForLock(long t) throws KeeperException, InterruptedException {
  74. Stat stat = zkClient.exists(waitPath, true);
  75. if(stat != null){
  76. this.latch = new CountDownLatch(1);
  77. // 如果超过等待时间会抛出异常
  78. latch.await(t, TimeUnit.MILLISECONDS);
  79. this.latch = null;
  80. }
  81. return true;
  82. }
  83. // 校验本线程创建的节点是否是所有节点中的最小节点
  84. private boolean checkMinPath(String selfPath) throws KeeperException, InterruptedException {
  85. List<String> subNodes = zkClient.getChildren(LOCK_PERSIST,false);
  86. Collections.sort(subNodes);
  87. String str = selfPath.substring(LOCK_PERSIST.length()+1);
  88. int index = subNodes.indexOf(str);
  89. switch (index){
  90. case -1:{
  91. System.out.println(lockName+"--本节点已不在了..." + selfPath);
  92. return false;
  93. }
  94. case 0:{
  95. System.out.println(lockName+"--本节点是最小节点..." + selfPath);
  96. return true;
  97. }
  98. default:{
  99. waitPath = LOCK_PERSIST+"/"+subNodes.get(index-1);
  100. System.out.println(lockName+"--获取子节点中,排在我前面的"+ waitPath);
  101. // 对前一个节点注册监听事件
  102. try {
  103. zkClient.getData(waitPath,true,new Stat());
  104. return false;
  105. } catch (Exception e) {
  106. // 跑出异常的时候,判断节点是否存在
  107. if(zkClient.exists(waitPath, false) == null){
  108. System.out.println(lockName+ "--子节点中,排在我前面的" + waitPath + "已失踪,重新检查");
  109. return checkMinPath(selfPath);
  110. }else{
  111. throw new RuntimeException(waitPath+"node disappered");
  112. }
  113. }
  114. }
  115. }
  116. }
  117. // 创建根节点,根节点不需要进行watch
  118. private boolean createRootNode(String path, String data) {
  119. try {
  120. createConnection();
  121. if(zkClient.exists(path, false) == null){
  122. String retPath = zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  123. System.out.println("创建根节点:path" + retPath + "content" + data);
  124. }
  125. return true;
  126. } catch (Exception e) {
  127. e.printStackTrace();
  128. }
  129. return false;
  130. }
  131. private void createConnection() throws IOException, InterruptedException {
  132. if(zkClient == null){
  133. zkClient = new ZooKeeper(ips, sessionTimeout, this);
  134. connectedSemaphore.await();
  135. }
  136. }
  137. public void process(WatchedEvent watchedEvent) {
  138. if(watchedEvent == null){
  139. return;
  140. }
  141. Event.EventType eventType = watchedEvent.getType();
  142. Event.KeeperState state = watchedEvent.getState();
  143. if(Event.KeeperState.SyncConnected == state){
  144. if(Event.EventType.None == eventType){
  145. System.out.println("正在启动连接服务器");
  146. connectedSemaphore.countDown();
  147. }else if (Event.KeeperState.Disconnected == state) {
  148. System.out.println("与ZK服务器断开连接");
  149. } else if (Event.KeeperState.Expired == state) {
  150. System.out.println("会话失效");
  151. }
  152. }
  153. }
  154. }

3、测试用例

  1. public class DisMainTest {
  2. private static final int THREAD_NUM = 100;
  3. private static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);
  4. public static void main(String[] args) {
  5. final String ips = "localhost:2181,localhost:2182";
  6. for(int i=0; i< THREAD_NUM;i++){
  7. final int threadId = i + 1;
  8. new Thread(){
  9. @Override
  10. public void run() {
  11. try {
  12. DistributedLock dc = new DefaultDistributedLock(ips, threadId + "");
  13. if (dc.dLock()) {
  14. System.out.println(threadId+"获取到锁,并执行了任务");
  15. }
  16. dc.unDLock();
  17. threadSemaphore.countDown();
  18. } catch (Exception e) {
  19. System.out.println("第" + threadId + "个线程抛出的异常:");
  20. e.printStackTrace();
  21. }
  22. }
  23. }.start();
  24. }
  25. try {
  26. threadSemaphore.await();
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/997525
推荐阅读
相关标签
  

闽ICP备14008679号