当前位置:   article > 正文

Zookeeper分布式锁小示例

Zookeeper分布式锁小示例

目录

 

一、引言

1.1 为何用分布式锁

1.2 怎么用分布式锁

1.3 Zookeeper分布式锁简介

二、Zookeeper使用案例

三、Zookeeper分布式锁小示例

3.1 源码展示:

3.2 轻量解读

3.3 结构重构

4. 尾注(2018年9月12日)

4.1 本文第二三节的案例描述以及代码分析是错误的!!!

4.2 Zookeeper 的分布式锁实现原理的基础依赖

4.3  Zookeeper 的分布式锁实现原理的基础依赖


 

一、引言

前两天一个小伙伴对分布式锁有些疑问,闲下来了顺带写个示例展示一下。代码很基础,重在思维的理解。

文末会有对Redis和Zookeeper的分布式锁的实现依赖描述。

 

1.1 为何用分布式锁

当有很多的机器,机器彼此间不可探明彼此的存在,又:

  1. 不想把所有负载全压在一处(数据库),希望在能控制的地方尽量均衡负载。
  2. 对于某个对象写互斥,必须得先获取该对象的锁。

这个时候使用个分布式锁最好啦。

1.2 怎么用分布式锁

很久以前用过别人写的Redis分布式锁,小伙伴说到zookeeper实现的。实际上原理都是一样:

某个性能彪悍的服务持有所有机器唯一的key值。

自然,这方面稳定性、正确性、性能肯定是考虑要素咯。 

两者都能够在唯一节点(key)存放唯一值。而两者性能又是极其彪悍且稳定。自然用作分布式锁的服务很有效率。

1.3 Zookeeper分布式锁简介

比较通俗易懂的理解方式: Linux的文件系统。

给定一个路径一定能标识一个文件(不管他是文件还是文件夹还是连接还是块)。

Zookeeper也可以这么理解。因此只要探测到某个路径下有值,就可以判断该路径已经被占领。进而需要等待锁。

 

二、Zookeeper使用案例

使用Zookeeper做分布式锁, 知道它怎么用最为关键。

实现分布式锁直接看第三节

