赞
踩
数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。
<dependencies> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>4.0.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.8.0-beta4</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.8.0-beta4</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> </dependencies>
package com.shu.registrationcenter;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Scanner;

/**
 * Publisher side of a ZooKeeper-backed configuration center.
 *
 * <p>Each configuration is a {@code .properties} file on the classpath. It is
 * converted to a JSON string and stored as the data of a persistent znode so
 * that subscribers watching that znode see the new content.
 *
 * @author 31380
 * @create 2024/3/17 13:52
 */
public class ConfigCenter {
    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String ROOT_PATH_DB = "/app1/configCenter/DbConfig";
    private static final String ROOT_PATH_APPSERVER1 = "/app1/configCenter/Appserver1";
    private static final String ROOT_PATH_APPSERVER2 = "/app1/configCenter/Appserver2";
    private static final String DB_CONFIG_PATH = "dbConfig.properties";
    private static final String DB_CONFIG_PATH1 = "db1Config.properties";
    private static final String APPSERVER1_CONFIG_PATH = "appserver1Config.properties";
    private static final String APPSERVER1_CONFIG_PATH1 = "appserver3Config.properties";
    private static final String APPSERVER2_CONFIG_PATH = "appserver2Config.properties";
    private static final String APPSERVER2_CONFIG_PATH2 = "appserver4Config.properties";

    /** Shared JSON serializer; creating one per call is unnecessary. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Creates the znode (and any missing parents) if it does not exist yet and
     * stores the given configuration file in it as JSON.
     *
     * @param path       znode path that holds the configuration
     * @param configName classpath name of the {@code .properties} file
     */
    public void createConfigCenter(String path, String configName) {
        publish(path, configName, true, " 写入成功!");
    }

    /**
     * Overwrites the data of an existing znode with the given configuration
     * file, serialized as JSON.
     *
     * @param path       znode path that holds the configuration
     * @param configName classpath name of the {@code .properties} file
     */
    public void updateConfigCenter(String path, String configName) {
        publish(path, configName, false, " 更新成功!");
    }

