当前位置:   article > 正文

Java Api 如何操作 zookeeper (Curator客户端)_curator依赖

curator依赖

本文我们使用的客户端是目前比较成熟的Curator,他是基于zookeeper Api 的封装。

1. 依赖

使用Curator操作zookeeper,我们首先需要引入相关依赖:

  • framework 是他的核心框架
  • recipes 是基于zookeeper特性封装的一些方法,比如分布式锁、leader选举等。
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>5.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>5.2.0</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2. 连接

首先我们讲一下Curator的相关API。

作为一个客户端,他一般具有以下三个功能:

  • 建立连接(session)
  • CRUD的操作命令
  • 基于特性提供解决方案层面的封装

我们可以通过CuratorFrameworkFactory来建立连接,他里面提供了一个链式风格的构建方式,我们可以以这种方式快速建立连接。

public class CuratorMain {

    public static void main(String[] args) {
        
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString("127.0.0.1:2181") //读写分离(zookeeper-server),可以跟多个节点,用逗号分隔;
                // 衰减重试的一个算法机制:baseSleepTimeMs*Math.max(1,random.nextInt(1<<(maxRetries+1))
                /**
                 * RetryNTimes 指定最大重试次数
                 * RetryOneTimes
                 * RetryUntilElapsed 一直重试,直到达到规定时间
                 */
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)) //原生API没有重连机制
                .sessionTimeoutMs(15000)
                .build();
        curatorFramework.start(); //启动
        try {
            //获取节点数据
            byte[] data = curatorFramework.getData().forPath("/seq");
            System.out.println(new String(data));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

3. CRUD操作

使用Curator进行CRUD操作还是比较简单的,由于是链式编程风格,所有都会有相应的提示。

创建相关节点我们只需要通过建立的CuratorFramework连接调用create方法即可,然后设置相关的属性。

creatingParentsIfNeeded如果父节点不存在则会创建;

我们在zk客户端中创建节点的时候,如果父节点不存在会创建失败,所以使用这个方法会帮我们递归创建节点。

image-20220125114736961

withMode设置节点模型,可以是持久类型还是零时节点等。

String value = "Hello World";
String node = curatorFramework.create()
        //如果父节点不存在则创建
        .creatingParentsIfNeeded()
        //节点模型
        .withMode(CreateMode.PERSISTENT)
        .forPath("/node", value.getBytes());
System.out.println("节点创建成功:" + node);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

获取节点数据我们可以通过getData方法获取。

String result = new String(curatorFramework.getData().forPath(node));
  • 1

在获取节点数据的时候我们可以通过storingStatIn方法,将值放入stat对象中,这一步是Curator封装好的。在我们zk的客户端中,get 命令和 stat 命令获取到的数据是不一样的,所以在Curator中对其进行了一定的封装。

Stat stat = new Stat(); //存储状态信息的对象
//获取节点的value
byte[] data = curatorFramework.getData().storingStatIn(stat).forPath(node);
System.out.println("节点value值:" + new String(data));
  • 1
  • 2
  • 3
  • 4

image-20220125142013162

修改节点数据通过setData方法,中间还可以携带状态信息中的版本号,用来使用乐观锁,这部分可以自行了解下。

curatorFramework.setData()
        .withVersion(stat.getVersion())
        .forPath(node, "Update Date Result".getBytes());
  • 1
  • 2
  • 3

修改节点数据通过delete方法。

curatorFramework.delete().forPath(node);
  • 1

我们还可以通过checkExists方法来判断节点是否存在。

curatorFramework.checkExists().forPath(node);
  • 1

4. 异步操作

异步操作就是我们在创建数据节点的时候,我们可以不需要等到这个节点创建完成在进行其他指令。

主要通过inBackground方法,这个方法里面会有多个重载方法,包括相关的回调。

image-20220125142241368

通过以下代码简单实现以下异步的操作:这边使用了JUC中的CountDownLatch,关于这部分的内容可以参考并发编程中相关的文章。这个时候输出的node是null,原因是我们获取到的是个value值,这个时候刚刚创建,所以为空。

public void asyncCRUD() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    String node = curatorFramework.create().withMode(CreateMode.PERSISTENT)
            .inBackground((session, event) -> {
                System.out.println(Thread.currentThread().getName() + ":执行创建节点:" + event.getPath());
                countDownLatch.countDown(); //触发回调,递减计数器
            }).forPath("/async-node");
    countDownLatch.await();
    System.out.println("异步执行创建节点:" + node);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

5. ACL (权限)

在zk中还有一个权限控制的相关问题,在上一篇文章中我们演示了如何在客户端中使用权限的相关命令,本文我们通过Java API来实现相关操作。

大致流程主要就是创建一个ID授权标识符,然后在创建节点的时候使用withACL将其添加进去。

public void aclOperation() throws Exception {
     //授权标识符
     //会根据 : 后的字符进行签名
    Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("cc:cc"));
    List<ACL> acls = new ArrayList<>();
    acls.add(new ACL(ZooDefs.Perms.ALL, id));
    String node = curatorFramework.create().creatingParentsIfNeeded()
            .withMode(CreateMode.PERSISTENT)
            .withACL(acls, false).forPath("/curator-auth", "Auth".getBytes());
    System.out.println("创建带有权限节点:" + node);
    System.out.println("数据查询结果:" + new String(curatorFramework.getData().forPath(node)));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

需要注意的是,由于zk他的授权是在会话层面的,所以我们建立连接的时候需要进行授权。通过authorization方法授权。

public ZookeeperACLExample() {
    curatorFramework = CuratorFrameworkFactory
            .builder()
            .connectionTimeoutMs(20000)
            .connectString("127.0.0.1:2181") //读写分离(zookeeper-server)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .authorization("digest", "cc:cc".getBytes())
            .sessionTimeoutMs(20000)
            .build();
    curatorFramework.start(); //启动
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

6. watch 监听机制

zookeeper 3.6之后,使用CuratorCache实现watcher机制,所以接下来我们将以下通过CuratorCache如何实现监听。

首先我们可以到该类下看看有哪些事件。包括节点创建、节点删除、节点更新、当前节点的子节点又增删改事件的时候出发等。

image-20220125154525020

image-20220125154601370

了解玩相关事件后,我们通过代码来演示一下:

6.1 普通监听

我们可以通过CuratorWatcher来实现一个普通监听,可以通过实现接口或者内部函数的方式来重写其中的process方法。

在如下代码中我们首先创建一个监听,然后创建一个节点,节点创建完成之后绑定监听,然后对节点信息进行修改。

public void normalWatcher() throws Exception {
    CuratorWatcher curatorWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
            System.out.println("监听到的事件" + watchedEvent.toString());
        }
    };
    String node = curatorFramework.create().forPath("/watcher1", "Watcher String".getBytes());
    System.out.println("节点创建成功:" + node);
    //设置一次普通的watcher监听
    String data = new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node));
    System.out.println("设置监听并获取节点数据:" + data);
    //第一次操作
    curatorFramework.setData().forPath(node, "change data 0".getBytes());
    Thread.sleep(1000);
    curatorFramework.setData().forPath(node, "change data 1".getBytes());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

