当前位置:   article > 正文

Zookeeper基本介绍&Zookeeper分布式锁

Zookeeper基本介绍&Zookeeper分布式锁

Zookeeper

zookeeper应用场景

  1. 配置中心
  2. 命名服务
  3. Master选举
  4. 集群管理
  5. 分布式队列
  6. 分布式锁

zookeeper数据模型

层次命名空间

从官网上可以看到zookeeper的数据模型图如下:
在这里插入图片描述

  1. 类似Linux文件系统,以(/)为 根
  2. 节点既可以存储数据,也可以可以存储子节点,所以可以认为zookeeper节点既是文件夹,其本身也是一个文件
  3. 节点的路径 总是表示为规范的,绝对的,以(/)分割的路径

znode

  1. 名称唯一,命名规范
  2. 节点类型:持久节点、顺序节点、临时节点、临时顺序节点
  3. 节点命名规则:
    null不能作为节点
    zookeeper为zookeeper保留节点
    不能够使用相对路径,具体可以查看官网

zookeeper的具体使用

分布式锁

分布式锁是控制分布式系统同步访问共享资源的一种方式。
在分布式系统中,如果有两个系统需要对同一个资源进行操作的时候,就需要通过一些互斥的手段来防止其他系统的干扰,以保证一致性,此时就需要使用到分布式锁了。
常见的分布式锁实现有redis分布式锁,zookeeper分布式锁,下面介绍下zookeeper分布式锁的实现。

1、原理
使用zookeeper临时节点+watch机制实现。
由于zookeeper节点不可重名,所以对于同一个节点(/lock/createOrder),如果有多个系统同时想要创建时,只会有一个系统成功,成功的系统即为获取到锁,可以进行下一步的业务操作,没有获取到锁的系统,可以对该临时节点设置监听,以便再临时节点删除后,可以获取锁资源。
下图为第一种分布式锁的实现原理图:
在这里插入图片描述
具体代码如下:

package com.xiaohuihui.zookeeper;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.io.UnsupportedEncodingException;

/**
 * @Desription: 小灰灰的zk序列化
 * @Author: yangchenhui
 */
public class XiaohuihuiZkSerializer implements ZkSerializer {

    String charset = "UTF-8";

    @Override
    public byte[] serialize(Object data) throws ZkMarshallingError {
        try {
            return String.valueOf(data).getBytes(charset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            throw new ZkMarshallingError(e);
        }
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            return new String(bytes, charset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            throw new ZkMarshallingError(e);
        }
    }
}


package com.xiaohuihui.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Desription: zk分布式锁
 * @Author: yangchenhui
 */
public class ZkDistributeLock implements Lock {

    /**
     * zk节点
     */
    private String lockPath;
    /**
     * zk客户端
     */
    private ZkClient zkClient;
    private final String connectString = "localhost:2181";
    private final int sessionTimeout = 2000;

    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();

    public ZkDistributeLock(String lockPath) {
        this.lockPath = lockPath;
        zkClient = new ZkClient(connectString, sessionTimeout);
        zkClient.setZkSerializer(new XiaohuihuiZkSerializer());
    }

    public ZkDistributeLock(String lockPath, ZkClient zkClient) {
        this.lockPath = lockPath;
        this.zkClient = zkClient;
    }

    @Override
    public void lock() {
        boolean hasLock = tryLock();
        if (!hasLock) {
            // 没有获取到锁,等待获取锁
            waitForLock();
            // 再次尝试获取锁
            lock();
        }
    }

    /**
     * 监听zk节点变化,等待获取锁
     */
    private void waitForLock() {
        CountDownLatch downLatch = new CountDownLatch(1);
        IZkDataListener zkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("=======>  有人修改了zk节点数据");
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                downLatch.countDown();
                System.out.println("=======>  zk节点被删除,可以重新获取锁");
            }
        };
        zkClient.subscribeDataChanges(lockPath, zkDataListener);

