当前位置:   article > 正文

Curator基本操作(Zookeeper节点增删改查)_curatorframework 删除临时节点

curatorframework 删除临时节点

Curator是Zookeeper的Java客户端库,官网为 https://curator.apache.org

环境

  • Ubuntu 22.04
  • Zookeeper 3.7.1
  • JDK 17.0.3.1
  • IntelliJ IDEA 2022.1.3
  • Curator 5.4.0

准备

创建Maven项目 test1215

打开 pom.xml 文件,添加如下依赖:

        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.4.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.4.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

src/test/java 目录下,创建测试文件 Test1215.java

首先加上连接和关闭连接的测试:

package pkg1;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class Test1215 {
    private CuratorFramework client;

    @Before
    public void testConnect() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

        client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("test1215")
                .build();

        client.start();
    }

    // quit
    @After
    public void testClose() {
        if (client != null) {
            client.close();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

注:

  • 本例是单机Zookeeper,所以connectionString是 localhost:2181 ,如果是Zookeeper集群,则connectionString是 localhost:2181,localhost:2182,localhost:2183 这样的。
  • 在创建连接时,指定了namespace为 test1215 ,表示所有的操作都是在 /test1215 节点下操作的,否则默认是在根节点 / 下操作的。

增删改查操作

创建节点

    // create <node>
    @Test
    public void test1() throws Exception {
        client.create().forPath("/app1");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

运行测试,然后在命令行下查看:

ls /
[test1, test1215, test20000000003, test20000000004, test30000000005, zookeeper]
  • 1
  • 2
ls /test1215
[app1]
  • 1
  • 2

可见,创建了节点 /test1215/test1215/app1

get /test1215/app1
127.0.1.1
  • 1
  • 2

注意,如果不设置数据,默认会设置为机器的IP地址,即 127.0.0.1

若要显式设置数据,则 forPath() 方法需要多一个参数。

    // create <node> <data>
    @Test
    public void test2() throws Exception {
        client.create().forPath("/app2", "Hello".getBytes());
    }
  • 1
  • 2
  • 3
  • 4
  • 5

若要设置节点的临时性和顺序性,则要用 withMode() 方法:

    // create -es <node>
    @Test
    public void test3() throws Exception {
        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/app3");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

本例中创建了一个临时且顺序的节点。

注意,会话结束后,临时节点就会自动删除,所以要验证的话,需要设置断点,让程序暂停,然后在命令行下查看:

ls /test1215
[app1, app2, app30000000002]
  • 1
  • 2

可见,创建了顺序节点 app30000000002

等运行结束后再查看:

ls /test1215
[app1, app2]
  • 1
  • 2

可见,临时节点在会话结束之后自动删除了。

如果需要一次性创建多级节点,则要用 creatingParentsIfNeeded() 方法:

    // create <parent>
    // create <parent/child>
    @Test
    public void test4() throws Exception {
        client.create().creatingParentsIfNeeded().forPath("/app4/child1");

        client.create().creatingParentsIfNeeded().forPath("/app4/child2/aaa");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

注:貌似没有对应的命令,Curator可能是一级一级创建的。

运行测试,然后在命令行下查看:

ls /test1215
[app1, app2, app4]
  • 1
  • 2
ls /test1215/app4
[child1, child2]
  • 1
  • 2
ls /test1215/app4/child2
[aaa]
  • 1
  • 2

注意:默认的数据 127.0.0.1 是设置在最小的节点上的,本例中为 /test1215/app4/child1/test1215/app4/child2/aaa

查询节点

获取节点数据:

    // get <node>
    @Test
    public void test5() throws Exception {
        byte[] data = client.getData().forPath("/app2");

        String str = new String(data);

        System.out.println(str);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

运行结果如下:

Hello
  • 1

查询子节点:

    // ls <node>
    @Test
    public void test6() throws Exception {
        List<String> list = client.getChildren().forPath("/");
        list.forEach(System.out::println);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

运行结果如下:

app2
app1
app4
  • 1
  • 2
  • 3

注意:这里的 / 实际上是 /test1215 ,因为设置了namespace。

查询节点状态:

    // ls -s <node>
    @Test
    public void test7() throws Exception {
        Stat stat = new Stat();

        System.out.println(stat);

        client.getData().storingStatIn(stat).forPath("/app2");

        System.out.println(stat);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行结果如下:

0,0,0,0,0,0,0,0,0,0,0

103,103,1671156504367,1671156504367,0,0,0,0,5,0,103
  • 1
  • 2
  • 3

设置节点数据

直接更新节点数据:

    // set <node> <data>
    @Test
    public void test8() throws Exception {
        client.setData().forPath("/app2", "China".getBytes());
    }
  • 1
  • 2
  • 3
  • 4
  • 5

运行测试,然后在命令行下查看:

get /test1215/app2
China
  • 1
  • 2

根据版本号更新数据:

    // set -v <version> <node> <data>
    @Test
    public void test9() throws Exception {
        Stat stat = new Stat();

        client.getData().storingStatIn(stat).forPath("/app2");

        int version = stat.getVersion();

        System.out.println("Version = " + version);

        client.setData().withVersion(version).forPath("/app2", "good".getBytes());
//        client.setData().withVersion(100).forPath("/app2", "good".getBytes());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

运行结果如下:

Version = 1
  • 1

注意:本例中,先获取节点的版本,在更新数据时,带上该版本号,以确保没有并发问题。如果此时实际的版本号已改变,二者不相等,则不会更新数据,而是抛出异常。

更新成功后版本号会加1。本例中,原先的版本号为1,如果更新成功,则版本号会变成2。

在命令行验证版本号:

ls -s /test1215/app2
[]
cZxid = 0x67
ctime = Fri Dec 16 10:08:24 CST 2022
mZxid = 0x80
mtime = Fri Dec 16 10:25:16 CST 2022
pZxid = 0x67
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

可见,当前的 dataVersion 已经变为2。

如果更新时,获取的版本号和实际版本号不相等(本例中可用硬编码来指定一个错误的版本号),则会报错:

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /test1215/app2
  • 1

删除节点

删除没有子节点的节点:

    // delete <node>
    @Test
    public void test10() throws Exception {
        client.delete().forPath("/app1");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

运行测试,然后在命令行下查看:

ls /test1215
[app2, app4]
  • 1
  • 2

可见 /test1215/app1 已经删除了。

删除带有子节点的节点:

    // deleteall <node>
    @Test
    public void test11() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/app4/child2");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

运行测试,然后在命令行下查看:

ls /test1215/app4
[child1]
  • 1
  • 2

可见, /test1215/app4/child2 连同其子节点 /test1215/app4/child2/aaa 一起删除了。

确保删除成功:

    // delete <node>
    @Test
    public void test12() throws Exception {
        client.delete().guaranteed().forPath("/app4/child1");
    }
  • 1
  • 2
  • 3
  • 4
  • 5

运行测试,然后在命令行下查看:

ls /test1215/app4
[]
  • 1
  • 2

本例中看不出 guaranteed() 的效果。

可以在删除节点时,添加一个回调函数:

    // delete <node>
    @Test
    public void test13() throws Exception {
        client.delete().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println("Deleted");
                System.out.println(curatorEvent);
            }
        }).forPath("/app2");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行结果如下:

Deleted
CuratorEventImpl{type=DELETE, resultCode=0, path='/app2', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null, opResults=null}
  • 1
  • 2

在命令行下查看:

[zk: localhost:2181(CONNECTED) 107] ls /test1215
[app4]
  • 1
  • 2

可见 /test1215/app2 已经删除了。

注意,此时如果把 /test1215/app4 删除,则 /test1215 节点为空,大概几秒钟后也就自动删除了。但是刚才把 /test1215/app4 下的所有子节点都删除之后, /test1215/app4 节点为空,并不会自动删除。可见用namespace创建的节点有一个特性:当其为空时,就会自动删除。这应该也是封装的Zookeeper的特性吧,有待研究。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号