赞
踩
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=D:\\Tools\\Zookeeper\\ServerA\\data dataLogDir=D:\\Tools\\Zookeeper\\ServerA\\log # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 ## Metrics Providers # # https://prometheus.io Metrics Exporter #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider #metricsProvider.httpHost=0.0.0.0 #metricsProvider.httpPort=7000 #metricsProvider.exportJvmInfo=true server.1=localhost:2886:3886 server.2=localhost:2887:3887 server.3=localhost:2888:3888
server.A=B:C:D;其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。
@echo off
start cmd /k "cd /d D:\Tools\Zookeeper\ServerA\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerA...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerB\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerB...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerC\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerC...
echo All ZooKeeper servers have been started.
命令行语法 | 功能描述 |
---|---|
help | 显示所有操作命令 |
ls path | 使用ls命令来查看当前znode的子节点[可监听], -w 监听子节点变化, -s 附加次级信息 |
create | 普通创建, -s 含有序列, -e 临时(重启或超时消失) |
get path | 获得节点的值[可监听] -w 监听节点内容变化, -s 附加次级信息 |
set | 设置节点的具体值 |
stat | 查看节点的状态 |
delete | 删除节点 |
deleteall | 递归删除节点 |
[zk: bigdata01:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
这个命令就需要自己去练了
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>4.0.0</version> <scope>test</scope> </dependency>
使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端
package com.shu; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import java.util.ArrayList; import java.util.List; /** * @author 31380 * @description * @create 2024/3/16 18:39 */ public class CuratorUtils { /** * 创建连接 * * @param connectionString 连接地址 * @param sessionTimeout 会话超时时间 * @param connectionTimeout 连接超时时间 * @return */ public static CuratorFramework createCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout) { return CuratorFrameworkFactory.builder() .connectString(connectionString) .sessionTimeoutMs(sessionTimeout) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .connectionTimeoutMs(connectionTimeout) .build(); } /** * 创建连接 * * @param connectionString 连接地址 * @param sessionTimeout 会话超时时间 * @param connectionTimeout 连接超时时间 * @param retryPolicy 重试策略 * @return */ public static CuratorFramework createCuratorFrameworkWithRetry(String connectionString, int sessionTimeout, int connectionTimeout, RetryPolicy retryPolicy ) { return CuratorFrameworkFactory.builder() .connectString(connectionString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); } /** * 创建一个隔离的命名空间 */ public static CuratorFramework createNamespaceCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout, String namespace) { return CuratorFrameworkFactory.builder() .connectString(connectionString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .namespace(namespace) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); } /** * ZooDefs.Perms.READ:读权限 * ZooDefs.Perms.WRITE:写权限 * ZooDefs.Perms.CREATE:创建子节点权限 * ZooDefs.Perms.DELETE:删除权限 * ZooDefs.Perms.ADMIN:管理权限 * ZooDefs.Perms.ALL:所有权限 * 以下是一些常用的身份验证方案: * Ids.ANYONE_ID_UNSAFE:表示任何人都可以访问 * Ids.AUTH_IDS:表示使用已验证的用户身份 * Ids.OPEN_ACL_UNSAFE:表示开放的ACL,任何人都可以访问 * ACL acl = new ACL(ZooDefs.Perms.READ, new Id("myUser", "myPassword")); * @return */ public static List<ACL> getAclList() { ArrayList<ACL> acls = new ArrayList<>(); // 权限设置 ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE); // 添加权限 acls.add(acl); return acls; } }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import java.util.List; /** * @author 31380 * @description * @create 2024/3/16 18:43 */ public class CuratorCreatTest { /** * 总结:curator创建节点方法 * 1.创建节点,如果节点已经存在则抛出异常 create().forPath() * 2.withMode():节点类型: CreateMode.EPHEMERAL 临时节点,CreateMode.PERSISTENT 永久节点 * 3.递归创建节点 creatingParentsIfNeeded() * 4.查询所有子节点 getChildren().forPath() * 5.删除节点 delete().forPath() * 6.判断节点是否存在 checkExists().forPath() * 7.关闭连接 close() * @param args * @throws Exception */ public static void main(String[] args) throws Exception { CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); // 创建节点,如果节点已经存在则抛出异常 try { curatorClint.create().forPath("/test"); } catch (Exception e) { System.out.println("创建节点失败!"+e.getMessage()); } // 删除节点 try { curatorClint.delete().forPath("/test"); System.out.println("删除节点成功!"); } catch (Exception e) { System.out.println("删除节点失败!"+e.getMessage()); } /** * 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点. * 持久节点(PERSISTENT):创建后永久存在,除非主动删除。 */ // 临时节点,当会话结束后,节点自动删除 curatorClint.create().withMode(CreateMode.EPHEMERAL) .forPath("/secondPath", "hello,word".getBytes()); System.out.println("临时节点:"+new String(curatorClint.getData().forPath("/secondPath"))); // 永久节点 curatorClint.create().withMode(CreateMode.PERSISTENT) .forPath("/thirdPath", "hello,word".getBytes()); System.out.println("永久节点:"+new String(curatorClint.getData().forPath("/thirdPath"))); // 递归创建节点 curatorClint.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/parent/child", "hello,word".getBytes()); System.out.println("递归创建节点:"+new String(curatorClint.getData().forPath("/parent/child"))); // 查询所有子节点 List<String> list= curatorClint.getChildren().forPath("/"); System.out.println(list); // 关闭连接 curatorClint.close(); } }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.data.Stat; import java.util.Date; /** * @author 31380 * @description 读取节点数据 * @create 2024/3/17 11:28 */ public class CuratorReadTest { /** * 总结: * 1. 读取单个节点数据:curatorClint.getData().forPath("/base/test") * 2. 读取多个节点数据:curatorClint.getChildren().forPath("/test").forEach(System.out::println) * 3. 读取节点数据并获取 stat:curatorClint.getData().storingStatIn(stat).forPath("/base/test") * 4:Stat:节点状态,包含节点的版本、数据长度、子节点数量、创建时间、修改时间、最近一次修改的事务 ID、数据版本、ACL 版本、临时节点 * @param args */ public static void main(String[] args) { // 地址 String connectString = "127.0.0.1:2181"; // 确保连接字符串正确 CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); // 读取单个节点数据 try { byte[] bytes = curatorClint.getData().forPath("/base/test"); System.out.println(new String(bytes)); System.out.println("读取节点数据成功!"); } catch (Exception e) { System.out.println("读取节点数据失败!"+e.getMessage()); } // 读取多个节点数据 try { curatorClint.getChildren().forPath("/test").forEach(System.out::println); System.out.println("读取多个节点数据成功!"); } catch (Exception e) { System.out.println("读取多个节点数据失败!"+e.getMessage()); } try { Stat stat = new Stat(); byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test"); String dataString = new String(data); System.out.println("节点数据:" + dataString); System.out.println("节点状态:"); System.out.println(" 节点创建版本:" + stat.getCversion()); System.out.println(" 数据长度:" + stat.getDataLength()); System.out.println(" 子节点数量:" + stat.getNumChildren()); System.out.println(" 创建时间:" + new Date(stat.getCtime())); System.out.println(" 修改时间:" + new Date(stat.getMtime())); System.out.println(" 最近一次修改的事务 ID:" + stat.getMzxid()); System.out.println(" 数据版本:" + stat.getVersion()); System.out.println(" ACL 版本:" + stat.getAversion()); System.out.println(" 临时节点:" + stat.getEphemeralOwner()); System.out.println("读取节点数据并获取 stat 成功!"); } catch (Exception e) { System.out.println("读取节点数据并获取 stat 失败:" + e.getMessage()); } // 关闭连接 curatorClint.close(); } }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; /** * @author 31380 * @description * @create 2024/3/16 19:25 */ public class CuratorDeleteTest { /** * 总结: * 1. 删除节点:delete().forPath("/test") * 2. 如果存在子节点,删除子节点:delete().deletingChildrenIfNeeded().forPath("/parent") * 3. 递归删除节点:delete().deletingChildrenIfNeeded().forPath("/secondPath") * 4. 判断节点是否存在:checkExists().forPath("/secondPath") * 5. 关闭连接:close() * @param args * @throws Exception */ public static void main(String[] args) throws Exception { CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); // 删除节点 try { curatorClint.delete().forPath("/test"); System.out.println("删除节点成功!"); } catch (Exception e) { System.out.println("删除节点失败!"+e.getMessage()); } // 如果存在子节点,删除子节点 try { curatorClint.delete().deletingChildrenIfNeeded().forPath("/parent"); System.out.println("删除节点成功!"); } catch (Exception e) { System.out.println("删除节点失败!"+e.getMessage()); } // 递归删除节点 curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath"); // 判断节点是否存在 if (curatorClint.checkExists().forPath("/secondPath") == null) { System.out.println("节点不存在!"); } else { System.out.println("节点存在!"); } // 关闭连接 curatorClint.close(); } }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.data.Stat; /** * @author 31380 * @description * @create 2024/3/17 11:35 */ public class CuratorUpdateTest { /** * 总计 * 1. 更新节点:setData().forPath("/test", "hello,word".getBytes()) * 2. 指定版本更新节点:setData().withVersion(1).forPath("/test", "hello,word".getBytes()) * @param args */ public static void main(String[] args) throws Exception { // 地址 String connectString = "127.0.0.1:2181"; //创建节点 CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); // 更新节点 try { curatorClint.setData().forPath("/base/test", "hello,word1111".getBytes()); // 获取节点数据 byte[] bytes = curatorClint.getData().forPath("/base/test"); System.out.println(new String(bytes)); System.out.println("更新节点成功!"); } catch (Exception e) { System.out.println("更新节点失败!"+e.getMessage()); } // 先获取节点的版本号 Stat stat = new Stat(); byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test"); String dataString = new String(data); System.out.println("节点数据:" + dataString); System.out.println("节点状态:"); System.out.println(" 数据版本:" + stat.getVersion()); // 指定版本更新节点:CAS 机制 try { curatorClint.setData().withVersion(stat.getVersion()).forPath("/base/test", "hello,word2222".getBytes()); // 获取节点数据 byte[] bytes = curatorClint.getData().forPath("/base/test"); System.out.println(new String(bytes)); System.out.println("更新节点成功!"); } catch (Exception e) { System.out.println("更新节点失败!"+e.getMessage()); } } }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author 31380 * @description Curator异步操作 * @create 2024/3/17 11:42 */ public class CuratorAyncTest { /** * 总结: * 1 异步操作:inBackground() * 2.创建节点,如果节点已经存在则抛出异常 create().forPath() * 3.递归创建节点 creatingParentsIfNeeded() * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // 地址 String connectString = "127.0.0.1:2181"; //创建节点 CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); CountDownLatch cdl = new CountDownLatch(2); ExecutorService executorService = Executors.newFixedThreadPool(2); curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> { System.out.println("Code:" + event.getResultCode()); System.out.println("Type:" + event.getType()); System.out.println("Path:" + event.getPath()); cdl.countDown(); }, executorService).forPath("/test1", "hello,word".getBytes()); curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> { System.out.println("Code:" + event.getResultCode()); System.out.println("Type:" + event.getType()); System.out.println("Path:" + event.getPath()); cdl.countDown(); }).forPath("/test2", "hello,word".getBytes()); cdl.await(); executorService.shutdown(); curatorClint.close(); } /** * 事件类型 * CREATE, // 创建 * DELETE, // 删除 * EXISTS, // 存在 * GET_DATA, // 获取数据 * SET_DATA, // 设置数据 * CHILDREN, // 子节点 * SYNC, // 同步 * GET_ACL, // 获取ACL * SET_ACL, // 设置ACL * TRANSACTION, // 事务 * GET_CONFIG, // 获取配置 * RECONFIG, // 重新配置 * WATCHED, // 监听 * REMOVE_WATCHES, // 移除监听 * CLOSING; // 关闭 * @param args */ /** * 响应码 * OK(0), // OK * CONNECTIONLOSS(-4), // 连接丢失 * MARSHALLINGERROR(-7), // 编组错误 * UNIMPLEMENTED(-9), // 未实现 * OPERATIONTIMEOUT(-10), // 操作超时 * BADARGUMENTS(-8), // 错误参数 * APIERROR(-100), // API错误 * NONODE(-101), // 无节点· */ }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; /** * @author 31380 * @description * @create 2024/3/17 11:03 */ public class CuratorSEQCreat { /** * 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号。 * 持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会有序列号。 * @param args */ public static void main(String[] args) { CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); // 创建一个持久顺序节点A-1,A-2,A-3 try { curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes()); curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes()); curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes()); System.out.println("创建节点成功!"); } catch (Exception e) { System.out.println("创建节点失败!"+e.getMessage()); } // 创建一个临时顺序节点B-1,B-2,B-3 try { curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes()); curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes()); curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes()); System.out.println("创建节点成功!"); } catch (Exception e) { System.out.println("创建节点失败!"+e.getMessage()); } // 关闭连接 curatorClint.close(); } }
package com.shu.base; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.transaction.CuratorTransactionResult; import java.util.Collection; /** * @author 31380 * @description TODO * @create 2024/3/16 19:28 */ public class CuratorTransactionTest { public static void main(String[] args) throws Exception { CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); Collection<CuratorTransactionResult> commit = curatorClint.inTransaction() .create().forPath("/xiao", "456".getBytes()).and() .setData().forPath("/xiao", "123".getBytes()).and() .commit(); for (CuratorTransactionResult result : commit) { System.out.println(result.getForPath() + "--->" + result.getType()); } curatorClint.close(); } }
package com.shu.acl; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; /** * @author 31380 * @description * @create 2024/3/16 19:56 */ public class CuratorAclTest { public static void main(String[] args) { CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); // 创建节点,ACL为ip: try { curatorClint.create().withACL(CuratorUtils.getAclList()).forPath("/test"); System.out.println("创建节点成功!"); } catch (Exception e) { System.out.println("创建节点失败!"+e.getMessage()); } } } /** * @description * @author 31380 * @create 2024/3/17 11:12 * Schema 代表权限控制模式,分别为: * ● World 任何人 * ● Auth 不需要ID * ● Digest 用户名和密码方式的认证 * ● IP Address IP地址方式的认证 * perms(权限),ZooKeeper支持如下权限 * ● CREATE: 创建子节点 * ● READ: 获取子节点与自身节点的数据信息 * ● WRITE:在Znode节点上写数据 * ● DELETE:删除子节点 * ● ADMIN:设置ACL权限 * ———————————————— */ package com.shu.acl;
package com.shu.lock; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import java.text.SimpleDateFormat; import java.util.concurrent.CountDownLatch; /** * @author 31380 * @description 分布式锁 * @create 2024/3/17 13:12 */ public class LockTest { /** * 分布式锁:InterProcessMutex * 1: 获取锁,acquire() * 2: 释放锁,release() * 3: 创建 InterProcessMutex 对象 * 4: 调用 acquire() 方法获取锁 * 5: 业务操作 * 6: 调用 release() 方法释放锁 * @param args */ public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(10); String connectString = "127.0.0.1:2181"; String lockPath = "/lock"; CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); curatorFramework.start(); InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); for (int i = 0; i < 10; i++) { new Thread(() -> { try { latch.await(); lock.acquire(); System.out.println(Thread.currentThread().getName() + "获取到锁"); // 模拟业务操作,生成订单号 Thread.sleep(1000); SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); String orderNo = sdf.format(System.currentTimeMillis()); System.out.println("生成的订单号:" + orderNo); } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } }, "Thread-" + i).start(); latch.countDown(); } } }
package com.shu.lock; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; /** * @author 31380 * @description 分布式计数器 * @create 2024/3/17 13:20 */ public class RecipeDisAtomicIntTest { /** * 分布式计数器:DistributedAtomicInteger * 1、创建DistributedAtomicInteger对象 * 2、调用add方法 * 3、获取当前值 * * @param args */ public static void main(String[] args) { String connectString = "127.0.0.1:2181"; String connectString2 = "127.0.0.1:2182"; String connectString3 = "127.0.0.1:2183"; CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); curatorFramework.start(); DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(curatorFramework, "/atomic", null); try { AtomicValue<Integer> added = atomicInteger.add(1); System.out.println("1Result: " + added.succeeded()); // 获取当前值 System.out.println("2Result: " + added.postValue()); } catch (Exception e) { throw new RuntimeException(e); } // 客户端2 CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000); curatorFramework2.start(); DistributedAtomicInteger atomicInteger2 = new DistributedAtomicInteger(curatorFramework2, "/atomic", null); try { AtomicValue<Integer> added = atomicInteger2.add(1); System.out.println("2Result: " + added.succeeded()); // 获取当前值 System.out.println("2Result: " + added.postValue()); } catch (Exception e) { throw new RuntimeException(e); } // 客户端3 CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000); curatorFramework3.start(); DistributedAtomicInteger atomicInteger3 = new DistributedAtomicInteger(curatorFramework3, "/atomic", null); try { AtomicValue<Integer> added = atomicInteger3.add(1); System.out.println("3Result: " + added.succeeded()); // 获取当前值 System.out.println("3Result: " + added.postValue()); } catch (Exception e) { throw new RuntimeException(e); } } }
package com.shu.lock; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; /** * @author 31380 * @description * 分布式Barrier:分布式 Barrier 是一种常见的同步原语,用于在分布式系统中协调多个进程或线程的执行顺序。 * 它可以用来实现诸如等待直到所有参与者都准备好,然后一起执行某项任务,或者等待直到某些条件达成后再继续执行的场景。 * @create 2024/3/17 13:27 */ public class CycliBarrierTest { static DistributedBarrier barrier; public static void main(String[] args) { String connectString = "127.0.0.1:2181"; String path="/barrier"; CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); curatorFramework.start(); // 等待所有的线程到达barrier 10个线程 for (int i = 0; i < 10; i++) { new Thread(() -> { try { barrier = new DistributedBarrier(curatorFramework, path); System.out.println(Thread.currentThread().getName() + "号barrier设置"); barrier.setBarrier(); barrier.waitOnBarrier(); System.out.println("启动..."); } catch (Exception e) { e.printStackTrace(); } }, "Thread-" + i).start(); } try { Thread.sleep(2000); barrier.removeBarrier(); } catch (Exception e) { e.printStackTrace(); } } }
package com.shu.master; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; /** * @author 31380 * @description 主节点选举 * @create 2024/3/17 13:03 */ public class MasterSelectTest { /** * 主节点选举:LeaderSelector * 1、创建LeaderSelector对象 * 2、调用start方法 * 3、添加监听器 * 4、关闭连接 * @param args */ public static void main(String[] args) { // 地址 String connectString = "127.0.0.1:2181"; String connectString2 = "127.0.0.1:2182"; String connectString3 = "127.0.0.1:2183"; // 创建并连接 CuratorFramework 实例 CuratorFramework curatorFramework1 = CuratorUtils.createCuratorFramework(connectString, 1000, 1000); CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000); CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000); curatorFramework1.start(); curatorFramework2.start(); curatorFramework3.start(); // 第一个节点 LeaderSelector leaderSelector1 = new LeaderSelector(curatorFramework1, "/master1", new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println("节点1成为master节点"); Thread.sleep(10000); System.out.println("节点1完成master操作,释放master权利"); } }); leaderSelector1.autoRequeue(); leaderSelector1.start(); // 第二个节点 LeaderSelector leaderSelector2 = new LeaderSelector(curatorFramework2, "/master1", new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println("节点2成为master节点"); Thread.sleep(10000); System.out.println("节点2完成master操作,释放master权利"); } }); leaderSelector2.autoRequeue(); leaderSelector2.start(); // 第三个节点 LeaderSelector leaderSelector3 = new LeaderSelector(curatorFramework3, "/master1", new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { System.out.println("节点3成为master节点"); Thread.sleep(10000); System.out.println("节点3完成master操作,释放master权利"); } }); leaderSelector3.autoRequeue(); leaderSelector3.start(); try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } }
package com.shu.watch; import com.shu.CuratorUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; /** * @author 31380 * @description * @create 2024/3/16 19:38 */ public class CuratorNodeCacheTest { /** * NodeCache:监听节点的新增、修改操作 * 1、创建NodeCache对象 * 2、调用start方法 * 3、添加监听器 * 4、关闭连接 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { String path = "/test"; CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000); System.out.println("连接成功!"); curatorClint.start(); final NodeCache nodeCache = new NodeCache(curatorClint, path); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("监听事件触发"); System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData())); } }); curatorClint.setData().forPath(path,"456".getBytes()); curatorClint.setData().forPath(path,"789".getBytes()); curatorClint.setData().forPath(path,"123".getBytes()); curatorClint.setData().forPath(path,"222".getBytes()); curatorClint.setData().forPath(path,"333".getBytes()); curatorClint.setData().forPath(path,"444".getBytes()); Thread.sleep(15000); } }
package com.shu.watch; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * @author 31380 * @description * @create 2024/3/16 19:43 */ public class CuratorPathChildrenCacheTest { /** * PathChildrenCache:监听子节点的新增、修改、删除操作 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { CuratorFramework client = getClient(); String parentPath = "/p1"; PathChildrenCache pathChildrenCache = new PathChildrenCache(client,parentPath,false); /* * StartMode:初始化方式 * POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件 * NORMAL:异步初始化 * BUILD_INITIAL_CACHE:同步初始化 * */ pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("事件类型:" + event.getType() + ";操作节点:" + event.getData().getPath()); switch(event.getType()){ case CHILD_ADDED: System.out.println("新增子节点:" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("更新子节点:" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("删除子节点:" + event.getData().getPath()); break; default: break; } } }); String path = "/p1/c1"; client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); Thread.sleep(1000); // 此处需留意,如果没有现成睡眠则无法触发监听事件 client.delete().forPath(path); Thread.sleep(15000); } private static CuratorFramework getClient(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .retryPolicy(retryPolicy) .sessionTimeoutMs(6000) .connectionTimeoutMs(3000) .namespace("demo") .build(); client.start(); return client; } }
package com.shu.watch; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; /** * @author 31380 * @description * @create 2024/3/16 19:44 */ public class CuratorWatcher3 { private static final String CONNECT_ADDR = "127.0.0.1:2181"; private static final int SESSION_TIMEOUT = 5000; public static void main(String[] args) throws Exception { RetryPolicy policy = new ExponentialBackoffRetry(1000, 10); CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(policy).build(); curator.start(); TreeCache treeCache = new TreeCache(curator, "/treeCache"); treeCache.start(); treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> { switch (treeCacheEvent.getType()) { case NODE_ADDED: System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData()) + ",状态:" + treeCacheEvent.getData().getStat()); break; case NODE_UPDATED: System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData()) + ",状态:" + treeCacheEvent.getData().getStat()); break; case NODE_REMOVED: System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData()) + ",状态:" + treeCacheEvent.getData().getStat()); break; default: break; } }); curator.create().forPath("/treeCache", "123".getBytes()); curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes()); curator.setData().forPath("/treeCache", "789".getBytes()); curator.setData().forPath("/treeCache/c1", "910".getBytes()); curator.delete().forPath("/treeCache/c1"); curator.delete().forPath("/treeCache"); Thread.sleep(5000); curator.close(); } }
详细介绍参考书籍《从Paxos到Zookeeper:分布式一致性原理与实践》
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。