如下代码片段

  1. import java.io.IOException;
  2. import java.util.List;
  3. import java.util.concurrent.CountDownLatch;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.KeeperException;
  6. import org.apache.zookeeper.WatchedEvent;
  7. import org.apache.zookeeper.Watcher;
  8. import org.apache.zookeeper.ZooDefs;
  9. import org.apache.zookeeper.ZooKeeper;
  10. import org.apache.zookeeper.data.Stat;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. public class ZkConnection implements Watcher {
  14. private static final Logger logger = LoggerFactory.getLogger(ZkConnection.class);
  15. private String conn = "10.33.108.67:2800";
  16. private int timeOut = 3000;
  17. private ZooKeeper zooKeeper;
  18. private CountDownLatch latch = new CountDownLatch(1);
  19. public ZkConnection() throws IOException, InterruptedException {
  20. zooKeeper = new ZooKeeper(conn, timeOut, this);
  21. // 连接会耗时. 可以选择一直等待连接完毕或者等待一定时长
  22. latch.await();
  23. }
  24. public void createNode(String path, String value) throws KeeperException, InterruptedException {
  25. // 选择掉线立即删除的模式, 2018年9月12日 并且!!! 带有自增序列.
  26. // 注意:临时节点不能有子节点
  27. String node;
  28. logger.info("去zookeeper上创建以path:{} 为路径的节点, 是顺序节点. 通过序列号判断是第几个创建的. 创建结果:{}", path,
  29. node = zooKeeper.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
  30. List<String> children = zooKeeper.getChildren(path.substring(0, path.lastIndexOf("/")), false);
  31. int idx = Integer.valueOf(node.substring(path.length()));
  32. int min = Integer.MAX_VALUE;
  33. for (String s: children) {
  34. int current;
  35. if ((current = Integer.valueOf(s.substring(path.length()))) < min) {
  36. min = current;
  37. }
  38. }
  39. logger.info("如果接下来的这个值:{} 最小值:{}(因为代码限制一旦手动释放或者断开连接, 节点删除), 则表示获取到了锁, 否则是没有获取到的。", idx, min);
  40. }
  41. public boolean exist(String path) throws KeeperException, InterruptedException {
  42. Stat stat = zooKeeper.exists(path, this);
  43. return stat != null;
  44. }
  45. public String get(String path) throws KeeperException, InterruptedException, IOException {
  46. if (exist(path)) {
  47. return new String(zooKeeper.getData(path, this, null));
  48. }
  49. throw new IOException("not exist");
  50. }
  51. public void release() throws InterruptedException {
  52. zooKeeper.close();
  53. }
  54. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  55. ZkConnection connection = new ZkConnection();
  56. String path = "/lock/getLock";
  57. for (int i = 0; i < 1000; i ++) {
  58. new Thread(new Runnable() {
  59. @Override
  60. public void run() {
  61. try {
  62. connection.createNode(path, "Hello World");
  63. } catch (KeeperException e) {
  64. e.printStackTrace();
  65. } catch (InterruptedException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. }).start();
  70. }
  71. connection.release();
  72. }
  73. @Override
  74. public void process(WatchedEvent watchedEvent) {
  75. if (watchedEvent == null) {
  76. return;
  77. }
  78. Event.KeeperState state = watchedEvent.getState();
  79. Event.EventType type = watchedEvent.getType();
  80. if (Event.KeeperState.SyncConnected == state
  81. && Event.EventType.None == type) {
  82. // 连接成功
  83. latch.countDown();
  84. } else {
  85. throw new RuntimeException("Error!");
  86. }
  87. }
  88. }

可以看到输出。

 

三、Zookeeper分布式锁小示例

3.1 源码展示:

ZkClient.java:

  1. import org.apache.commons.collections.CollectionUtils;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.stereotype.Service;
  8. import javax.annotation.PreDestroy;
  9. import java.io.IOException;
  10. import java.util.Collections;
  11. import java.util.List;
  12. import java.util.concurrent.CountDownLatch;
  13. /**
  14. * Created by yupeng.qin on 2016/4/20.
  15. */
  16. // @Service
  17. public class ZkClient implements Watcher {
  18. private static final Logger logger = LoggerFactory.getLogger(ZkClient.class);
  19. // @Value("#{placeholderConfigurer['zookeeper.conn.url']}")
  20. private String conn = "192.168.56.101:2181";
  21. // @Value("#{placeholderConfigurer['zookeeper.conn.timeOut']}")
  22. private Integer timeOut = 3000;
  23. private final static String ROOT = "/lock";
  24. private ZooKeeper zooKeeper;
  25. private CountDownLatch latch;
  26. private static ZkClient client = new ZkClient();
  27. public static ZkClient getInstance() {
  28. return client;
  29. }
  30. public ZkClient() {
  31. tryConnect();
  32. try {
  33. createNode(ROOT, false);
  34. } catch (KeeperException e) {
  35. // do something
  36. } catch (InterruptedException e) {
  37. // do something
  38. }
  39. }
  40. private void tryConnect() {
  41. int i = 0;
  42. do {
  43. try {
  44. if (conn()) {
  45. return;
  46. }
  47. } catch (Exception e) {
  48. logger.error("conn error. ", e);
  49. }
  50. } while (i++ < 3);
  51. throw new RuntimeException("Error!");
  52. }
  53. private boolean conn() throws IOException, InterruptedException {
  54. latch = new CountDownLatch(1);
  55. zooKeeper = new ZooKeeper(conn, timeOut, this);
  56. latch.await();
  57. logger.info("Zookeeper Connection Created! Ready to connection! url :{}", conn);
  58. return true;
  59. }
  60. /**
  61. * 这里的一个缺点是 关注返回值, 该值用于unlock节点
  62. * @param lock 申请的锁
  63. * @return 一个临时的变量
  64. */
  65. public String lock(String lock) {
  66. int rand = (int) (Math.random() * 100);
  67. lock = ROOT+"/"+lock;
  68. String lockCode = lock;
  69. try {
  70. try{
  71. createNode(lock, false);
  72. } catch (Exception e) {
  73. // do nothing
  74. }
  75. lockCode = lockCode(rand, lock);
  76. createNode(lockCode, true);
  77. while (true) {
  78. String current = lockCode.substring(lockCode.lastIndexOf('/')+1);
  79. List<String> child = zooKeeper.getChildren(lock, this);
  80. if (CollectionUtils.size(child) <= 1) {
  81. if (logger.isDebugEnabled()) {
  82. logger.debug("运气好, {} 直接拿到了锁", lockCode);
  83. }
  84. return lockCode;
  85. }
  86. if (logger.isDebugEnabled()) {
  87. logger.debug("运气不佳, {} 还有这么多 {} 等待", lockCode, child.size());
  88. }
  89. Collections.sort(child);
  90. if (child.get(0).equals(current)) {
  91. return lockCode;
  92. }
  93. Thread.sleep(50);
  94. }
  95. } catch (KeeperException e) {
  96. // do something
  97. } catch (InterruptedException e) {
  98. // do something
  99. }
  100. return lockCode;
  101. }
  102. public void unlock(String code) throws KeeperException, InterruptedException {
  103. if (logger.isDebugEnabled()) {
  104. logger.debug("{} 节点已经被删除", code);
  105. }
  106. zooKeeper.delete(code, -1);
  107. }
  108. private String lockCode(int rand, String lock) {
  109. return new StringBuilder(lock).append("/").append(System.currentTimeMillis())
  110. .append("T").append(rand).toString();
  111. }
  112. private void createNode(String node, boolean lose) throws KeeperException, InterruptedException {
  113. if (!exist(node)) {
  114. // 选择掉线立即删除的节点
  115. zooKeeper.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
  116. lose ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT);
  117. }
  118. }
  119. private boolean exist(String path) throws KeeperException, InterruptedException {
  120. Stat stat = zooKeeper.exists(path, this);
  121. return stat != null;
  122. }
  123. @PreDestroy
  124. public void release() throws InterruptedException {
  125. zooKeeper.close();
  126. }
  127. @Override
  128. public void process(WatchedEvent watchedEvent) {
  129. if (watchedEvent == null) {
  130. return;
  131. }
  132. Event.KeeperState state = watchedEvent.getState();
  133. Event.EventType type = watchedEvent.getType();
  134. if (Event.KeeperState.SyncConnected == state) {
  135. if ( Event.EventType.None == type ) {
  136. latch.countDown();
  137. } else if (Event.EventType.NodeDeleted == type) {
  138. }
  139. } else if (Event.KeeperState.Disconnected == state){
  140. tryConnect();
  141. }
  142. throw new RuntimeException("Error!");
  143. }
  144. }

 

 

运行的线程: RequestThread.java:

  1. import org.apache.zookeeper.KeeperException;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. public class RequestThread extends Thread {
  5. private static final Logger logger = LoggerFactory.getLogger(RequestThread.class);
  6. private int threadId;
  7. private String lock;
  8. public RequestThread(int threadId, String lock) {
  9. this.threadId = threadId;
  10. this.lock = lock;
  11. }
  12. public void run() {
  13. logger.debug("线程号:{} 申请获得 {} 的锁", threadId, lock);
  14. String code = ZkClient.getInstance().lock(lock);
  15. logger.debug("线程号:{} 申请获得 {} 的锁 成功!", threadId, lock);
  16. try {
  17. Thread.sleep(1000);
  18. ZkClient.getInstance().unlock(code);
  19. } catch (KeeperException e) {
  20. e.printStackTrace();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. }

 

测试的代码:

ZkClientTest.java:

  1. public class ZkClientTest {
  2. public static void main(String[] args) {
  3. String[] locks = {"5000", "6000", "7000", "8000"};
  4. for (int i = 0; i < 10; i ++) {
  5. new Thread(new RequestThread(i, locks[i%locks.length])).start();
  6. }
  7. }
  8. }



能看得到输出还是很感人的。

 

3.2 轻量解读

登陆zk服务器, 查看服务器上现有的节点(可以分别在程序运行时查和运行结束之后查):

运行结束后:

  1. [zk: localhost:2181(CONNECTED) 6] ls /
  2. [lock, zookeeper]
  3. [zk: localhost:2181(CONNECTED) 7] ls /lock
  4. [7000, 100, 200, 8000, 300, 6000, 5000, 400, 500, 600, 700, 800]
  5. [zk: localhost:2181(CONNECTED) 8] ls /lock/7000
  6. []

可以看到创建的根节点和根节点下面的永久节点。但是 永久节点下面的子节点因为每次获取到锁用完了就直接删除掉了,因此看不到。

 

可以在运行时查看:

  1. [zk: localhost:2181(CONNECTED) 21] ls /lock/7000
  2. [1461208585829T41, 1461208585805T13]
  3. [zk: localhost:2181(CONNECTED) 22] ls /lock/7000
  4. []

可以看到运行的时候还是有的。

 

同时从这上面也能轻微看出Zookeeper的数据结构。

 

 

3.3 结构重构

很显然, 上面这种写法是很蛋疼的:

  1. 系统的可靠性依赖于给定lock的唯一性。
  2. 还需要处理返回值。

一个优秀的工具是无需使用者过多的操作,更无需知道内部的实现方式就能实现工具的可靠。

想想依赖现有的系统架构怎么让工具更好用呢?

 

Zk在创建节点的时候会有一点并发问题。 例如A线程查询到X节点未被创造,于是决定去创造X节点。 但是在A查询并计划创造的这一途中B线程已经去创建完成。 这个时候A线程创建X节点是会失败的。

 

4. 尾注(2018年9月12日)

4.1 本文第二三节的案例描述以及代码分析是错误的!!!

2018年9月12日 重新看了一遍本博客, 发现上述博客并无用处, 会有 3.3小节中描述的并发问题。本博客不再加以修正。

正确的出路方案为: 依赖 zookeeper创建的顺序节点,通过zookeeper创建顺序节点的能力, 判断当前自己创建的节点是不是序列号最小的节点, 从而获知是否获取到的锁。

 

4.2 Zookeeper 的分布式锁实现原理的基础依赖

zookeeper 创建顺序节点的自增机制, 并发条件下创建的节点会在节点名称后面添加自增序列。 此自增序列来源于 zookeeper 内部的同步机制。每次节点创建的commit一定是得到多数派的赞同投票之后,才会提交到集群数据中。 后续才会分配新的节点序号。

4.3  Zookeeper 的分布式锁实现原理的基础依赖

一样的, 普通的SET会有并发状况。 而 Redis 提供了 SETNX 指令:http://doc.redisfans.com/string/setnx.html 返回值用于描述受影响的数据条数。 基于Redis的单线程, 此逻辑有效。

如上。

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/965762
推荐阅读
相关标签
  

闽ICP备14008679号