赞
踩
目录
前两天一个小伙伴对分布式锁有些疑问,闲下来了顺带写个示例展示一下。代码很基础,重在思维的理解。
文末会有对Redis和Zookeeper的分布式锁的实现依赖描述。
当有很多的机器,机器彼此间不可探明彼此的存在,又:
这个时候使用个分布式锁最好啦。
很久以前用过别人写的Redis分布式锁,小伙伴说到zookeeper实现的。实际上原理都是一样:
某个性能彪悍的服务持有所有机器唯一的key值。
自然,这方面稳定性、正确性、性能肯定是考虑要素咯。
两者都能够在唯一节点(key)存放唯一值。而两者性能又是极其彪悍且稳定。自然用作分布式锁的服务很有效率。
比较通俗易懂的理解方式: Linux的文件系统。
给定一个路径一定能标识一个文件(不管他是文件还是文件夹还是连接还是块)。
Zookeeper也可以这么理解。因此只要探测到某个路径下有值,就可以判断该路径已经被占领。进而需要等待锁。
使用Zookeeper做分布式锁, 知道它怎么用最为关键。
实现分布式锁直接看第三节
如下代码片段
-
- import java.io.IOException;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- import org.apache.zookeeper.CreateMode;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooDefs;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.data.Stat;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class ZkConnection implements Watcher {
-
- private static final Logger logger = LoggerFactory.getLogger(ZkConnection.class);
-
- private String conn = "10.33.108.67:2800";
- private int timeOut = 3000;
-
- private ZooKeeper zooKeeper;
- private CountDownLatch latch = new CountDownLatch(1);
-
- public ZkConnection() throws IOException, InterruptedException {
- zooKeeper = new ZooKeeper(conn, timeOut, this);
- // 连接会耗时. 可以选择一直等待连接完毕或者等待一定时长
- latch.await();
- }
-
- public void createNode(String path, String value) throws KeeperException, InterruptedException {
- // 选择掉线立即删除的模式, 2018年9月12日 并且!!! 带有自增序列.
- // 注意:临时节点不能有子节点
- String node;
- logger.info("去zookeeper上创建以path:{} 为路径的节点, 是顺序节点. 通过序列号判断是第几个创建的. 创建结果:{}", path,
- node = zooKeeper.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
- List<String> children = zooKeeper.getChildren(path.substring(0, path.lastIndexOf("/")), false);
- int idx = Integer.valueOf(node.substring(path.length()));
- int min = Integer.MAX_VALUE;
- for (String s: children) {
- int current;
- if ((current = Integer.valueOf(s.substring(path.length()))) < min) {
- min = current;
- }
- }
- logger.info("如果接下来的这个值:{} 最小值:{}(因为代码限制一旦手动释放或者断开连接, 节点删除), 则表示获取到了锁, 否则是没有获取到的。", idx, min);
- }
-
- public boolean exist(String path) throws KeeperException, InterruptedException {
- Stat stat = zooKeeper.exists(path, this);
- return stat != null;
- }
-
- public String get(String path) throws KeeperException, InterruptedException, IOException {
- if (exist(path)) {
- return new String(zooKeeper.getData(path, this, null));
- }
- throw new IOException("not exist");
- }
-
- public void release() throws InterruptedException {
- zooKeeper.close();
- }
-
- public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
-
- ZkConnection connection = new ZkConnection();
- String path = "/lock/getLock";
- for (int i = 0; i < 1000; i ++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- connection.createNode(path, "Hello World");
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- connection.release();
- }
-
- @Override
- public void process(WatchedEvent watchedEvent) {
- if (watchedEvent == null) {
- return;
- }
- Event.KeeperState state = watchedEvent.getState();
- Event.EventType type = watchedEvent.getType();
-
- if (Event.KeeperState.SyncConnected == state
- && Event.EventType.None == type) {
- // 连接成功
- latch.countDown();
- } else {
- throw new RuntimeException("Error!");
- }
- }
- }
可以看到输出。
ZkClient.java:
- import org.apache.commons.collections.CollectionUtils;
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.PreDestroy;
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- /**
- * Created by yupeng.qin on 2016/4/20.
- */
- // @Service
- public class ZkClient implements Watcher {
- private static final Logger logger = LoggerFactory.getLogger(ZkClient.class);
-
- // @Value("#{placeholderConfigurer['zookeeper.conn.url']}")
- private String conn = "192.168.56.101:2181";
- // @Value("#{placeholderConfigurer['zookeeper.conn.timeOut']}")
- private Integer timeOut = 3000;
-
- private final static String ROOT = "/lock";
-
- private ZooKeeper zooKeeper;
- private CountDownLatch latch;
-
- private static ZkClient client = new ZkClient();
- public static ZkClient getInstance() {
- return client;
- }
-
-
- public ZkClient() {
- tryConnect();
- try {
- createNode(ROOT, false);
- } catch (KeeperException e) {
- // do something
- } catch (InterruptedException e) {
- // do something
- }
- }
- private void tryConnect() {
- int i = 0;
- do {
- try {
- if (conn()) {
- return;
- }
- } catch (Exception e) {
- logger.error("conn error. ", e);
- }
- } while (i++ < 3);
- throw new RuntimeException("Error!");
- }
- private boolean conn() throws IOException, InterruptedException {
- latch = new CountDownLatch(1);
- zooKeeper = new ZooKeeper(conn, timeOut, this);
- latch.await();
- logger.info("Zookeeper Connection Created! Ready to connection! url :{}", conn);
- return true;
- }
-
-
- /**
- * 这里的一个缺点是 关注返回值, 该值用于unlock节点
- * @param lock 申请的锁
- * @return 一个临时的变量
- */
- public String lock(String lock) {
- int rand = (int) (Math.random() * 100);
- lock = ROOT+"/"+lock;
- String lockCode = lock;
- try {
- try{
- createNode(lock, false);
- } catch (Exception e) {
- // do nothing
- }
- lockCode = lockCode(rand, lock);
- createNode(lockCode, true);
- while (true) {
-
- String current = lockCode.substring(lockCode.lastIndexOf('/')+1);
-
- List<String> child = zooKeeper.getChildren(lock, this);
- if (CollectionUtils.size(child) <= 1) {
- if (logger.isDebugEnabled()) {
- logger.debug("运气好, {} 直接拿到了锁", lockCode);
- }
- return lockCode;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("运气不佳, {} 还有这么多 {} 等待", lockCode, child.size());
- }
- Collections.sort(child);
- if (child.get(0).equals(current)) {
- return lockCode;
- }
- Thread.sleep(50);
- }
- } catch (KeeperException e) {
- // do something
- } catch (InterruptedException e) {
- // do something
- }
- return lockCode;
- }
-
- public void unlock(String code) throws KeeperException, InterruptedException {
- if (logger.isDebugEnabled()) {
- logger.debug("{} 节点已经被删除", code);
- }
- zooKeeper.delete(code, -1);
- }
-
- private String lockCode(int rand, String lock) {
- return new StringBuilder(lock).append("/").append(System.currentTimeMillis())
- .append("T").append(rand).toString();
- }
-
- private void createNode(String node, boolean lose) throws KeeperException, InterruptedException {
- if (!exist(node)) {
- // 选择掉线立即删除的节点
- zooKeeper.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- lose ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT);
- }
- }
-
- private boolean exist(String path) throws KeeperException, InterruptedException {
- Stat stat = zooKeeper.exists(path, this);
- return stat != null;
- }
-
- @PreDestroy
- public void release() throws InterruptedException {
- zooKeeper.close();
- }
-
- @Override
- public void process(WatchedEvent watchedEvent) {
- if (watchedEvent == null) {
- return;
- }
- Event.KeeperState state = watchedEvent.getState();
- Event.EventType type = watchedEvent.getType();
-
- if (Event.KeeperState.SyncConnected == state) {
- if ( Event.EventType.None == type ) {
- latch.countDown();
- } else if (Event.EventType.NodeDeleted == type) {
-
- }
- } else if (Event.KeeperState.Disconnected == state){
- tryConnect();
- }
- throw new RuntimeException("Error!");
- }
- }
运行的线程: RequestThread.java:
- import org.apache.zookeeper.KeeperException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class RequestThread extends Thread {
-
- private static final Logger logger = LoggerFactory.getLogger(RequestThread.class);
-
- private int threadId;
- private String lock;
-
- public RequestThread(int threadId, String lock) {
- this.threadId = threadId;
- this.lock = lock;
- }
-
- public void run() {
- logger.debug("线程号:{} 申请获得 {} 的锁", threadId, lock);
- String code = ZkClient.getInstance().lock(lock);
- logger.debug("线程号:{} 申请获得 {} 的锁 成功!", threadId, lock);
- try {
- Thread.sleep(1000);
- ZkClient.getInstance().unlock(code);
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
测试的代码:
ZkClientTest.java:
- public class ZkClientTest {
-
- public static void main(String[] args) {
- String[] locks = {"5000", "6000", "7000", "8000"};
-
- for (int i = 0; i < 10; i ++) {
- new Thread(new RequestThread(i, locks[i%locks.length])).start();
- }
- }
-
- }
能看得到输出还是很感人的。
登陆zk服务器, 查看服务器上现有的节点(可以分别在程序运行时查和运行结束之后查):
运行结束后:
- [zk: localhost:2181(CONNECTED) 6] ls /
- [lock, zookeeper]
- [zk: localhost:2181(CONNECTED) 7] ls /lock
- [7000, 100, 200, 8000, 300, 6000, 5000, 400, 500, 600, 700, 800]
- [zk: localhost:2181(CONNECTED) 8] ls /lock/7000
- []
可以看到创建的根节点和根节点下面的永久节点。但是 永久节点下面的子节点因为每次获取到锁用完了就直接删除掉了,因此看不到。
可以在运行时查看:
- [zk: localhost:2181(CONNECTED) 21] ls /lock/7000
- [1461208585829T41, 1461208585805T13]
- [zk: localhost:2181(CONNECTED) 22] ls /lock/7000
- []
可以看到运行的时候还是有的。
同时从这上面也能轻微看出Zookeeper的数据结构。
很显然, 上面这种写法是很蛋疼的:
一个优秀的工具是无需使用者过多的操作,更无需知道内部的实现方式就能实现工具的可靠。
想想依赖现有的系统架构怎么让工具更好用呢?
Zk在创建节点的时候会有一点并发问题。 例如A线程查询到X节点未被创造,于是决定去创造X节点。 但是在A查询并计划创造的这一途中B线程已经去创建完成。 这个时候A线程创建X节点是会失败的。
2018年9月12日 重新看了一遍本博客, 发现上述博客并无用处, 会有 3.3小节中描述的并发问题。本博客不再加以修正。
正确的出路方案为: 依赖 zookeeper创建的顺序节点,通过zookeeper创建顺序节点的能力, 判断当前自己创建的节点是不是序列号最小的节点, 从而获知是否获取到的锁。
zookeeper 创建顺序节点的自增机制, 并发条件下创建的节点会在节点名称后面添加自增序列。 此自增序列来源于 zookeeper 内部的同步机制。每次节点创建的commit一定是得到多数派的赞同投票之后,才会提交到集群数据中。 后续才会分配新的节点序号。
一样的, 普通的SET会有并发状况。 而 Redis 提供了 SETNX 指令:http://doc.redisfans.com/string/setnx.html 返回值用于描述受影响的数据条数。 基于Redis的单线程, 此逻辑有效。
如上。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。