        // 如果当前zk节点存在,阻塞线程
        if (zkClient.exists(lockPath)) {
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 取消注册
        zkClient.unsubscribeDataChanges(lockPath, zkDataListener);

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        // 保证锁的可重入性(在一个业务的执行过程中,如果有多个地方需要获取到锁,需要实现锁的可重入性),通过本地线程变量实现
        Integer count = this.reentrantCount.get();
        if (count != null && count > 0) {
            // 表示当前线程已经获取到锁
            this.reentrantCount.set(++count);
            return true;
        }

        // 当前线程没有获取到锁,尝试创建zk节点,创建成功即为获取到锁
        try {
            zkClient.createEphemeral(lockPath);
            this.reentrantCount.set(1);
            return true;
        } catch (ZkNodeExistsException e) {
            return false;
        }

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        // 判断当前线程是否持有锁
        Integer count = this.reentrantCount.get();
        if (null != count && count > 1) {
            // 当前线程有超过1处获取锁
            this.reentrantCount.set(--count);
            return;
        } else if (null != count && count.equals(1)) {
            // 只有一处获取过锁
            this.reentrantCount.set(null);
        }
        // 删除zk节点
        zkClient.delete(lockPath);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

package com.xiaohuihui.zookeeper;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Desription: zk分布式锁测试
 * @Author: yangchenhui
 */
public class ZkDistributeLockTest {

    public static void main(String[] args) {
        // 并发数
        int currency = 50;

        // 循环屏障
        CyclicBarrier cb = new CyclicBarrier(currency);

        // 多线程模拟高并发
        for (int i = 0; i < currency; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    System.out.println(Thread.currentThread().getName() + "  ==============> 准备好");
                    // 等待一起出发
                    try {
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    ZkDistributeLock lock = new ZkDistributeLock("/xiaohuihui zkDistributeLock");
//                    ZkDistributeLock2 lock = new ZkDistributeLock2("/xiaohuihui zkDistributeLock");

                    try {
                        lock.lock();
                        System.out.println(Thread.currentThread().getName() + " 获得锁!");
                    } finally {
                        lock.unlock();
                    }
                }
            }).start();

        }

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223

创建锁的时候需要考虑到可重入性,否则在同一个线程中,如果有多个地方需要获取到锁,获取不到的地方会产生阻塞,所以需要实现可重入性,使用ThreadLocal可以实现。

运行效果如下:
在这里插入图片描述
这种情况下,只要节点删除,所有设置监听的客户端都会收到通知,产生惊群效应,极大的浪费系统资源,需要进行改进,实现思路如下图:
在这里插入图片描述
这样只会最小号收到通知,具体实现如下:

package com.xiaohuihui.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Desription: 使用zk临时顺序节点实现分布式锁,避免惊群效应
 * @Author: yangchenhui
 */
public class ZkDistributeLock2 implements Lock {

    /**
     * zk节点
     */
    private String lockPath;
    /**
     * zk客户端
     */
    private ZkClient zkClient;
    private final String connectString = "localhost:2181";
    private final int sessionTimeout = 2000;

    /**
     * 重入锁计数
     */
    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
    /**
     * 当前节点路径
     */
    private ThreadLocal<String> currentPath = new ThreadLocal<>();
    /**
     * 上个节点路径
     */
    private ThreadLocal<String> beforePath = new ThreadLocal<>();

    public ZkDistributeLock2(String lockPath) {
        this.lockPath = lockPath;
        zkClient = new ZkClient(connectString, sessionTimeout);
        zkClient.setZkSerializer(new XiaohuihuiZkSerializer());
        if (!this.zkClient.exists(lockPath)) {
            try {
                this.zkClient.createPersistent(lockPath);
            } catch (ZkNodeExistsException e) {

            }
        }
    }

    @Override
    public void lock() {
        // 获取锁
        boolean hasLock = tryLock();
        if (!hasLock) {
            // 没有获取到锁,阻塞监听
            waitLock();
            // 再次尝试获取锁
            lock();
        }
    }

    /**
     * 阻塞监听,等待获取zk锁
     */
    private void waitLock() {
        CountDownLatch downLatch = new CountDownLatch(1);
        IZkDataListener zkDataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                downLatch.countDown();
                System.out.println("=======>  zk节点被删除,可以重新获取锁");
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("=======>  有人修改了zk节点数据");
            }
        };
        // 此时只需要监听上一个zk节点,不会引起惊群效应
        zkClient.subscribeDataChanges(this.beforePath.get(), zkDataListener);

        // 如果前一个zk节点存在,阻塞线程
        if (zkClient.exists(this.beforePath.get())) {
            try {
                downLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 取消注册
        zkClient.unsubscribeDataChanges(this.beforePath.get(), zkDataListener);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        // 保证锁的可重入性(在一个业务的执行过程中,如果有多个地方需要获取到锁,需要实现锁的可重入性),通过本地线程变量实现
        Integer count = this.reentrantCount.get();
        if (count != null && count > 0) {
            // 表示当前线程已经获取到锁
            this.reentrantCount.set(++count);
            return true;
        }

        // 如果当前节点不存在,创建临时顺序节点
        if (this.currentPath.get() == null) {
            this.currentPath.set(zkClient.createEphemeralSequential(lockPath + "/", "xiaohuihui testlock2"));
        }

        // 获取所有子节点,判断当前节点是不是最小的节点
        List<String> children = zkClient.getChildren(lockPath);
        Collections.sort(children);
        if (this.currentPath.get().equals(lockPath + "/" + children.get(0))) {
            // 当前节点即为最小节点
            this.reentrantCount.set(1);
            return true;
        } else {
            // 获取到当前节点的前一个节点,设置beforePath
            int currentPathIndex = children.indexOf(this.currentPath.get().substring(lockPath.length() + 1));
            this.beforePath.set(lockPath + "/" + children.get(currentPathIndex - 1));
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        // 判断当前线程是否持有锁
        Integer count = this.reentrantCount.get();
        if (count != null) {
            // 当前线程已经在多处获取锁
            if (count > 1) {
                this.reentrantCount.set(--count);
                return;
            } else {
                this.reentrantCount.set(null);
            }
        }
        // 删除当前zk节点
        zkClient.delete(this.currentPath.get());
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163

运行效果如下,已经避免惊群效应:
在这里插入图片描述

PS:未完待续

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/593384
推荐阅读
相关标签
  

闽ICP备14008679号