    /**
     * Opens a short-lived connection, optionally creates the znode, and writes
     * the configuration to it. Failures are printed, never propagated.
     */
    private void publish(String path, String configName, boolean createIfMissing, String successSuffix) {
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        curator.start();
        try {
            if (createIfMissing && curator.checkExists().forPath(path) == null) {
                curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
            }
            // try-with-resources closes the stream even when loading or writing fails.
            try (InputStream inputStream = ConfigCenter.class.getClassLoader().getResourceAsStream(configName)) {
                if (inputStream == null) {
                    System.out.println("资源文件未找到!");
                    return;
                }
                Properties properties = new Properties();
                properties.load(inputStream);
                String jsonData = OBJECT_MAPPER.writeValueAsString(properties);
                // Explicit charset: subscribers must not depend on the publisher's platform default.
                curator.setData().forPath(path, jsonData.getBytes(StandardCharsets.UTF_8));
                System.out.println("节点 " + path + successSuffix);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            curator.close();
        }
    }

    /**
     * Publishes the three initial configurations, then reads one menu choice
     * from standard input and republishes the selected configuration from its
     * alternative file. Input that is not an integer is treated as "do nothing".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ConfigCenter configCenter = new ConfigCenter();
        configCenter.createConfigCenter(ROOT_PATH_DB, DB_CONFIG_PATH);
        configCenter.createConfigCenter(ROOT_PATH_APPSERVER1, APPSERVER1_CONFIG_PATH);
        configCenter.createConfigCenter(ROOT_PATH_APPSERVER2, APPSERVER2_CONFIG_PATH);

        System.out.println("请选择操作:1.更新数据库配置中心,2.更新 Appserver1 配置中心,3.更新 Appserver2 配置中心, 4 退出" );
        try (Scanner scanner = new Scanner(System.in)) {
            if (!scanner.hasNextInt()) {
                return;
            }
            switch (scanner.nextInt()) {
                case 1:
                    configCenter.updateConfigCenter(ROOT_PATH_DB, DB_CONFIG_PATH1);
                    break;
                case 2:
                    configCenter.updateConfigCenter(ROOT_PATH_APPSERVER1, APPSERVER1_CONFIG_PATH1);
                    break;
                case 3:
                    configCenter.updateConfigCenter(ROOT_PATH_APPSERVER2, APPSERVER2_CONFIG_PATH2);
                    break;
                default:
                    // 4 (exit) and any unknown choice: nothing to do.
                    break;
            }
        }
    }
}
package com.shu.registrationcenter;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;

/**
 * Subscriber for the Appserver1 configuration znode.
 *
 * <p>Uses {@link NodeCache}, which watches the data of the given node itself
 * and not its children; use {@code PathChildrenCache} to watch children.
 *
 * @author 31380
 * @create 2024/3/17 14:03
 */
public class Appserver1 {
    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String ROOT_PATH_APPERVER1 = "/app1/configCenter/Appserver1";

    /**
     * Connects to ZooKeeper, prints the current configuration, then prints
     * every subsequent change until the process is stopped.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        curator.start();
        // try-with-resources closes the cache (and its watcher) before the client is closed.
        try (NodeCache nodeCache = new NodeCache(curator, ROOT_PATH_APPERVER1)) {
            // buildInitial = true: the cache is populated before start() returns.
            nodeCache.start(true);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // Read once: the node may be deleted between two getCurrentData() calls.
                    ChildData current = nodeCache.getCurrentData();
                    if (current != null) {
                        System.out.println("节点数据变化:" + decode(current));
                    } else {
                        System.out.println("节点已删除!");
                    }
                }
            });

            // The node may not exist yet; that must not terminate the subscriber.
            ChildData initial = nodeCache.getCurrentData();
            if (initial != null) {
                System.out.println("节点数据:" + decode(initial));
            } else {
                System.out.println("节点尚不存在,等待创建");
            }

            // Keep the JVM alive so the listener keeps receiving events.
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            curator.close();
        }
    }

    /** Decodes znode data as UTF-8 text; a node without data yields an empty string. */
    private static String decode(ChildData childData) {
        byte[] bytes = childData.getData();
        return bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
    }
}
package com.shu.registrationcenter;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.nio.charset.StandardCharsets;

/**
 * Subscriber for the Appserver2 configuration znode; prints the configuration
 * stored in the node and every later change to it.
 *
 * @author 31380
 * @create 2024/3/17 14:03
 */
public class Appserver2 {
    private static final String CONNECT_ADDR = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String ROOT_PATH_APPERVER2 = "/app1/configCenter/Appserver2";

    /**
     * Connects to ZooKeeper, prints the current configuration, then prints
     * every subsequent change until the process is stopped.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_ADDR)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        curator.start();
        // The cache is a resource of its own and is closed before the client.
        try (NodeCache nodeCache = new NodeCache(curator, ROOT_PATH_APPERVER2)) {
            // buildInitial = true: the cache is populated before start() returns.
            nodeCache.start(true);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // Single read avoids a null dereference if the node is deleted concurrently.
                    ChildData current = nodeCache.getCurrentData();
                    if (current == null) {
                        System.out.println("节点已删除!");
                        return;
                    }
                    System.out.println("节点数据变化:" + decode(current));
                }
            });

            // A missing node is a normal startup condition, not an error.
            ChildData initial = nodeCache.getCurrentData();
            if (initial == null) {
                System.out.println("节点尚不存在,等待创建");
            } else {
                System.out.println("节点数据:" + decode(initial));
            }

            // Block the main thread so the listener thread keeps running.
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            curator.close();
        }
    }

    /** Decodes znode data as UTF-8 text; a node without data yields an empty string. */
    private static String decode(ChildData childData) {
        byte[] bytes = childData.getData();
        return bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
    }
}
注意:这里代码中使用的NodeCache只能监听指定节点自身的数据变化,不能监听其子节点的变化(如需监听子节点,可以使用PathChildrenCache)。以上就简单地实现了发布与订阅的功能,但是涉及到后面的思考:服务如何平滑过渡?
负载均衡(Load Balance)是一种相当常见的计算机网络技术,用来对多个计算机(计算机集群)、网络连接、CPU、磁盘驱动器或其他资源进行分配负载,以达到优化资源使用、最大化吞吐率、最小化响应时间和避免过载的目的。
假设现在我们拥有10台Web服务(编号1~10),当有请求到来时,依次由它们提供服务。
package com.shu.loadbalancing;

import java.io.Serializable;
import java.util.Objects;

/**
 * Registration record of one web server: its address, its load-balancing
 * weight and the name of the znode it registers under.
 *
 * <p>Identity is defined by {@code ip} and {@code port} only; {@code weight}
 * and {@code path} do not take part in {@link #equals(Object)} or
 * {@link #hashCode()}.
 *
 * @author 31380
 * @create 2024/3/17 19:31
 */
public class WebNodeData implements Serializable {
    private static final long serialVersionUID = 4848971284812662834L;

    private String ip;
    private Integer port;
    private Integer weight;
    private String path;

    /** Creates an empty record; all fields are {@code null} until set. */
    public WebNodeData() {
    }

    /**
     * @param ip     server address
     * @param port   server port
     * @param weight relative load-balancing weight
     * @param path   znode name the server registers under
     */
    public WebNodeData(String ip, Integer port, Integer weight, String path) {
        this.ip = ip;
        this.port = port;
        this.weight = weight;
        this.path = path;
    }

    /**
     * Two records are equal when their {@code ip} and {@code port} are equal.
     * Null-safe, so records built with the no-arg constructor can be compared.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebNodeData other = (WebNodeData) o;
        // Objects.equals mirrors the null tolerance of Objects.hash in hashCode().
        return Objects.equals(ip, other.ip) && Objects.equals(port, other.port);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ip, port);
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public Integer getWeight() {
        return weight;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "NodeData{" + "ip='" + ip + '\'' + ", port=" + port + ", weight=" + weight + ", path='" + path + '\'' + '}';
    }
}
package com.shu.loadbalancing; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; /** * @author 31380 * @description * @create 2024/3/17 19:33 */ public class Service { protected CuratorFramework client; String connectString = "127.0.0.1:2181"; int sessionTimeout = 5000; int connectionTimeout = 5000; /** * 启动客户端 */ public void start() { client = CuratorUtils.createCuratorFramework(connectString, sessionTimeout, connectionTimeout); client.start(); System.out.println("客户端启动成功"); } /** * 关闭客户端 */ public void close() { client.close(); System.out.println("客户端关闭成功"); } }
package com.shu.loadbalancing; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * @author 31380 * @description Service * @create 2024/3/17 19:36 */ public class WebService extends Service{ public final static String SERVICE_PATH = "/web"; private WebNodeData nodeData; public WebService(CuratorFramework client, WebNodeData nodeData) { super.client = client; this.nodeData = nodeData; } //节点初始化 public void init() throws Exception { String path = SERVICE_PATH + "/" + nodeData.getPath(); Stat stat = client.checkExists().forPath(path); if (stat == null) { String parentpath = CuratorUtils.getparrentpath(path); CuratorUtils.createParentPath(parentpath, client); client.create().withMode(CreateMode.EPHEMERAL).forPath(path, DataUtil.getBytesFromObject(this.nodeData)); } else { //如果已经存在,则更新数据 client.setData().forPath(path, DataUtil.getBytesFromObject(this.nodeData)); } } }
package com.shu.loadbalancing;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Client-side load balancer: tracks the web servers registered under
 * {@link WebService#SERVICE_PATH} and picks one by weighted random choice.
 *
 * <p>The node list is touched by the caller's thread and by the cache's
 * listener thread; every access is guarded by the list's monitor.
 *
 * @author 31380
 * @create 2024/3/17 19:40
 */
public class LoadBalanceService extends Service {

    private final List<WebNodeData> nodeDatas = new ArrayList<>(16);

    /** Child watcher created by {@link #register()}; {@code null} until then. */
    private PathChildrenCache watcher;

    public LoadBalanceService(CuratorFramework client) {
        super.client = client;
    }

    /**
     * Loads all servers that are registered at the moment. A child that
     * cannot be read or deserialized is reported and skipped.
     *
     * @throws Exception if the children of the service path cannot be listed
     */
    public void init() throws Exception {
        List<String> children = client.getChildren().forPath(WebService.SERVICE_PATH);
        for (String child : children) {
            String path = WebService.SERVICE_PATH + "/" + child;
            try {
                byte[] data = client.getData().forPath(path);
                // NOTE(review): native Java deserialization of znode content; only safe
                // while every writer to this path is trusted.
                putNode((WebNodeData) DataUtil.getObjectFromBytes(data));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts watching the service path so that servers coming online, going
     * offline or changing their data are reflected in the node list.
     *
     * @throws Exception if the watcher cannot be started
     */
    public void register() throws Exception {
        watcher = new PathChildrenCache(client, WebService.SERVICE_PATH, true);
        watcher.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println(event.getData().getPath() + "上线");
                        // Existing children are reported as CHILD_ADDED too; putNode()
                        // replaces instead of appending, so init() + register() cannot
                        // produce duplicates.
                        putNode((WebNodeData) DataUtil.getObjectFromBytes(event.getData().getData()));
                        break;
                    case CHILD_UPDATED:
                        // A re-registration may carry a new weight; keep the list current.
                        putNode((WebNodeData) DataUtil.getObjectFromBytes(event.getData().getData()));
                        break;
                    case CHILD_REMOVED:
                        System.out.println(event.getData().getPath() + "下线");
                        removeNode((WebNodeData) DataUtil.getObjectFromBytes(event.getData().getData()));
                        break;
                    default:
                        // Connection state events are not relevant here.
                        break;
                }
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
    }

    /**
     * Picks one online server at random, with probability proportional to its
     * weight. Missing or negative weights count as 0.
     *
     * @return the chosen server, or {@code null} when no server is online or
     *         no server has a positive weight
     */
    public WebNodeData loadBalance() {
        synchronized (nodeDatas) {
            if (nodeDatas.isEmpty()) {
                return null;
            }
            int totalWeight = 0;
            for (WebNodeData node : nodeDatas) {
                totalWeight += weightOf(node);
            }
            if (totalWeight <= 0) {
                // nextInt(0) would throw; nothing is eligible anyway.
                return null;
            }
            // ticket is in [0, totalWeight); a node with weight w owns exactly w tickets.
            int ticket = ThreadLocalRandom.current().nextInt(totalWeight);
            for (WebNodeData node : nodeDatas) {
                int weight = weightOf(node);
                if (ticket < weight) {
                    return node;
                }
                ticket -= weight;
            }
            return null; // not reachable: ticket < totalWeight
        }
    }

    /** Stops the child watcher, if any, and then closes the client. */
    @Override
    public void close() {
        if (watcher != null) {
            try {
                watcher.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        super.close();
    }

    /** Adds the node, replacing an equal (same ip and port) entry if present. */
    private void putNode(WebNodeData node) {
        if (node == null) {
            return;
        }
        synchronized (nodeDatas) {
            nodeDatas.remove(node);
            nodeDatas.add(node);
        }
    }

    /** Removes the entry equal to the given node, if present. */
    private void removeNode(WebNodeData node) {
        synchronized (nodeDatas) {
            nodeDatas.remove(node);
        }
    }

    /** Returns the usable weight of a node: {@code null} and negative values count as 0. */
    private static int weightOf(WebNodeData node) {
        Integer weight = node.getWeight();
        return (weight == null || weight < 0) ? 0 : weight;
    }
}
package com.shu.loadbalancing;

import java.io.*;
import java.util.Objects;

/**
 * Helpers for converting objects to and from byte arrays using Java's
 * built-in serialization.
 *
 * @author 31380
 * @create 2024/3/17 19:32
 */
public class DataUtil {

    /**
     * Serializes an object to a byte array.
     *
     * @param object the object to serialize; must be {@link Serializable} (or {@code null})
     * @return the serialized form
     * @throws IOException if the object cannot be serialized
     */
    public static byte[] getBytesFromObject(Object object) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // Closing the object stream flushes it into bos before the bytes are read.
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(object);
        }
        return bos.toByteArray();
    }

    /**
     * Deserializes an object from a byte array produced by
     * {@link #getBytesFromObject(Object)}.
     *
     * <p>NOTE(review): Java native deserialization must only be used on bytes
     * from a trusted source.
     *
     * @param bytes the serialized form; must not be {@code null}
     * @return the deserialized object
     * @throws NullPointerException   if {@code bytes} is {@code null}
     * @throws IOException            if the bytes are not a valid serialization stream
     * @throws ClassNotFoundException if the class of the serialized object is not available
     */
    public static Object getObjectFromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        Objects.requireNonNull(bytes, "bytes");
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }
}
package com.shu.loadbalancing;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

/**
 * Manual demo: registers ten web servers, starts a load balancer and routes
 * one simulated request per line read from standard input.
 *
 * @author 31380
 * @create 2024/3/17 19:43
 */
public class LoadBalancingTest {

    private static final String ZK_ADDRESS = "127.0.0.1:2181";

    /**
     * Runs the demo until standard input is closed, then shuts every service down.
     *
     * @param args unused
     * @throws Exception if a service cannot be started or registered
     */
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new RetryNTimes(3, 100);
        List<Service> started = new ArrayList<>();
        try {
            for (int i = 0; i < 10; i++) {
                CuratorFramework client =
                        CuratorFrameworkFactory.newClient(ZK_ADDRESS, 30 * 60 * 1000, 5 * 1000, retryPolicy);
                // Servers share one host and differ by port; identity is (ip, port), so the
                // port must be unique. The weight equals the index, so web0 has weight 0.
                WebNodeData nodeData = new WebNodeData("127.0.0.1", 8080 + i, i, "web" + i);
                WebService webService = new WebService(client, nodeData);
                webService.start();
                started.add(webService);
                webService.init();
            }

            CuratorFramework client =
                    CuratorFrameworkFactory.newClient(ZK_ADDRESS, 30 * 60 * 1000, 5 * 1000, retryPolicy);
            LoadBalanceService loadBalanceService = new LoadBalanceService(client);
            loadBalanceService.start();
            started.add(loadBalanceService);
            loadBalanceService.init();
            loadBalanceService.register();

            // One Scanner for the whole session; each input line triggers one request.
            try (Scanner sc = new Scanner(System.in)) {
                while (sc.hasNextLine()) {
                    sc.nextLine();
                    WebNodeData nodeData = loadBalanceService.loadBalance();
                    if (nodeData == null) {
                        System.out.println("当前没有可用的服务");
                        continue;
                    }
                    System.out.println("本次请求由:" + nodeData.getIp() + ":" + nodeData.getPort() + " 执行");
                }
            }
        } finally {
            for (Service service : started) {
                service.close();
            }
        }
    }
}
package com.shu.id; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; /** * @author 31380 * @description * @create 2024/3/20 14:10 */ public class ZookeeperUniqueIDGenerator { private static final String CONNECT_STRING = "localhost:2181"; // ZooKeeper连接字符串 private static final int SESSION_TIMEOUT = 5000; // 会话超时时间 private static final String ROOT_PATH = "/jdbs"; // 根节点路径 CuratorFramework client; public ZookeeperUniqueIDGenerator() { // 创建连接 client = CuratorUtils.createCuratorFramework(CONNECT_STRING, SESSION_TIMEOUT, 5000); // 启动连接 client.start(); } /** * 生成唯一ID * @param path 节点路径 * @param prefix 节点前缀 * @return */ public String generate(String path, String prefix) { // 指定类型的任务下面通过调用create()接口来创建一个顺序节点,例如创建“job-”节点。 String fullPath = ROOT_PATH + "/" + path + "/" + prefix + "-"; try { // 创建一个顺序节点 String createdPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(fullPath); // 返回节点名称 return path+"-"+createdPath.substring(createdPath.lastIndexOf("/") + 1); } catch (Exception e) { e.printStackTrace(); } return null; } /** * 关闭 */ public void close() { client.close(); } public static void main(String[] args) { ZookeeperUniqueIDGenerator zookeeperUniqueIDGenerator = new ZookeeperUniqueIDGenerator(); // 循环生成10个唯一ID for (int i = 0; i < 10; i++) { System.out.println(zookeeperUniqueIDGenerator.generate("web", "job")); } zookeeperUniqueIDGenerator.close(); } }
参考第一个数据发布与订阅
排他锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取和更新操作,其他任何事务都不能再对这个数据对象进行任何类型的操作——直到T1释放了排他锁
package com.shu.lock; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; /** * @author 31380 * @description * @create 2024/3/20 14:59 */ public class ExclusiveLocksTest { private static final String ZK_ADDRESS = "127.0.0.1:2181"; private static final String LOCK_PATH = "/exclusive_lock"; // 客户端数量 private static final int CLIENT_COUNT = 10; public static void main(String[] args) { for (int i = 0; i < CLIENT_COUNT; i++) { new Thread(() -> { CuratorFramework client = CuratorUtils.createCuratorFramework(ZK_ADDRESS, 5000, 5000); client.start(); InterProcessMutex locks = new InterProcessMutex(client, LOCK_PATH); try { locks.acquire(); System.out.println(Thread.currentThread().getName() + "获取到锁"); // 模拟业务处理 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "释放锁"); } catch (Exception e) { e.printStackTrace(); }finally { try { locks.release(); } catch (Exception e) { e.printStackTrace(); } client.close(); } }).start(); } } }
共享锁(Shared Locks,简称S锁),又称为读锁,同样是一种基本的锁类型。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁——直到该数据对象上的所有共享锁都被释放。共享锁和排他锁最根本的区别在于,加上排他锁后,数据对象只对一个事务可见,而加上共享锁后,数据对所有事务都可见。下面我们就来看看如何借助ZooKeeper来实现共享锁。
package com.shu.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.RetryNTimes;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Demo of {@link InterProcessReadWriteLock}: read locks are shared, write
 * locks are exclusive, and readers and writers exclude each other.
 *
 * @author 31380
 * @create 2024/3/20 16:53
 */
public class InterProcessReadWriteLockTest {

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
    private static final String zkLockPath = "/Aaron/Lock2";
    private static CuratorFramework zkClient;

    /** Creates and starts the shared ZooKeeper client. */
    public static void init() {
        zkClient = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")        // ZK server address
                .connectionTimeoutMs(15 * 1000)          // connection timeout: 15s
                .sessionTimeoutMs(60 * 1000)             // session timeout: 60s
                .retryPolicy(new RetryNTimes(3, 1000))   // retry 3 times, 1s apart
                .build();
        zkClient.start();
        System.out.println("---------------------- 系统上线 ----------------------");
    }

    /** Scenario 1: three readers; read locks can be held at the same time. */
    public void test1Read() {
        System.out.println("\n---------------------- Test 1 : Read ----------------------");
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            tasks.add(new ReadTask("读任务#" + i, zkClient, zkLockPath));
        }
        runAndAwait(tasks);
    }

    /** Scenario 2: three writers; write locks are held one at a time. */
    public void test2Write() {
        System.out.println("\n---------------------- Test 2 : Write ----------------------");
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            tasks.add(new WriteTask("写任务#" + i, zkClient, zkLockPath));
        }
        runAndAwait(tasks);
    }

    /** Scenario 3: readers (even numbers) and writers (odd numbers) mixed. */
    public void test2ReadWrite() {
        System.out.println("\n---------------------- Test 3 : Read Write ----------------------");
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 1; i <= 8; i++) {
            boolean isReadTask = i % 2 == 0;
            if (isReadTask) {
                tasks.add(new ReadTask("读任务#" + i, zkClient, zkLockPath));
            } else {
                tasks.add(new WriteTask("写任务#" + i, zkClient, zkLockPath));
            }
        }
        runAndAwait(tasks);
    }

    /** Shuts the worker pool down and closes the ZooKeeper client. */
    public static void close() {
        // Pool threads are non-daemon; without shutdown the JVM would never exit.
        threadPool.shutdown();
        zkClient.close();
        System.out.println("---------------------- 系统下线 ----------------------");
    }

    /**
     * Submits all tasks to the pool and blocks until every one has finished,
     * instead of guessing a sleep duration.
     */
    private static void runAndAwait(List<Runnable> tasks) {
        List<Future<?>> pending = new ArrayList<>(tasks.size());
        for (Runnable task : tasks) {
            pending.add(threadPool.submit(task));
        }
        for (Future<?> future : pending) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (ExecutionException e) {
                e.getCause().printStackTrace();
            }
        }
    }

    /**
     * Prints a message prefixed with the current time and thread name.
     *
     * @param msg the message to print
     */
    private static void info(String msg) {
        String time = formatter.format(LocalTime.now());
        String thread = Thread.currentThread().getName();
        String log = "[" + time + "] " + " <" + thread + "> " + msg;
        System.out.println(log);
    }

    /** Task that holds the read lock for a short random time. */
    private static class ReadTask implements Runnable {
        private final String taskName;
        private final InterProcessMutex readLock;

        public ReadTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.readLock = interProcessReadWriteLock.readLock();
        }

        @Override
        public void run() {
            try {
                readLock.acquire();
            } catch (Exception e) {
                // The lock was not acquired, so there is nothing to release.
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
                return;
            }
            try {
                info(taskName + ": 成功获取读锁 #1");
                // Simulated business work.
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放读锁 #1");
                try {
                    readLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Task that holds the write lock for a short random time. */
    private static class WriteTask implements Runnable {
        private final String taskName;
        private final InterProcessMutex writeLock;

        public WriteTask(String taskName, CuratorFramework zkClient, String zkLockPath) {
            this.taskName = taskName;
            InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, zkLockPath);
            this.writeLock = interProcessReadWriteLock.writeLock();
        }

        @Override
        public void run() {
            try {
                writeLock.acquire();
            } catch (Exception e) {
                // The lock was not acquired, so there is nothing to release.
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
                return;
            }
            try {
                info(taskName + ": 成功获取写锁 #1");
                // Simulated business work.
                Thread.sleep(RandomUtils.nextLong(100, 500));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println(taskName + ": Happen Exception: " + e.getMessage());
            } finally {
                info(taskName + ": 释放写锁 #1\n");
                try {
                    writeLock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Random number helper. */
    static class RandomUtils {
        /**
         * @param min lower bound, inclusive
         * @param max upper bound, inclusive
         * @return a pseudo-random value in {@code [min, max]}
         */
        public static long nextLong(int min, int max) {
            return min + (long) (Math.random() * ((max - min) + 1));
        }
    }

    /**
     * Runs the three scenarios in order against a local ZooKeeper server.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        InterProcessReadWriteLockTest test = new InterProcessReadWriteLockTest();
        init();
        try {
            test.test1Read();
            test.test2Write();
            test.test2ReadWrite();
        } finally {
            close();
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。