当前位置:   article > 正文

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)

分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)

目录

一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析

1.1.1.2 本质

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

1.1.2.2 Zookeeper满足分布式锁基本要求

1.1.2.3 Watcher机制

1.1.2.3 总结

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

1.2.2 流程说明

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock

1.3.1.2 模拟下单处理OrderServiceHandle

1.3.1.3 订单号生成类OrderCodeGenerator

1.3.1.4 分布式锁测试类TestZookeeperDistributedLock

1.3.1.5 测试效果

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖

1.3.2.2 代码实现

1.3.2.2.1 分布式锁类CuratorDistributeLock

1.3.2.2.2 测试类TestCuratorDistributedLock

1.3.2.3 执行结果


一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析
  • 每次只能一个占用锁;
  • 可以重复进入锁;
  • 只有占用者才可以解锁;
  • 获取锁和释放锁都需要原子
  • 不能产生死锁
  • 尽量满足性能
1.1.1.2 本质

同步互斥,使得处理任务能够一个一个逐步的过临界资源。

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

zookeeper中有一种临时顺序节点,它具有以下特征:

  • 时效性,当会话结束,节点将自动被删除
  • 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取
1.1.2.2 Zookeeper满足分布式锁基本要求
  1. 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的 每次只能一个占用锁,因为只有它一个获取到,所以可以实现 重复进入 ,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁
  2. zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的
  3. 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁
  4. zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能
1.1.2.3 Watcher机制

Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事
件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然
后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。

在实现分布式锁的时候,主要利用这个机制,实现释放锁的时候,通知等待锁的线程竞争锁。

1.1.2.3 总结

综上可知,Zookeeper其实是基于临时顺序节点特性实现的分布式锁。当然,还结合了他的Watcher机制,实现释放锁的时候,通知等待锁的线程去竞争锁。

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

1.2.2 流程说明

  1. client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
  2. client向/lock/目录下注册/lock/Node-前缀的临时顺序节点,并得到顺序号
  3. client获取/lock/目录下的所有临时顺序子节点
  4. client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
  5. 获得锁的线程,执行业务逻辑,执行完之后,删除临时节点,完成锁的释放。
  6. 等待锁的线程如果收到监控的临时节点被删除的通知,则再重复4、5、6步骤,进入下一个获得锁、释放锁的循环。

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock
  1. package com.ningzhaosheng.distributelock.zookeeper;
  2. import org.I0Itec.zkclient.IZkDataListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. import org.I0Itec.zkclient.serialize.SerializableSerializer;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.util.Collections;
  8. import java.util.List;
  9. import java.util.concurrent.CountDownLatch;
  10. import java.util.concurrent.TimeUnit;
  11. import java.util.concurrent.locks.Condition;
  12. import java.util.concurrent.locks.Lock;
  13. /**
  14. * @author ningzhaosheng
  15. * @date 2024/4/17 18:13:38
  16. * @description 基于zookeeper实现的分布式锁
  17. */
  18. public class ZookeeperDistributedLock implements Lock {
  19. private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);
  20. // zookeeper 地址
  21. private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
  22. // zookeeper 锁目录
  23. private String LOCK_PATH = "/LOCK";
  24. // 创建 zookeeper客户端zkClient
  25. private ZkClient client = null;
  26. private CountDownLatch cdl;
  27. // 当前请求的节点前一个节点
  28. private String beforePath;
  29. // 当前请求的节点
  30. private String currentPath;
  31. /**
  32. * 初始化客户端和创建LOCK目录
  33. *
  34. * @param ZOOKEEPER_IP_PORT
  35. * @param LOCK_PATH
  36. */
  37. public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {
  38. this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;
  39. this.LOCK_PATH = LOCK_PATH;
  40. client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
  41. // 判断有没有LOCK目录,没有则创建
  42. if (!this.client.exists(LOCK_PATH)) {
  43. this.client.createPersistent(LOCK_PATH);
  44. }
  45. }
  46. @Override
  47. public void lock() {
  48. if (!tryLock()) {
  49. //对次小节点进行监听
  50. waitForLock();
  51. lock();
  52. } else {
  53. logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
  54. }
  55. }
  56. @Override
  57. public void lockInterruptibly() throws InterruptedException {
  58. }
  59. @Override
  60. public boolean tryLock() {
  61. // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
  62. if (currentPath == null || currentPath.length() <= 0) {
  63. // 创建一个临时顺序节点
  64. currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
  65. System.out.println("---------------------------->" + currentPath);
  66. }
  67. // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
  68. List<String> childrens = this.client.getChildren(LOCK_PATH);
  69. //由小到大排序所有子节点
  70. Collections.sort(childrens);
  71. //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
  72. if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
  73. return true;
  74. }
  75. //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
  76. else {
  77. int wz = Collections.binarySearch(childrens, currentPath.substring(6));
  78. beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
  79. }
  80. return false;
  81. }
  82. @Override
  83. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  84. return false;
  85. }
  86. //等待锁,对次小节点进行监听
  87. private void waitForLock() {
  88. IZkDataListener listener = new IZkDataListener() {
  89. public void handleDataDeleted(String dataPath) throws Exception {
  90. logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
  91. if (cdl != null) {
  92. cdl.countDown();
  93. }
  94. }
  95. public void handleDataChange(String dataPath, Object data) throws Exception {
  96. }
  97. };
  98. // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
  99. this.client.subscribeDataChanges(beforePath, listener);
  100. if (this.client.exists(beforePath)) {
  101. cdl = new CountDownLatch(1);
  102. try {
  103. cdl.await();
  104. } catch (InterruptedException e) {
  105. e.printStackTrace();
  106. }
  107. }
  108. this.client.unsubscribeDataChanges(beforePath, listener);
  109. }
  110. @Override
  111. public void unlock() {
  112. // 删除当前临时节点
  113. client.delete(currentPath);
  114. }
  115. @Override
  116. public Condition newCondition() {
  117. return null;
  118. }
  119. }
