当前位置:   article > 正文

zookeeper基本操作

zookeeper基本操作

一.命令操作
(1)数据模型
ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
PERSISTENT持久化节点
EPHEMERAL 临时节点 :-e
PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

(2)服务端命令
启动 ZooKeeper 服务: ./zkServer.sh start
查看 ZooKeeper 服务状态: ./zkServer.sh status
停止 ZooKeeper 服务: ./zkServer.sh stop
重启 ZooKeeper 服务: ./zkServer.sh restart
(3)客户端常用命令

  • 连接服务器
    ./zkCli.sh -server ip:port
  • 断开连接
    quit
  • 查看命令帮助
    help
  • 显式指定目录下的节点
    ls 目录
  • 创建节点
    create /节点path value
  • 获取节点值
    get /节点path
  • 设置节点值
    set /节点path value
  • 删除单个节点
    delete /节点path
  • 删除带有子节点的节点
    deleteall /节点path
  • 创建临时节点
    create -e /节点path value
  • 创建顺序节点
    create -s /节点path value
  • 查询节点信息
    ls -s /节点path
    节点详细信息
    czxid:节点被创建的事务ID
    ctime: 创建时间
    mzxid: 最后一次被更新的事务ID
    mtime: 修改时间
    pzxid:子节点列表最后一次被更新的事务ID
    cversion:子节点的版本号
    dataversion:数据版本号
    aclversion:权限版本号
    ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
    dataLength:节点存储的数据的长度
    numChildren:当前节点的子节点个数
    二.javaAPI操作(Curator)
package com.js.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author JiaShuai
 * @date 2022/12/4 23:02
 */
