赞
踩
目录
1.3.1.1 分布式锁ZookeeperDistributedLock
1.3.1.2 模拟下单处理OrderServiceHandle
1.3.1.3 订单号生成类OrderCodeGenerator
1.3.1.4 分布式锁测试类TestZookeeperDistributedLock
1.3.2.2.1 分布式锁类CuratorDistributeLock
1.3.2.2.2 测试类TestCuratorDistributedLock
同步互斥,使得处理任务能够一个一个逐步的过临界资源。
zookeeper中有一种临时顺序节点,它具有以下特征:
Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事
件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然
后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。
在实现分布式锁的时候,主要利用这个机制,实现释放锁的时候,通知等待锁的线程竞争锁。
综上可知,Zookeeper其实是基于临时顺序节点特性实现的分布式锁。当然,还结合了他的Watcher机制,实现释放锁的时候,通知等待锁的线程去竞争锁。
- package com.ningzhaosheng.distributelock.zookeeper;
-
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.I0Itec.zkclient.serialize.SerializableSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
-
-
- /**
- * @author ningzhaosheng
- * @date 2024/4/17 18:13:38
- * @description 基于zookeeper实现的分布式锁
- */
- public class ZookeeperDistributedLock implements Lock {
-
- private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);
-
- // zookeeper 地址
- private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
- // zookeeper 锁目录
- private String LOCK_PATH = "/LOCK";
-
- // 创建 zookeeper客户端zkClient
- private ZkClient client = null;
-
- private CountDownLatch cdl;
-
- // 当前请求的节点前一个节点
- private String beforePath;
- // 当前请求的节点
- private String currentPath;
-
-
- /**
- * 初始化客户端和创建LOCK目录
- *
- * @param ZOOKEEPER_IP_PORT
- * @param LOCK_PATH
- */
- public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {
- this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;
- this.LOCK_PATH = LOCK_PATH;
- client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
- // 判断有没有LOCK目录,没有则创建
- if (!this.client.exists(LOCK_PATH)) {
- this.client.createPersistent(LOCK_PATH);
- }
- }
-
- @Override
- public void lock() {
- if (!tryLock()) {
- //对次小节点进行监听
- waitForLock();
- lock();
- } else {
- logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
- }
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
-
- }
-
- @Override
- public boolean tryLock() {
- // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
- if (currentPath == null || currentPath.length() <= 0) {
- // 创建一个临时顺序节点
- currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
- System.out.println("---------------------------->" + currentPath);
- }
-
- // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
- List<String> childrens = this.client.getChildren(LOCK_PATH);
- //由小到大排序所有子节点
- Collections.sort(childrens);
- //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
- if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
- return true;
- }
- //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
- else {
- int wz = Collections.binarySearch(childrens, currentPath.substring(6));
- beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
- }
-
- return false;
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- //等待锁,对次小节点进行监听
- private void waitForLock() {
- IZkDataListener listener = new IZkDataListener() {
- public void handleDataDeleted(String dataPath) throws Exception {
- logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
- if (cdl != null) {
- cdl.countDown();
- }
- }
-
- public void handleDataChange(String dataPath, Object data) throws Exception {
-
- }
- };
-
- // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
- this.client.subscribeDataChanges(beforePath, listener);
- if (this.client.exists(beforePath)) {
- cdl = new CountDownLatch(1);
- try {
- cdl.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- this.client.unsubscribeDataChanges(beforePath, listener);
- }
-
- @Override
- public void unlock() {
- // 删除当前临时节点
- client.delete(currentPath);
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
-
- }
-
- package com.ningzhaosheng.distributelock.zookeeper;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.locks.Lock;
-
- /**
- * @author ningzhaosheng
- * @date 2024/4/17 21:45:46
- * @description 模拟订单处理
- */
- public class OrderServiceHandle implements Runnable {
- private static OrderCodeGenerator ong = new OrderCodeGenerator();
-
- private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);
-
- // 按照线程数初始化倒计数器,倒计数器
- private CountDownLatch cdl = null;
-
- private Lock lock = null;
-
- public OrderServiceHandle(CountDownLatch cdl, Lock lock) {
- this.cdl = cdl;
- this.lock = lock;
- }
-
- // 创建订单
- public void createOrder() {
- String orderCode = null;
-
- //准备获取锁
- lock.lock();
- try {
- // 获取订单编号
- orderCode = ong.getOrderCode();
- } catch (Exception e) {
- // TODO: handle exception
- } finally {
- //完成业务逻辑以后释放锁
- lock.unlock();
- }
-
- // ……业务代码
-
- logger.info("insert into DB使用id:=======================>" + orderCode);
- }
-
-
- @Override
- public void run() {
- try {
- // 等待其他线程初始化
- cdl.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- // 创建订单
- createOrder();
- }
- }
-
- package com.ningzhaosheng.distributelock.zookeeper;
-
- import java.text.SimpleDateFormat;
- import java.util.Date;
-
- /**
- * @author ningzhaosheng
- * @date 2024/4/17 21:44:06
- * @description 生成订单号
- */
- public class OrderCodeGenerator {
- // 自增长序列
- private static int i = 0;
-
- // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
- public String getOrderCode() {
- Date now = new Date();
- SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
- return sdf.format(now) + ++i;
- }
- }
-
- package com.ningzhaosheng.distributelock.zookeeper;
-
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.locks.Lock;
-
- /**
- * @author ningzhaosheng
- * @date 2024/4/17 21:48:28
- * @description zookeeper分布式锁测试类
- */
- public class TestZookeeperDistributedLock {
- public static void main(String[] args) {
- // zookeeper 地址
- String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";
- // zookeeper 锁目录
- String LOCK_PATH = "/LOCK";
- // 线程并发数
- int NUM = 10;
-
- CountDownLatch cdl = new CountDownLatch(NUM);
- for (int i = 1; i <= NUM; i++) {
- // 按照线程数迭代实例化线程
- Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);
- new Thread(new OrderServiceHandle(cdl, lock)).start();
- // 创建一个线程,倒计数器减1
- cdl.countDown();
- }
- }
- }
-
从上图执行结果中可以看出,在多线程情况下,分布式锁获取和释放正常。
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>5.2.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>5.2.0</version>
- </dependency>
这里模拟业务使用分布式锁,还是使用的OrderServiceHandle类,这里只给出分布式锁实现类和测试类,不再给出OrderServiceHandle代码,可以参考上一小节的OrderServiceHandle类。
- package com.ningzhaosheng.distributelock.zookeeper.curator;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
-
- /**
- * @author ningzhaosheng
- * @date 2024/4/17 22:03:45
- * @description 实现Lock接口(其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装)
- */
- public class CuratorDistributeLock implements Lock {
-
- private CuratorFramework client;
- private InterProcessMutex mutex;
-
- public CuratorDistributeLock(String connString, String lockPath) {
- this(connString, lockPath, new ExponentialBackoffRetry(3000,5));
- }
-
- public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
- try {
- client = CuratorFrameworkFactory.builder()
- .connectString(connString)
- .retryPolicy(retryPolicy)
- .build();
- client.start();
-
- mutex = new InterProcessMutex(client, lockPath);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void lock() {
- try {
- // 获取锁
- mutex.acquire();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
-
- }
-
- @Override
- public boolean tryLock() {
- return false;
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public void unlock() {
- try {
- // 释放锁
- mutex.release();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public Condition newCondition() {
- return null;
- }
- }
-
- package com.ningzhaosheng.distributelock.zookeeper.curator;
-
- import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;
-
- import java.util.concurrent.CountDownLatch;
-
- /**
- * @author ningzhaosheng
- * @date 2024/4/17 21:54:33
- * @description 基于 apache curator分布式锁测试类
- */
- public class TestCuratorDistributedLock {
- private static final String ZK_ADDRESS = "192.168.31.9:2181";
- private static final String LOCK_PATH = "/distributed_lock";
-
- public static void main(String[] args) {
-
- int NUM = 10;
- CountDownLatch cdl = new CountDownLatch(NUM);
- for (int i = 1; i <= NUM; i++) {
- // 按照线程数迭代实例化线程
- /** 创建CuratorDistributeLock
- * 其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装
- */
- CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS,LOCK_PATH);
- new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();
- // 创建一个线程,倒计数器减1
- cdl.countDown();
- }
- }
- }
-
从执行结果可以看出,基于apche curator框架实现zookeeper锁,它也是按照临时顺序节点的顺序获取锁的,每次获得锁的节点都是最小顺序节点,然后等待锁的线程,会基于watcher机制,每次给最小临时顺序节点加回调,监听节点的变更(即释放锁的线程会删除节点),然后再重新判断最小临时顺序节点,最小的获得锁执行,依次循环完成。
好了,本次内容就分享到这,欢迎关注本博主。如果有帮助到大家,欢迎大家点赞+关注+收藏,有疑问也欢迎大家评论留言!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。