image-20220125160306636

这个时候我们可以发现只监听到一次数据修改的事件,这是为什么呢?这是由于3.6之前的zk是一次性的,他只监听到了一次事件的变更。所以我们如果需要循环监听,那么我们需要进行如下处理,重新监听:

CuratorWatcher curatorWatcher = new CuratorWatcher() {
    @Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println("监听到的事件" + watchedEvent.toString());
        //循环设置监听
        curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath());
    }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

6.2 持久化订阅(3.6之后)

为了解决上述普通监听的问题,所以出现了持久化订阅,这就需要使用到CuratorCacheListener

我们首先使用CuratorCache构建一个实例,传入连接、节点名(表示我要监听的节点)以及选择项(包括三个类型:单节点缓存、数据压缩以及关闭后不清理缓存),选择项可以设置多个。

CuratorCache curatorCache = CuratorCache.
        build(curatorFramework, node, CuratorCache.Options.SINGLE_NODE_CACHE);
  • 1
  • 2

实例创建之后,我们就需要配置监听,这里的话我们一般不配置匿名内部类,而是直接实现相关接口能够复用。我们只需要实现CuratorCacheListener接口,重写event方法即可。

public class ZookeeperWatcherListener implements CuratorCacheListener {

    @Override
    public void event(Type type, ChildData oldData, ChildData data) {
        System.out.println("事件类型:" + type + ":oldData:" + oldData + ":data" + data);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

实现类创建好之后,我们需要配置相关的监听项:

CuratorCacheListener listener = CuratorCacheListener
        .builder()
        .forAll(new ZookeeperWatcherListener())
        .build();
  • 1
  • 2
  • 3
  • 4

builder之后是是多个监听项可以选择的,传入的参数就是我们刚刚实现的接口类。

image-20220125162031134

最后只需要绑定并启动即可。

public void persisWatcher(String node) {
    CuratorCache curatorCache = CuratorCache.
            build(curatorFramework, node, CuratorCache.Options.SINGLE_NODE_CACHE);
    CuratorCacheListener listener = CuratorCacheListener
            .builder()
            .forAll(new ZookeeperWatcherListener())
            .build();
    curatorCache.listenable().addListener(listener);
    curatorCache.start();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

配置监听之后,我们就可以对节点信息进行修改,查看监听信息。

7. 项目地址

ZK 学习案例的代码如下:

ZK-demo

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/772521
推荐阅读
相关标签
  

闽ICP备14008679号