1.3.1.2 模拟下单处理OrderServiceHandle
  1. package com.ningzhaosheng.distributelock.zookeeper;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.concurrent.locks.Lock;
  6. /**
  7. * @author ningzhaosheng
  8. * @date 2024/4/17 21:45:46
  9. * @description 模拟订单处理
  10. */
  11. public class OrderServiceHandle implements Runnable {
  12. private static OrderCodeGenerator ong = new OrderCodeGenerator();
  13. private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);
  14. // 按照线程数初始化倒计数器,倒计数器
  15. private CountDownLatch cdl = null;
  16. private Lock lock = null;
  17. public OrderServiceHandle(CountDownLatch cdl, Lock lock) {
  18. this.cdl = cdl;
  19. this.lock = lock;
  20. }
  21. // 创建订单
  22. public void createOrder() {
  23. String orderCode = null;
  24. //准备获取锁
  25. lock.lock();
  26. try {
  27. // 获取订单编号
  28. orderCode = ong.getOrderCode();
  29. } catch (Exception e) {
  30. // TODO: handle exception
  31. } finally {
  32. //完成业务逻辑以后释放锁
  33. lock.unlock();
  34. }
  35. // ……业务代码
  36. logger.info("insert into DB使用id:=======================>" + orderCode);
  37. }
  38. @Override
  39. public void run() {
  40. try {
  41. // 等待其他线程初始化
  42. cdl.await();
  43. } catch (InterruptedException e) {
  44. // TODO Auto-generated catch block
  45. e.printStackTrace();
  46. }
  47. // 创建订单
  48. createOrder();
  49. }
  50. }
1.3.1.3 订单号生成类OrderCodeGenerator
  1. package com.ningzhaosheng.distributelock.zookeeper;
  2. import java.text.SimpleDateFormat;
  3. import java.util.Date;
  4. /**
  5. * @author ningzhaosheng
  6. * @date 2024/4/17 21:44:06
  7. * @description 生成订单号
  8. */
  9. public class OrderCodeGenerator {
  10. // 自增长序列
  11. private static int i = 0;
  12. // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
  13. public String getOrderCode() {
  14. Date now = new Date();
  15. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
  16. return sdf.format(now) + ++i;
  17. }
  18. }
