当前位置:   article > 正文

分布式锁3: zk实现分布式锁3 使用临时顺序节点+watch监听实现阻塞锁_zk watch 与监听事件

zk watch 与监听事件

一  zk实现分布式锁

1.1 使用临时顺序节点 的问题

接上一篇文章,每个请求要想正常的执行完成,最终都是要创建节点,如果能够避免争抢必然可以提高性能。这里借助于zk的临时序列化节点,实现分布式锁

1. 主要修改了构造方法和lock方法:

2.并添加了getPreNode获取前置节点的方法。

存在的问题就是羊群效应。 

1.2  使用临时顺序节点+watch监听实现阻塞锁

假如当前有1000个节点在等待锁,如果获得锁的客户端释放锁时,这1000个客户端都会被唤醒,这种情况称为“羊群效应”。

在这种羊群效应中,zookeeper需要通知1000个客户端,这会阻塞其他的操作,最好的情况应该只唤醒新的最小节点对应的客户端。应该怎么做呢?在设置事件监听时,每个客户端应该对刚好在它之前的子节点设置事件监听,例如子节点列表为/locks/lock-0000000000、/locks/lock-0000000001、/locks/lock-0000000002,序号为1的客户端监听序号为0的子节点删除消息,序号为2的监听序号为1的子节点删除消息。

1.3  使用临时顺序节点+watch监听实现阻塞锁的算法逻辑

1.客户端连接zookeeper,并在/lock下创建临时的且有序的子节点,第一个客户端对应的子节点为/locks/lock-0000000000,第二个为/locks/lock-0000000001,以此类推;

2.客户端获取/lock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听刚好在自己之前一位的子节点删除消息,获得子节点变更通知后重复此步骤直至获得锁;

3.执行业务代码;

4.完成业务流程后,删除对应的子节点释放锁。

二  操作步骤

2.1 代码

2.1.1 zk的客户端

初始化则创建临时序列化节点

2.1.2 分布式锁代码

  1. package com.atguigu.distributed.lock.config;
  2. import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.apache.zookeeper.*;
  5. import java.util.List;
  6. import java.util.concurrent.CountDownLatch;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.locks.Condition;
  9. import java.util.concurrent.locks.Lock;
  10. /**
  11. * @ClassName: ZkDistributedTempLock
  12. * @Description: TODO
  13. * @Author: admin
  14. * @Date: 2024/01/06 11:05:09 
  15. * @Version: V1.0
  16. **/
  17. public class ZkDistributedTempLock implements Lock {
  18. private static final String ROOT_PATH = "/d-zk";
  19. private String path;
  20. private ZooKeeper zooKeeper;
  21. public ZkDistributedTempLock(ZooKeeper zooKeeper,String lockName) throws KeeperException, InterruptedException {
  22. this.zooKeeper = zooKeeper;
  23. // this.path = ROOT_PATH + "/" + lockName+"-";
  24. this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  25. }
  26. public void lock() {
  27. try {
  28. String preNode = getPreNode(path);
  29. // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
  30. if (StringUtils.isEmpty(preNode)) {
  31. return;
  32. } else {//有前一个节点,阻塞,对前一个节点进行监听
  33. CountDownLatch countDownLatch = new CountDownLatch(1);
  34. if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
  35. @Override
  36. public void process(WatchedEvent event) {
  37. System.out.println("当前节点=="+path+" 前一个节点:"+ROOT_PATH + "/" + preNode);
  38. countDownLatch.countDown();
  39. }
  40. }) == null) {
  41. return;
  42. }
  43. // 阻塞。。。。
  44. countDownLatch.await();
  45. return;
  46. }
  47. } catch (InterruptedException | KeeperException e) {
  48. // 重新检查。是否获取到锁
  49. try {
  50. Thread.sleep(20);
  51. } catch (InterruptedException ex) {
  52. ex.printStackTrace();
  53. }
  54. lock();
  55. }
  56. }
  57. @Override
  58. public void lockInterruptibly() throws InterruptedException {
  59. }
  60. @Override
  61. public boolean tryLock() {
  62. return false;
  63. }
  64. @Override
  65. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  66. return false;
  67. }
  68. @Override
  69. public void unlock(){
  70. try {
  71. this.zooKeeper.delete(path, 0);
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. } catch (KeeperException e) {
  75. e.printStackTrace();
  76. }
  77. }
  78. /**
  79. * 获取指定节点的前节点
  80. * @param path
  81. * @return
  82. */
  83. private String getPreNode(String path){
  84. System.out.println("path:"+path);
  85. try {
  86. // 获取当前节点的序列化号
  87. Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
  88. // 获取根路径下的所有序列化子节点
  89. List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
  90. // 判空
  91. if (CollectionUtils.isEmpty(nodes)){
  92. return null;
  93. }
  94. // 获取前一个节点
  95. Long flag = 0L;
  96. String preNode = null;
  97. for (String node : nodes) {
  98. // 获取每个节点的序列化号
  99. Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
  100. if (serial < curSerial && serial > flag){
  101. flag = serial;
  102. preNode = node;
  103. }
  104. }
  105. return preNode;
  106. } catch (KeeperException e) {
  107. e.printStackTrace();
  108. } catch (InterruptedException e) {
  109. e.printStackTrace();
  110. }
  111. return null;
  112. }
  113. @Override
  114. public Condition newCondition() {
  115. return null;
  116. }
  117. }

2.1.3 service层

2.1.4 controller层

2.2 nginx代理多服务节点访问

1.服务启动

2.nginx启动

3.jemeter访问

2.3 测试

数据库初始化

测试后

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/852263
推荐阅读
相关标签
  

闽ICP备14008679号