public class CuratorTest {
    private CuratorFramework client;
    /*
     *建立连接
     *
    */
    @Before
    public void testConnect(){
        /**
         * Create a new client
         *
         * @param connectString       连接字符串,zk的连接信息,ip和端口"192.168.254.100:2181"
         * @param sessionTimeoutMs    会话超时时间 ms
         * @param connectionTimeoutMs 连接超时时间 ms
         * @param retryPolicy         重试策略
         */
        //重试策略
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        /*1.第一种方式*/
//        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.254.100:2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        client.start();
        /*2.第二种方式*/
        client = CuratorFrameworkFactory.builder().connectString("192.168.254.100:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("jiashuai")
                .build();
        client.start();
    }

    /**
     * 创建节点:create 持久 临时 顺序 数据
     * 1.基本创建
     * 2.创建节点,带有数据
     * 3.设置节点类型
     * 4.创建多级节点 /app1/p1
     */
    @Test
    public void testCreate() throws Exception {
        //1.基本创建
        //创建节点不创建数据,默认数据为当前机器的ip地址
        String path = client.create().forPath("/app1");
        System.out.println(path);
    }
    @Test
    public void testCreate1() throws Exception {
        //2.创建节点,带有数据
        String path = client.create().forPath("/app2","哈哈".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
    @Test
    public void testCreate2() throws Exception {
        //3.设置节点类型
        //默认是持久化
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println(path);
    }
    @Test
    public void testCreate4() throws Exception {
        //4.创建多级节点
        //creatingParentContainersIfNeeded();如果父节点不存在则创建父节点
        String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1");
        System.out.println(path);
    }

    /**
     * 查询节点
     * 1.查询数据 get
     * 2.查询子节点 ls
     * 3.查询节点状态信息 ls-s
     */
    @Test
    public void testGet1() throws Exception {
        //1.查询数据 get
        byte[] bytes = client.getData().forPath("/app1");
        System.out.println(new String(bytes));
    }
    @Test
    public void testGet2() throws Exception {
        //2.查询子节点 ls
        List<String> list = client.getChildren().forPath("/");
        System.out.println(list);
    }
    @Test
    public void testGet3() throws Exception {
        //创建节点状态对象
        Stat status=new Stat();
        //3.查询节点状态信息 ls-s
         client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }

    /**
     * 修改数据
     * 1.修改数据
     * 2.根据版本修改
     */
    @Test
    public void testSet() throws Exception {
        //1.修改数据
        client.setData().forPath("/app1","zhangsan".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = client.getData().forPath("/app1");
        System.out.println(new String(bytes));
    }
    @Test
    public void testSetForVersion() throws Exception {
        //2.根据版本修改
        Stat status=new Stat();
        client.getData().storingStatIn(status).forPath("/app1");
        int version= status.getVersion();
        System.out.println("version = " + version);
        client.setData().withVersion(version).forPath("/app1","张三".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = client.getData().forPath("/app1");
        System.out.println(new String(bytes));
    }

    /**
     * 删除节点
     *1.删除单个节点
     * 2.删除有子节点的节点(父节点)
     * 3.必须成功的删除
     * 4.回调
     */
    @Test
    public void testDelete1() throws Exception {
        //1.删除单个节点
        client.delete().forPath("/app1");
    }
    @Test
    public void testDelete2() throws Exception {
        //2.删除有子节点的节点(父节点)
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }
    @Test
    public void testDelete3() throws Exception {
        //3必须成功的删除
        client.delete().guaranteed().forPath("/app2");
    }
    @Test
    public void testDelete4() throws Exception {
        //4.回调
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("被删除啦啦啦啦");
                System.out.println("event = " + event);
            }
        }).forPath("/app2");
    }

    @After
    public void testClose(){
        //关闭client
        if (client!=null){
            client.close();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182

Watch事件监听

package com.js.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @author JiaShuai
 * @date 2022/12/4 23:02
 */
public class CuratorWatcherTest {
    private CuratorFramework client;
    /*
     *建立连接
     *
    */
    @Before
    public void testConnect(){
        /**
         * Create a new client
         *
         * @param connectString       连接字符串,zk的连接信息,ip和端口"192.168.254.100:2181"
         * @param sessionTimeoutMs    会话超时时间 ms
         * @param connectionTimeoutMs 连接超时时间 ms
         * @param retryPolicy         重试策略
         */
        //重试策略
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        /*1.第一种方式*/
//        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.254.100:2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        client.start();
        /*2.第二种方式*/
        client = CuratorFrameworkFactory.builder().connectString("192.168.254.100:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("jiashuai")
                .build();
        client.start();
    }



    @After
    public void testClose(){
        //关闭client
        if (client!=null){
            client.close();
        }
    }

    /**
     *nodecache:给指定节点注册监听器
     */
    @Test
    public void testNodeCache() throws Exception {
        //1.创建NodeCache对象
        NodeCache nodeCache=new NodeCache(client,"/app1");
        //2.注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了!!!");
                //获取修改后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println("data = " + new String(data));
            }
        });
        //3.开启监听
        nodeCache.start(true);
        while (true){

        }
    }
    /**
     *PathChildrenCache:监听节点的孩子节点们
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        //1.PathChildrenCache对象
        PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/app2",true);
        //2.注册监听
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println( "子节点变化了");
                System.out.println("event = " + event);
                //监听子节点数据变更,并且拿到变更后的数据
                //1.获取类型
                PathChildrenCacheEvent.Type type = event.getType();
                //2,判断节点是否为update
                if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    byte[] data = event.getData().getData();
                    System.out.println("data = " + new String(data));
                }
            }
        });
        //3.开启监听
        pathChildrenCache.start();
        while (true){

        }
    }
    /**
     *testTreeCache:监听节点和节点的孩子节点们
     */
    @Test
    public void testTreeCache() throws Exception {
        //1.PathChildrenCache对象
        TreeCache treeCache=new TreeCache(client,"/app2");
        //2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
                System.out.println( "节点变化了");
                System.out.println("event = " + event);
            }
        });
        //3.开启监听
        treeCache.start();
        while (true){

        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140

分布式锁
原理:
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。
1.客户端获取锁时,在lock节点下创建临时顺序节点。
2.然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
3.如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
4.如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点
是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。
–分布式锁API
在Curator中有五种锁方案:
InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
InterProcessMutex:分布式可重入排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2:共享信号量

案例:12306售票系统

Ticket12306类

package com.js.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
 * @author JiaShuai
 * @date 2022/12/5 18:03
 */
public class Ticket12306 implements Runnable{
    private int tickets=10; //票数
    private InterProcessMutex lock;

    public Ticket12306(){
        //重试策略
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
        /*1.第一种方式*/
//        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.254.100:2181",
//                60 * 1000, 15 * 1000, retryPolicy);
//        client.start();
        /*2.第二种方式*/
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.254.100:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        lock=new InterProcessMutex(client,"/lock");
    }
    @Override
    public void run() {
        while (true) {
            //获取锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if (tickets>0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

LockTest类

package com.js.curator;

/**
 * @author JiaShuai
 * @date 2022/12/5 18:05
 */
public class LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306=new Ticket12306();

        //创建客户端
        Thread t1=new Thread(ticket12306,"携程");
        Thread t2=new Thread(ticket12306,"飞猪");

        t1.start();
        t2.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/688035
推荐阅读
相关标签
  

闽ICP备14008679号