当前位置:   article > 正文

zookeeper的使用

zookeeper的使用

1.zookeeper下载

zookeeper官方下载地址

2.zookeeper安装及配置

这里写图片描述
1.把zookeeper压缩包拷贝到3台电脑(1、3、5……台也行)

2.配置conf/zoo.cfg:
这里写图片描述

3.分别在3台电脑的${dataDir}目录下创建名为myid的文件:
/zookeeperdata/myid
server1机器的内容为1,
server2机器的内容为2,
server3机器的内容为3。
这里写图片描述

3.zoo.cfg配置文件的说明

  • tickTime:发送心跳的时间间隔,单位:毫秒;
  • dataDir:zookeeper保存数据的目录;
  • clientPort:客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接受客户端的访问请求;
  • initLimit:这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到Leader的Follower服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过5个心跳的时间(也就是tickTime)长度后,zookeeper服务器还没有接受到客户端的返回信息,那么表示这个客户端连接失败。踪的时间长度就是5*2000=10秒;
  • syncLimit:这个配置项标识Leader与Follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是2*2000=4秒;
  • server.A=B:C:D:其中A是一个数字,表示这个是第几号服务器;B是这个服务器的IP地址;C表示这个服务器与集群中的Leader服务器交换信息的端口;D表示万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口;如果是伪集群的配置方式,由于B都是一样,所以不同的zookeeper实例通信端口号不能一样,所以要给它们分配不同的端口号;

4.zookeeper常用命令

zkServer命令:
zkServer命令

zkClient命令:
执行./zkCli.sh进入客户端界面。
或者执行./zkCli.sh [-server 127.0.0.1:2181]。
这里写图片描述
这里写图片描述
注意:
删除节点有2个命令:delete(删除的节点没有子节点)和rmr(删除的节点可以有子节点)。

5.在程序中使用zookeeper

5.1使用zookeeper-3.4.10.jar

这里写图片描述

package cn.sky.zookeepertest;

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class Main {
    public static void main(String[] args) throws Exception {
        String connectString = "127.0.0.1:2181";
        //伪集群
//      String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        CountDownLatch cdl = new CountDownLatch(1);//count=1
        ZooKeeper zkCli = new ZooKeeper(connectString, 1000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                KeeperState  state = event.getState();
                if(state.equals(KeeperState.SyncConnected)) {
                    System.out.println("SyncConnected");
                    cdl.countDown();//count--
                }
            }
        });
        cdl.await();//等待,直到count==0才继续往下执行,表明zookeeper已经连接成功
        getChildren(zkCli, "/");
