赞
踩
目前,Zookeeper服务器有三种Java客户端: Zookeeper、Zkclient和Curator。
- Zookeeper: Zookeeper是官方提供的原生java客户端
- Zkclient: 是在原生zookeeper客户端基础上进行扩展的开源第三方Java客户端
- Curator: Netflix公司在原生zookeeper客户端基础上开源的第三方Java客户端
由于 Curator 较于其他两种客户端操作更简单,功能更丰富,可以说是当前最好用,最流行的ZooKeepe的客户端。所以接下来我们将以Curator作为Zookeeper客户端为例,进行整合开发。
Curator是Apache软件基金会下的一个开源框架,目前是Apache下的顶级项目。Curator起初是 Netflix公司开源的一套ZooKeeper客户端框架,后捐献给Apache。和 ZkClient一样,它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及 NodeExistsException异常等。
Curator可以提供支持常见的ZooKeeper应用场景:
curator它主要包含三个依赖(curator的依赖都已经放到maven仓库,你直接使用maven来构建它。对于大多数人来说,我们可能最常需要引入的是curator-recipes):
注意:目前Curator2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。
最新版本Curator4.0十分依赖Zookeeper3.5.X。Curator4.0在软兼容模式下支持Zookeeper3.4.X,但是需要依赖排除zookeeper。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example.ahao</groupId> <artifactId>ahao_zookeeper</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.8</version> <relativePath/> </parent> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <!-- 编码字符集 --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- SpringBoot --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- SpringBoot 测试包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- ZooKeeper客户端 curator --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project>
在开始配置之前,简单了解一下Curator提供的几种常用的重试策略实现类:
配置文件
# 端口号 server: port: 8888 # zookeeper配置 apache: zookeeper: # 服务器连接地址,集群模式则使用逗号分隔如:ip1:host,ip2:host connect-url: 127.0.0.1:2180 # 会话超时时间:单位ms session-timeout: 10000 # 连接超时时间:单位ms connection-timeout: 10000 # ACL权限控制,验证策略 scheme: auth # 验证内容id auth_id: admin:123456 # 重试策略 retry-policy: # 初始化间隔时间 base-sleep-time: 1000 # 最大重试次数 max-retries: 5 # 最大重试间隔时间 max-sleep: 30000
重试策略配置类
package com.ahao.demo.config; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @Name: CuratorRetryPolicy * @Description: 重试策略参数 * @Author: ahao * @Date: 2024/1/10 6:23 PM */ @ConfigurationProperties(prefix = "apache.retry-policy") @Configuration @Getter @Setter public class CuratorRetryPolicy { // 初始化间隔时间 private Integer baseSleepTime; // 最大重试次数 private Integer maxRetries; // 最大重试间隔时间 private Integer maxSleep; }
客户端配置类
package com.ahao.demo.config; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Name: ZkClientConfig * @Description: Curator客户端配置类 * @Author: ahao * @Date: 2024/1/10 3:52 PM */ @Configuration @ConfigurationProperties(prefix = "apache.zookeeper") @Setter @Slf4j public class ZkClientConfig { // 服务器连接地址,集群模式则使用逗号分隔如:ip1:host,ip2:host private String connectUrl; // 会话超时时间:单位ms private Integer sessionTimeout; // 连接超时时间:单位ms private Integer connectionTimeout; // ACL权限控制,验证策略 private String scheme; // 验证内容id private String authId; @Autowired private CuratorRetryPolicy curatorRetryPolicy; @Bean public CuratorFramework curatorFramework(){ CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(connectUrl) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) // 权限认证 //.authorization(scheme,authId.getBytes(StandardCharsets.UTF_8)) // 重试策略 .retryPolicy(new ExponentialBackoffRetry(curatorRetryPolicy.getBaseSleepTime() ,curatorRetryPolicy.getMaxRetries() ,curatorRetryPolicy.getMaxSleep())) .build(); // 启动客户端 curatorFramework.start(); return curatorFramework; } }
现在客户端已经配置好了,启动CuratorDemoApplication.class,观察一下是否能够正常启动。
观察输出日志,显示如下信息表示客户端连接成功。
为了偷懒,我没有写相关业务层代码,在启动类中通过ApplicationRunner的run方法在容器启动后直接执行。
代码如下
创建默认节点(持久节点):client.create().forPath("路径")
创建默认节点,带初始内容:client.create().forPath("路径","内容".getBytes())
创建临时节点:client.create().withMode(CreateMode.EPHEMERAL).forPath("路径")
递归方式创建节点(父节点可以不存在):client.create().creatingParentsIfNeeded().forPath("路径")
package com.ahao.demo; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; /** * @Name: CuratorDemoApplication * @Description: * @Author: ahao * @Date: 2024/1/10 3:29 PM */ @Slf4j @SpringBootApplication public class CuratorDemoApplication implements ApplicationRunner{ @Autowired private CuratorFramework client; public static void main(String[] args) { SpringApplication.run(CuratorDemoApplication.class,args); } @Override public void run(ApplicationArguments args) throws Exception { log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。"); TimeUnit.SECONDS.sleep(3); // 创建节点。如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空 client.create() // 如果需要,递归创建节点 .creatingParentsIfNeeded() // 指定创建节点类型 .withMode(CreateMode.EPHEMERAL) // 节点路径和数据 .forPath("/ahao/test","this is a book".getBytes(StandardCharsets.UTF_8)); } }
通过./zkCli.sh
启动的客户端终端,查看节点信息,起初没有/ahao/test节点,在启动容器后,执行新增节点方法。本来没有新增/ahao节点(没有父节点,直接创建子节点会失败),由于指定递归方式(.creatingParentsIfNeeded()),所以先完成了/ahao父节点的新增,然后再新增/ahao/test。观察/ahao和/ahao/test节点的数据:/ahao节点数据为空,/ahao/test节点数据就是我们在代码中传递的数据。最后,停止CuratorDemoApplication,观察节点信息发现,/ahao/test节点(临时节点)被删除了,而/ahao(持久节点)仍存在。
判断某个节点是否存在:client.checkExists().forPath()
获取某个节点的数据:client.getData().forPath()
获取某个节点下的子节点:client.getChildren().forPath()
@Override public void run(ApplicationArguments args) throws Exception { log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。"); TimeUnit.SECONDS.sleep(3); log.info("新增节点"); // 创建节点 client.create() // 如果需要,递归创建节点 .creatingParentsIfNeeded() // 指定创建节点类型 .withMode(CreateMode.EPHEMERAL) // 节点路径和数据 .forPath("/ahao/test","this is a book".getBytes(StandardCharsets.UTF_8)); // 睡眠1s TimeUnit.SECONDS.sleep(1); // 读取节点的数据内容 byte[] bytes = client.getData().forPath("/ahao/test"); String s = new String(bytes,StandardCharsets.UTF_8); log.info("读取到的数据内容:{}",s); // 判断节点是否存在并返回节点状态信息 Stat stat = client.checkExists().forPath("/ahao/test"); log.info("读取节点状态信息:{}", stat); // 获取子节点 List<String> list = client.getChildren().forPath("/ahao"); log.info("读取子节点:{}", list); }
日志输出如下:
更新节点内容:client.setData().forPath()
@Override public void run(ApplicationArguments args) throws Exception { log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。"); TimeUnit.SECONDS.sleep(3); log.info("新增节点"); // 创建节点 client.create() // 如果需要,递归创建节点 .creatingParentsIfNeeded() // 指定创建节点类型 .withMode(CreateMode.EPHEMERAL) // 节点路径和数据 .forPath("/ahao/test","this is a book".getBytes(StandardCharsets.UTF_8)); // 睡眠1s TimeUnit.SECONDS.sleep(1); // 读取节点的数据内容 byte[] bytes = client.getData().forPath("/ahao/test"); String s = new String(bytes,StandardCharsets.UTF_8); log.info("读取到的数据内容:{}",s); // 更新节点 client.setData().forPath("/ahao/test","这是一本书".getBytes(StandardCharsets.UTF_8)); // 再次读取节点的数据内容 byte[] bytes2 = client.getData().forPath("/ahao/test"); String s2 = new String(bytes2,StandardCharsets.UTF_8); log.info("读取到的数据内容:{}",s2); }
日志输出如下:
删除节点:client.delete().forPath()
递归方式删除节点及其子节点:client.delete().deletingChildrenIfNeeded().forPath()
@Override public void run(ApplicationArguments args) throws Exception { log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。"); TimeUnit.SECONDS.sleep(3); log.info("新增节点"); // 创建节点 client.create() // 如果需要,递归创建节点 .creatingParentsIfNeeded() // 指定创建节点类型 .withMode(CreateMode.EPHEMERAL) // 节点路径和数据 .forPath("/ahao/test","this is a book".getBytes(StandardCharsets.UTF_8)); // 睡眠1s TimeUnit.SECONDS.sleep(1); // 删除/ahao节点 // 直接删除会报错KeeperErrorCode = Directory not empty for /ahao因为/ahao下有子节点 // client.delete().forPath("/ahao"); // 正确方式删除/ahao节点 // client.delete().deletingChildrenIfNeeded().forPath("/ahao"); // 删除/ahao/test节点 client.delete().forPath("/ahao/test"); }
Curator使用BackgroundCallback接口实现有关服务端返回的结果信息处理。
public interface BackgroundCallback
{
/**
* Called when the async background operation completes
*
* @param client 当前客户端实例
* @param event operation result details 服务端事件操作结果,包含事件类型和响应码
* @throws Exception errors
*/
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
事件类型 在枚举类org.apache.curator.framework.api.CuratorEventType中有列举。
public enum CuratorEventType { /** * Corresponds to {@link CuratorFramework#create()} */ CREATE, /** * Corresponds to {@link CuratorFramework#delete()} */ DELETE, /** * Corresponds to {@link CuratorFramework#checkExists()} */ EXISTS, /** * Corresponds to {@link CuratorFramework#getData()} */ GET_DATA, /** * Corresponds to {@link CuratorFramework#setData()} */ SET_DATA, /** * Corresponds to {@link CuratorFramework#getChildren()} */ CHILDREN, /** * Corresponds to {@link CuratorFramework#sync(String, Object)} */ SYNC, /** * Corresponds to {@link CuratorFramework#getACL()} */ GET_ACL, /** * Corresponds to {@link CuratorFramework#setACL()} */ SET_ACL, /** * Corresponds to {@link CuratorFramework#transaction()} */ TRANSACTION, /** * Corresponds to {@link CuratorFramework#getConfig()} */ GET_CONFIG, /** * Corresponds to {@link CuratorFramework#reconfig()} */ RECONFIG, /** * Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()} */ WATCHED, /** * Corresponds to {@link CuratorFramework#watches()} ()} */ REMOVE_WATCHES, /** * Event sent when client is being closed */ CLOSING }
代码实现
@Override public void run(ApplicationArguments args) throws Exception { log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。"); TimeUnit.SECONDS.sleep(3); log.info("新增节点"); // 创建节点 client.create() // 如果需要,递归创建节点 .creatingParentsIfNeeded() // 指定创建节点类型 .withMode(CreateMode.EPHEMERAL) // 节点路径和数据 .forPath("/ahao/test","this is a book".getBytes(StandardCharsets.UTF_8)); // 睡眠1s TimeUnit.SECONDS.sleep(1); // 异步回调 BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { log.info("时间类型:{}",event.getType()); } }; // 删除/ahao/test节点 client.delete().deletingChildrenIfNeeded().inBackground(callback).forPath("/ahao/test"); }
日志输出中可以发现,不再是main线程:
本篇我们介绍了如何SpringBoot整合Curator客户端,并讲解了部分基本API的使用,有关更高级的用法如分布式锁、事件监听、分布式消息队列等功能将在下篇博客介绍。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。