1.3.1.4 分布式锁测试类TestZookeeperDistributedLock
  1. package com.ningzhaosheng.distributelock.zookeeper;
  2. import java.util.concurrent.CountDownLatch;
  3. import java.util.concurrent.locks.Lock;
  4. /**
  5. * @author ningzhaosheng
  6. * @date 2024/4/17 21:48:28
  7. * @description zookeeper分布式锁测试类
  8. */
  9. public class TestZookeeperDistributedLock {
  10. public static void main(String[] args) {
  11. // zookeeper 地址
  12. String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";
  13. // zookeeper 锁目录
  14. String LOCK_PATH = "/LOCK";
  15. // 线程并发数
  16. int NUM = 10;
  17. CountDownLatch cdl = new CountDownLatch(NUM);
  18. for (int i = 1; i <= NUM; i++) {
  19. // 按照线程数迭代实例化线程
  20. Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);
  21. new Thread(new OrderServiceHandle(cdl, lock)).start();
  22. // 创建一个线程,倒计数器减1
  23. cdl.countDown();
  24. }
  25. }
  26. }
1.3.1.5 测试效果

从上图执行结果中可以看出,在多线程情况下,分布式锁获取和释放正常。

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖
  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-framework</artifactId>
  4. <version>5.2.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-recipes</artifactId>
  9. <version>5.2.0</version>
  10. </dependency>
1.3.2.2 代码实现

这里模拟业务使用分布式锁,还是使用的OrderServiceHandle类,这里只给出分布式锁实现类和测试类,不再给出OrderServiceHandle代码,可以参考上一小节的OrderServiceHandle类。

1.3.2.2.1 分布式锁类CuratorDistributeLock
  1. package com.ningzhaosheng.distributelock.zookeeper.curator;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.concurrent.locks.Condition;
  8. import java.util.concurrent.locks.Lock;
  9. /**
  10. * @author ningzhaosheng
  11. * @date 2024/4/17 22:03:45
  12. * @description 实现Lock接口(其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装)
  13. */
  14. public class CuratorDistributeLock implements Lock {
  15. private CuratorFramework client;
  16. private InterProcessMutex mutex;
  17. public CuratorDistributeLock(String connString, String lockPath) {
  18. this(connString, lockPath, new ExponentialBackoffRetry(3000,5));
  19. }
  20. public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
  21. try {
  22. client = CuratorFrameworkFactory.builder()
  23. .connectString(connString)
  24. .retryPolicy(retryPolicy)
  25. .build();
  26. client.start();
  27. mutex = new InterProcessMutex(client, lockPath);
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. @Override
  33. public void lock() {
  34. try {
  35. // 获取锁
  36. mutex.acquire();
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. @Override
  42. public void lockInterruptibly() throws InterruptedException {
  43. }
  44. @Override
  45. public boolean tryLock() {
  46. return false;
  47. }
  48. @Override
  49. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
  50. return false;
  51. }
  52. @Override
  53. public void unlock() {
  54. try {
  55. // 释放锁
  56. mutex.release();
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. @Override
  62. public Condition newCondition() {
  63. return null;
  64. }
  65. }
1.3.2.2.2 测试类TestCuratorDistributedLock
  1. package com.ningzhaosheng.distributelock.zookeeper.curator;
  2. import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;
  3. import java.util.concurrent.CountDownLatch;
  4. /**
  5. * @author ningzhaosheng
  6. * @date 2024/4/17 21:54:33
  7. * @description 基于 apache curator分布式锁测试类
  8. */
  9. public class TestCuratorDistributedLock {
  10. private static final String ZK_ADDRESS = "192.168.31.9:2181";
  11. private static final String LOCK_PATH = "/distributed_lock";
  12. public static void main(String[] args) {
  13. int NUM = 10;
  14. CountDownLatch cdl = new CountDownLatch(NUM);
  15. for (int i = 1; i <= NUM; i++) {
  16. // 按照线程数迭代实例化线程
  17. /** 创建CuratorDistributeLock
  18. * 其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装
  19. */
  20. CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS,LOCK_PATH);
  21. new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();
  22. // 创建一个线程,倒计数器减1
  23. cdl.countDown();
  24. }
  25. }
  26. }
1.3.2.3 执行结果

从执行结果可以看出,基于apche curator框架实现zookeeper锁,它也是按照临时顺序节点的顺序获取锁的,每次获得锁的节点都是最小顺序节点,然后等待锁的线程,会基于watcher机制,每次给最小临时顺序节点加回调,监听节点的变更(即释放锁的线程会删除节点),然后再重新判断最小临时顺序节点,最小的获得锁执行,依次循环完成。

好了,本次内容就分享到这,欢迎关注本博主。如果有帮助到大家,欢迎大家点赞+关注+收藏,有疑问也欢迎大家评论留言!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/473376
推荐阅读
相关标签
  

闽ICP备14008679号