//      watchPath(zkCli,"/myzk");
//      createPath(zkCli);
//      deletePath(zkCli);
        zkCli.close();
    }

    static void getChildren(ZooKeeper zkCli,String path) throws Exception{
        List<String> paths = zkCli.getChildren(path,false);
        System.out.println(paths);
    }

    static void deletePath(ZooKeeper zkCli) throws Exception {
        zkCli.delete("/myzk", -1);
    }

    static void watchPath(ZooKeeper zkCli,String path) throws Exception {
        zkCli.exists(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                EventType type = event.getType();
                System.out.println(path+"==>"+type.name());
            }
        });
    }

    static void createPath(ZooKeeper zkCli) throws Exception {
        Stat stat = zkCli.exists("/myzk", false);
        if(stat!=null) {
            System.out.println("/myzk节点已经存在,不能重复创建该节点");
        }else {
            System.out.println("/myzk节点不存在,现在开始创建/myzk节点");
            zkCli.create("/myzk", "hello".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

5.2使用curator

这里写图片描述

package cn.sky.zookeepertest;

import java.net.URLDecoder;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Main {
    private static CuratorFramework client;
    private static String connectString = "127.0.0.1:2181";
    private static InterProcessLock lock1;
    private static InterProcessLock lock2;
    private static String lockpath = "/mylock";
    static {
        client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        lock1 = new InterProcessMutex(client, lockpath);
        lock2 = new InterProcessMutex(client, lockpath);
    }

    public static void main(String[] args) throws Exception {
        client.start();
//      watchConnectionState();
//      getChildren("/");
//      watchNode("/");
//      createNode("/myzk/a");
//      Thread.sleep(1000);
//      System.out.println("================");
//      printZK("/","");
//      testLock();
    }

    private static void doSomething() throws Exception {
        System.out.println(Thread.currentThread().getName()+" begin.");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName()+" end.");
    }

    /**
     * 测试分布式锁
     * 用多线程模拟分布式环境
     */
    static void testLock() {
        int getLockTimeout = 6;
        Thread t1 = new Thread() {
            public void run() {
                try {
                    if(lock1.acquire(getLockTimeout,TimeUnit.SECONDS)) {
                        doSomething();
                    };
                } catch (Exception e) {
                    e.printStackTrace();
                }finally {
                    if(lock1.isAcquiredInThisProcess()) {
                        try {
                            lock1.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
        };
        Thread t2 = new Thread() {
            public void run() {
                try {
                    if(lock2.acquire(getLockTimeout,TimeUnit.SECONDS)) {
                        doSomething();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(lock2.isAcquiredInThisProcess()) {
                        try {
                            lock2.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
        };
        t1.start();
        t2.start();
    }

    /**
     * 打印path下的子节点
     * @param path
     * @param prex
     * @throws Exception
     */
    static void printZK(String path,String prex) throws Exception {
        System.out.println(prex+URLDecoder.decode(path.substring(path.lastIndexOf("/"), path.length()), "UTF-8"));
        if(client.checkExists().forPath(path)!=null) {
            List<String> cs = client.getChildren().forPath(path);
            for (String s : cs) {
                if(path.equals("/")) {
                    printZK(path+s,prex+"  ");
                }else {
                    printZK(path+"/"+s,prex+"  ");
                }
            }
        }
    }

    static class MyPathChildrenCacheListener implements PathChildrenCacheListener{
        private String parentPath;
        public MyPathChildrenCacheListener(String parentPath) {
            this.parentPath = parentPath;
        }
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            Type type = event.getType();
            String childPath = event.getData().getPath();
            switch(type) {
            case CHILD_ADDED://在父节点(parentPath)下添加子节点(childPath)后触发
                System.out.println("child_added:"+childPath);
                break;
            case CHILD_REMOVED://父节点下的子节点被删除后触发
                System.out.println("child_removed:"+childPath);
                break;
            case CHILD_UPDATED://父节点下的子节点的值被修改后触发
                System.out.println("child_updated:"+childPath);
                break;
            case CONNECTION_LOST:
                System.out.println("connection_lost");
                break;
            case CONNECTION_SUSPENDED:
                System.out.println("connection_suspended");
                break;
            case INITIALIZED:
                System.out.println("initialized");
                break;
            case CONNECTION_RECONNECTED:
                System.out.println("connection_reconnected");
                break;
            default:
                System.out.println("default");
                break;
            }
        }
    }

    static class MyConnectionStateListener implements ConnectionStateListener{
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            switch (newState) {
            case CONNECTED://第一次连接zkServer成功后触发
                System.out.println("connected");
                break;
            case LOST://关闭zkServer,十几秒(时间不一定)后触发
                System.out.println("lost");
                break;
            case READ_ONLY:
                System.out.println("read_only");
                break;
            case RECONNECTED://关闭zkServer后,再启动zkServer马上触发
                System.out.println("reconnected");
                break;
            case SUSPENDED://关闭zkServer后马上触发
                System.out.println("suspended");
                break;
            default:
                System.out.println("default");
                break;
            }
        }
    }

    /**
     * 监听客户端连接状态
     */
    static void watchConnectionState() {
        client.getConnectionStateListenable().addListener(new MyConnectionStateListener());
    }

    /**
     * 监听path节点下的子节点变化
     * @param parentNode 父节点
     * @throws Exception
     */
    static void watchNode(String parentNode) throws Exception {
        PathChildrenCache cache = new PathChildrenCache(client, parentNode, false);
        cache.start();//调用该方法之后,path节点会被创建,且为永久节点。
        cache.getListenable().addListener(new MyPathChildrenCacheListener(parentNode));
//      cache不能关闭
    }

    /**
     * 获取子节点
     * @param parentNode
     * @throws Exception
     */
    static void getChildren(String parentNode) throws Exception {
        List<String> nodes = client.getChildren().forPath(parentNode);
        System.out.println(nodes);
    }

    /**
     * 创建ZNode节点
     * @param node
     * @throws Exception
     */
    static void createNode(String node) throws Exception {
        Stat stat = client.checkExists().forPath(node);
        if(stat!=null) {
            System.out.println(node+"节点已存在");
        }else {
            System.out.println(node+"节点不存在,现在创建");
            //CreateMode.EPHEMERAL:The znode will be deleted upon the client's disconnect.
            client.create().withMode(CreateMode.EPHEMERAL).forPath(node);
            //CreateMode.PERSISTENT:The znode will not be automatically deleted upon client's disconnect.
//          client.create().withMode(CreateMode.PERSISTENT).forPath(node);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230

注意点:
1.调用以下方法后

new PathChildrenCache(client, parentPath, false).start();
  • 1

parentPath会以永久节点的形式被创建。
查看源码的部分调用关系为:

start()-->
start(StartMode.NORMAL)-->
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD))-->
operation.invoke()-->
cache.refresh(mode)-->
ensurePath()-->
ensureContainers.ensure()-->
internalEnsure()-->
client.createContainers(path)-->
checkExists().creatingParentContainersIfNeeded().forPath(ZKPaths.makePath(path, "foo"))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

这里写图片描述

这里写图片描述

2.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener的public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception 方法:

  1. 该方法里面抛了异常,会被吃掉而不打印出任何的日志信息。解决方法:在该方法里面捕获异常并打印信息;
  2. 并且如果调用了event.getData().getData(),得到的永远是null;
  3. 调用client.create().withMode(CreateMode.EPHEMERAL).forPath(node)之后马上设置node节点的值,不会被监听到值的变化。临时解决方法:在create之后先Thread.sleep(time),再设置值,则可以被监听到值的变化,具体time是多少没法确定。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/975751
推荐阅读
相关标签
  

闽ICP备14008679号