赞
踩
本文我们使用的客户端是目前比较成熟的Curator,他是基于zookeeper Api 的封装。
使用Curator操作zookeeper,我们首先需要引入相关依赖:
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>
首先我们讲一下Curator的相关API。
作为一个客户端,他一般具有以下三个功能:
我们可以通过CuratorFrameworkFactory
来建立连接,他里面提供了一个链式风格的构建方式,我们可以以这种方式快速建立连接。
public class CuratorMain { public static void main(String[] args) { CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectionTimeoutMs(20000) .connectString("127.0.0.1:2181") //读写分离(zookeeper-server),可以跟多个节点,用逗号分隔; // 衰减重试的一个算法机制:baseSleepTimeMs*Math.max(1,random.nextInt(1<<(maxRetries+1)) /** * RetryNTimes 指定最大重试次数 * RetryOneTimes * RetryUntilElapsed 一直重试,直到达到规定时间 */ .retryPolicy(new ExponentialBackoffRetry(1000, 3)) //原生API没有重连机制 .sessionTimeoutMs(15000) .build(); curatorFramework.start(); //启动 try { //获取节点数据 byte[] data = curatorFramework.getData().forPath("/seq"); System.out.println(new String(data)); } catch (Exception e) { e.printStackTrace(); } } }
使用Curator进行CRUD操作还是比较简单的,由于是链式编程风格,所有都会有相应的提示。
创建相关节点我们只需要通过建立的CuratorFramework
连接调用create
方法即可,然后设置相关的属性。
creatingParentsIfNeeded
如果父节点不存在则会创建;
我们在zk客户端中创建节点的时候,如果父节点不存在会创建失败,所以使用这个方法会帮我们递归创建节点。
withMode
设置节点模型,可以是持久类型还是零时节点等。
String value = "Hello World";
String node = curatorFramework.create()
//如果父节点不存在则创建
.creatingParentsIfNeeded()
//节点模型
.withMode(CreateMode.PERSISTENT)
.forPath("/node", value.getBytes());
System.out.println("节点创建成功:" + node);
获取节点数据我们可以通过getData
方法获取。
String result = new String(curatorFramework.getData().forPath(node));
在获取节点数据的时候我们可以通过storingStatIn
方法,将值放入stat对象中,这一步是Curator封装好的。在我们zk的客户端中,get 命令和 stat 命令获取到的数据是不一样的,所以在Curator中对其进行了一定的封装。
Stat stat = new Stat(); //存储状态信息的对象
//获取节点的value
byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(node);
System.out.println("节点value值:" + new String(data));
修改节点数据通过setData
方法,中间还可以携带状态信息中的版本号,用来使用乐观锁,这部分可以自行了解下。
curatorFramework.setData()
.withVersion(stat.getVersion())
.forPath(node, "Update Date Result".getBytes());
修改节点数据通过delete
方法。
curatorFramework.delete().forPath(node);
我们还可以通过checkExists
方法来判断节点是否存在。
curatorFramework.checkExists().forPath(node);
异步操作就是我们在创建数据节点的时候,我们可以不需要等到这个节点创建完成在进行其他指令。
主要通过inBackground
方法,这个方法里面会有多个重载方法,包括相关的回调。
通过以下代码简单实现以下异步的操作:这边使用了JUC中的CountDownLatch,关于这部分的内容可以参考并发编程中相关的文章。这个时候输出的node是null,原因是我们获取到的是个value值,这个时候刚刚创建,所以为空。
public void asyncCRUD() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
String node = curatorFramework.create().withMode(CreateMode.PERSISTENT)
.inBackground((session, event) -> {
System.out.println(Thread.currentThread().getName() + ":执行创建节点:" + event.getPath());
countDownLatch.countDown(); //触发回调,递减计数器
}).forPath("/async-node");
countDownLatch.await();
System.out.println("异步执行创建节点:" + node);
}
在zk中还有一个权限控制的相关问题,在上一篇文章中我们演示了如何在客户端中使用权限的相关命令,本文我们通过Java API来实现相关操作。
大致流程主要就是创建一个ID授权标识符,然后在创建节点的时候使用withACL
将其添加进去。
public void aclOperation() throws Exception {
//授权标识符
//会根据 : 后的字符进行签名
Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("cc:cc"));
List<ACL> acls = new ArrayList<>();
acls.add(new ACL(ZooDefs.Perms.ALL, id));
String node = curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls, false).forPath("/curator-auth", "Auth".getBytes());
System.out.println("创建带有权限节点:" + node);
System.out.println("数据查询结果:" + new String(curatorFramework.getData().forPath(node)));
}
需要注意的是,由于zk他的授权是在会话层面的,所以我们建立连接的时候需要进行授权。通过authorization
方法授权。
public ZookeeperACLExample() {
curatorFramework = CuratorFrameworkFactory
.builder()
.connectionTimeoutMs(20000)
.connectString("127.0.0.1:2181") //读写分离(zookeeper-server)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.authorization("digest", "cc:cc".getBytes())
.sessionTimeoutMs(20000)
.build();
curatorFramework.start(); //启动
}
zookeeper 3.6之后,使用CuratorCache实现watcher机制,所以接下来我们将以下通过CuratorCache如何实现监听。
首先我们可以到该类下看看有哪些事件。包括节点创建、节点删除、节点更新、当前节点的子节点又增删改事件的时候出发等。
了解玩相关事件后,我们通过代码来演示一下:
我们可以通过CuratorWatcher
来实现一个普通监听,可以通过实现接口或者内部函数的方式来重写其中的process
方法。
在如下代码中我们首先创建一个监听,然后创建一个节点,节点创建完成之后绑定监听,然后对节点信息进行修改。
public void normalWatcher() throws Exception { CuratorWatcher curatorWatcher = new CuratorWatcher() { @Override public void process(WatchedEvent watchedEvent) throws Exception { System.out.println("监听到的事件" + watchedEvent.toString()); } }; String node = curatorFramework.create().forPath("/watcher1", "Watcher String".getBytes()); System.out.println("节点创建成功:" + node); //设置一次普通的watcher监听 String data = new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node)); System.out.println("设置监听并获取节点数据:" + data); //第一次操作 curatorFramework.setData().forPath(node, "change data 0".getBytes()); Thread.sleep(1000); curatorFramework.setData().forPath(node, "change data 1".getBytes()); }
这个时候我们可以发现只监听到一次数据修改的事件,这是为什么呢?这是由于3.6之前的zk是一次性的,他只监听到了一次事件的变更。所以我们如果需要循环监听,那么我们需要进行如下处理,重新监听:
CuratorWatcher curatorWatcher = new CuratorWatcher() {
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println("监听到的事件" + watchedEvent.toString());
//循环设置监听
curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath());
}
};
为了解决上述普通监听的问题,所以出现了持久化订阅,这就需要使用到CuratorCacheListener
。
我们首先使用CuratorCache
构建一个实例,传入连接、节点名(表示我要监听的节点)以及选择项(包括三个类型:单节点缓存、数据压缩以及关闭后不清理缓存),选择项可以设置多个。
CuratorCache curatorCache = CuratorCache.
build(curatorFramework, node, CuratorCache.Options.SINGLE_NODE_CACHE);
实例创建之后,我们就需要配置监听,这里的话我们一般不配置匿名内部类,而是直接实现相关接口能够复用。我们只需要实现CuratorCacheListener
接口,重写event方法即可。
public class ZookeeperWatcherListener implements CuratorCacheListener {
@Override
public void event(Type type, ChildData oldData, ChildData data) {
System.out.println("事件类型:" + type + ":oldData:" + oldData + ":data" + data);
}
}
实现类创建好之后,我们需要配置相关的监听项:
CuratorCacheListener listener = CuratorCacheListener
.builder()
.forAll(new ZookeeperWatcherListener())
.build();
builder之后是是多个监听项可以选择的,传入的参数就是我们刚刚实现的接口类。
最后只需要绑定并启动即可。
public void persisWatcher(String node) {
CuratorCache curatorCache = CuratorCache.
build(curatorFramework, node, CuratorCache.Options.SINGLE_NODE_CACHE);
CuratorCacheListener listener = CuratorCacheListener
.builder()
.forAll(new ZookeeperWatcherListener())
.build();
curatorCache.listenable().addListener(listener);
curatorCache.start();
}
配置监听之后,我们就可以对节点信息进行修改,查看监听信息。
ZK 学习案例的代码如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。