赞
踩
1.1、zkClient客户端流程如下图所示
1.2、zkCli.sh配置代码如下:
- # use POSIX interface, symlink is followed automatically
- ZOOBIN="${BASH_SOURCE-$0}" # path of this script; falls back to $0 when BASH_SOURCE is unset
- ZOOBIN="$(dirname "${ZOOBIN}")" # directory part of that path (may still be relative)
- ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)" # the same directory as an absolute path
- if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then # prefer the libexec copy of zkEnv.sh when it exists
- . "$ZOOBINDIR"/../libexec/zkEnv.sh
- else # otherwise source the zkEnv.sh that sits next to this script
- . "$ZOOBINDIR"/zkEnv.sh
- fi # presumably zkEnv.sh defines JAVA, CLASSPATH, ZOO_LOG_DIR and ZOO_LOG4J_PROP (not shown here) - confirm
- "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
- -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
- org.apache.zookeeper.ZooKeeperMain "$@" # run the ZooKeeperMain class, forwarding every script argument
zkCli.sh 启动客户端时,实际运行的是 org.apache.zookeeper.ZooKeeperMain 的 main 方法,代码如下:
- public static void main(String[] args) throws KeeperException, IOException, InterruptedException { // CLI entry point
- ZooKeeperMain main = new ZooKeeperMain(args); // the constructor parses args and connects to the server
- main.run(); // then hand control to run()
- }
1.3、Main方法流程:
main方法先创建ZooKeeperMain对象,再调用其run()方法。ZooKeeperMain构造方法中的重点代码如下:
- public ZooKeeperMain(String[] args) throws IOException, InterruptedException { // builds the CLI client from command-line args
- this.cl.parseOptions(args); // let the options holder parse the raw arguments
- System.out.println("Connecting to " + this.cl.getOption("server")); // echo the value of the "server" option
- this.connectToZK(this.cl.getOption("server")); // open the client connection to that server string
- }
-
- protected void connectToZK(String newHost) throws InterruptedException, IOException { // (re)connects this CLI to newHost
- if (this.zk != null && this.zk.getState().isAlive()) { // an earlier client exists and is still alive
- this.zk.close(); // close it before replacing it
- }
- this.host = newHost; // remember the connect string now in use
- boolean readOnly = this.cl.getOption("readonly") != null; // read-only mode is requested by the mere presence of the option
- this.zk = new ZooKeeper(this.host, Integer.parseInt(this.cl.getOption("timeout")), // "timeout" option is parsed as an int
-
- new ZooKeeperMain.MyWatcher(), readOnly); // a fresh MyWatcher is passed as the watcher argument
- }
-
- public void start() { // NOTE(review): presumably ClientCnxn.start(), reached via cnxn.start() in the ZooKeeper constructor - enclosing class not shown, confirm
- this.sendThread.start(); // start the send thread
- this.eventThread.start(); // start the event thread
- }
最终,connectToZK方法内部使用的就是原生的ZooKeeper客户端(new ZooKeeper(...))来建立连接,其构造方法如下:
- // Creates the client handle: installs the default watcher, parses the connect
- // string, builds the ClientCnxn connection object and calls start() on it.
- // connectString  - server list to connect to; parsed by ConnectStringParser
- // sessionTimeout - session timeout handed on to ClientCnxn
- // watcher        - becomes the watch manager's default watcher
- // canBeReadOnly  - handed on to ClientCnxn unchanged
- public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
- this.watchManager = new ZooKeeper.ZKWatchManager();
- // Leading spaces keep the key=value pairs separated in the log line.
- LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
- this.watchManager.defaultWatcher = watcher;
- // Split the connect string into server addresses and the chroot path.
- ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
- HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
- // The object that holds the connection to the server.
- this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
- this, this.watchManager, getClientCnxnSocket(), canBeReadOnly);
- this.cnxn.start();
- }
-
- ClientCnxn.ClientCnxn():
-
- public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, // NOTE(review): signature differs from the 7-argument call shown in the ZooKeeper constructor; the snippets may come from different versions - confirm
-
- long sessionId, byte[] sessionPasswd) throws IOException{
- this.zooKeeper = zooKeeper;
- this.watcher = watcher;
- this.sessionId = sessionId;
- this.sessionPasswd = sessionPasswd;
- // parse out chroot, if any
- int off = hosts.indexOf('/'); // position of the first '/', or -1 when there is no chroot suffix
- if (off >= 0) {
- String chrootPath = hosts.substring(off); // everything from the first '/' onward
- // ignore "/" chroot spec, same as null
- if (chrootPath.length() == 1) {
- this.chrootPath = null;
- } else {
- PathUtils.validatePath(chrootPath); // validated before being stored
- this.chrootPath = chrootPath;
- }
- hosts = hosts.substring(0, off); // keep only the host list in front of the chroot
- } else {
- this.chrootPath = null;
- }
- String hostsList[] = hosts.split(","); // one entry per comma-separated server
- for (String host : hostsList) {
- int port = 2181; // port used when the entry has no ":port" part
- String parts[] = host.split(":");
- if (parts.length > 1) { // entry has the form host:port
- port = Integer.parseInt(parts[1]);
- host = parts[0];
- }
- InetAddress addrs[] = InetAddress.getAllByName(host); // a host name may resolve to several addresses
- for (InetAddress addr : addrs) {
- serverAddrs.add(new InetSocketAddress(addr, port)); // every resolved address becomes a candidate server
- }
- }
- this.sessionTimeout = sessionTimeout;
- connectTimeout = sessionTimeout / hostsList.length; // session timeout divided by the number of host entries (integer division)
- readTimeout = sessionTimeout * 2 / 3; // two thirds of the session timeout (integer arithmetic)
- Collections.shuffle(serverAddrs); // randomize the order of the candidate servers
- sendThread = new SendThread(); // threads are only created here; nothing in this constructor starts them
- eventThread = new EventThread();
- }
main方法流程图如下所示:
第三步第四步执行如下:
- @SuppressWarnings("unchecked")
- void run() throws KeeperException, IOException, InterruptedException {
- if (cl.getCommand() == null) {
- System.out.println("Welcome to ZooKeeper!");
- boolean jlinemissing = false;
- // only use jline if it's in the classpath
- try {
- Class consoleC = Class.forName("jline.ConsoleReader");
- Class completorC =Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
- System.out.println("JLine support is enabled");
- Object console = consoleC.getConstructor().newInstance();
- Object completor =completorC.getConstructor(ZooKeeper.class).newInstance(zk);
- Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
- addCompletor.invoke(console, completor);
- String line;
- Method readLine = consoleC.getMethod("readLine", String.class);
- // 循环读取命令并执行
- while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
- executeLine(line);
- }
- }
- }
-
- public void executeLine(String line)throws InterruptedException, IOException, KeeperException { // runs one line typed at the prompt
- if (!line.equals("")) { // an empty line is ignored
- cl.parseCommand(line); // parse the line into the shared command options object
- addToHistory(commandCount,line); // record the line under the current command number
- processCmd(cl); // Step 5: execute the command
- commandCount++; // reached only when processCmd returned without throwing
- }
- }
第6步:处理命令
- protected boolean processZKCmd(MyCommandOptions co) throws KeeperException,
- IOException, InterruptedException{
- Stat stat = new Stat();
- String[] args = co.getArgArray();
- String cmd = co.getCommand();
- if (args.length < 1) {
- usage();
- return false;
- }
- if (!commandMap.containsKey(cmd)) {
- usage();
- return false;
- }
- boolean watch = args.length > 2;
- String path = null;
- List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
- LOG.debug("Processing " + cmd);
- if (cmd.equals("quit")) {
- System.out.println("Quitting...");
- zk.close();
- System.exit(0);
- } else if (cmd.equals("history")) {
- .........
-
- // 创建节点命令
- if (cmd.equals("create") && args.length >= 3) {
- int first = 0;
- CreateMode flags = CreateMode.PERSISTENT; // 持久节点
- if ((args[1].equals("-e") && args[2].equals("-s"))
- || (args[1]).equals("-s") && (args[2].equals("-e"))) {
- first+=2;
- flags = CreateMode.EPHEMERAL_SEQUENTIAL; // 临时顺序节点
- } else if (args[1].equals("-e")) {
- first++;
- flags = CreateMode.EPHEMERAL; // 临时节点
- } else if (args[1].equals("-s")) {
- first++;
- flags = CreateMode.PERSISTENT_SEQUENTIAL; // 持久顺序节点
- }
- if (args.length == first + 4) {
- acl = parseACLs(args[first+3]);
- }
- path = args[first + 1];
-
- // 第7步:解析create命令
- String newPath = zk.create(path, args[first+2].getBytes(), acl,flags);
- System.err.println("Created " + newPath);
- } else if (cmd.equals("delete") && args.length >= 2) {
- path = args[1];
- zk.delete(path, watch ? Integer.parseInt(args[2]) : -1);
- }
// 第8,9步:解析create命令参数
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。