当前位置:   article > 正文

第四章 zk源码解读笔记_zookeeper ${bash_source-$0}

zookeeper ${bash_source-$0}

一、客户端源码

1、总体流程

1.1、zkClient客户端流程如下图所示

 

1.2、zkCli.sh配置代码如下:

  1. # use POSTIX interface, symlink is followed automatically
  2. ZOOBIN="${BASH_SOURCE-$0}"
  3. ZOOBIN="$(dirname "${ZOOBIN}")"
  4. ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
  5. if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  6.   . "$ZOOBINDIR"/../libexec/zkEnv.sh
  7. else
  8.   . "$ZOOBINDIR"/zkEnv.sh
  9. fi
  10. "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
  11.      -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
  12.      org.apache.zookeeper.ZooKeeperMain "$@"

启动客户端 zkCli.sh文件里面的配置,实际运行代码如下:

  1. public static void main(String[] args) throws KeeperException, IOException, InterruptedException {
  2.     ZooKeeperMain main = new ZooKeeperMain(args);
  3.     main.run();
  4. }

1.3、Main方法流程:

new ZooKeeperMain 对象,调用run()方法,在ZookeeperMain的构造方法里面,重点是:

  1. public ZooKeeperMain(String[] args) throws IOException, InterruptedException {
  2.     this.cl.parseOptions(args);
  3.     System.out.println("Connecting to " + this.cl.getOption("server"));
  4.     this.connectToZK(this.cl.getOption("server"));
  5. }
  6. protected void connectToZK(String newHost) throws InterruptedException, IOException {
  7.     if (this.zk != null && this.zk.getState().isAlive()) {
  8.         this.zk.close();
  9.     }
  10.     this.host = newHost;
  11.     boolean readOnly = this.cl.getOption("readonly") != null;
  12.     this.zk = new ZooKeeper(this.host, Integer.parseInt(this.cl.getOption("timeout")),
  13.  new ZooKeeperMain.MyWatcher(), readOnly);
  14. }
  15. public void start() {
  16.     this.sendThread.start();
  17.     this.eventThread.start();
  18. }

最终在connectToZK方法里面也就是使用原生的Zk客户端进行连接的。

  1. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)throws IOException {
  2.     this.watchManager = new ZooKeeper.ZKWatchManager();
  3.     LOG.info("Initiating client connection, connectString=" + connectString + "sessionTimeout="+ sessionTimeout + " watcher=" + watcher);
  4.     this.watchManager.defaultWatcher = watcher;
  5.     ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
  6.     HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
  7.     this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
  8.  this, this.watchManager, getClientCnxnSocket(), canBeReadOnly); //获得和服务端连接的对象
  9.     this.cnxn.start(); 
  10. }
  11. ClientCnxn.ClientCnxn():
  12. public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher,
  13. long sessionId, byte[] sessionPasswd) throws IOException{
  14.         this.zooKeeper = zooKeeper;
  15.         this.watcher = watcher;
  16.         this.sessionId = sessionId;
  17.         this.sessionPasswd = sessionPasswd;
  18.         // parse out chroot, if any
  19.         int off = hosts.indexOf('/');
  20.         if (off >= 0) {
  21.             String chrootPath = hosts.substring(off);
  22.             // ignore "/" chroot spec, same as null
  23.             if (chrootPath.length() == 1) {
  24.                 this.chrootPath = null;
  25.             } else {
  26.                 PathUtils.validatePath(chrootPath);
  27.                 this.chrootPath = chrootPath;
  28.             }
  29.             hosts = hosts.substring(0,  off);
  30.         } else {
  31.             this.chrootPath = null;
  32.         }
  33.         String hostsList[] = hosts.split(",");
  34.         for (String host : hostsList) {
  35.             int port = 2181;
  36.             String parts[] = host.split(":");
  37.             if (parts.length > 1) {
  38.                 port = Integer.parseInt(parts[1]);
  39.                 host = parts[0];
  40.             }
  41.             InetAddress addrs[] = InetAddress.getAllByName(host);
  42.             for (InetAddress addr : addrs) {
  43.                 serverAddrs.add(new InetSocketAddress(addr, port));
  44.             }
  45.         }
  46.         this.sessionTimeout = sessionTimeout;
  47.         connectTimeout = sessionTimeout / hostsList.length;
  48.         readTimeout = sessionTimeout * 2 / 3;
  49.         Collections.shuffle(serverAddrs);
  50.         sendThread = new SendThread();
  51.         eventThread = new EventThread();
  52.     }

mian方法流程图如下所示:

 

第三步第四步执行如下: 

  1. @SuppressWarnings("unchecked")
  2. void run() throws KeeperException, IOException, InterruptedException {
  3. if (cl.getCommand() == null) {
  4. System.out.println("Welcome to ZooKeeper!");
  5. boolean jlinemissing = false;
  6. // only use jline if it's in the classpath
  7. try {
  8.                 Class consoleC = Class.forName("jline.ConsoleReader");
  9.                 Class completorC =Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
  10.                 System.out.println("JLine support is enabled");
  11.                 Object console = consoleC.getConstructor().newInstance();
  12.                 Object completor =completorC.getConstructor(ZooKeeper.class).newInstance(zk);
  13.                 Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
  14.                 addCompletor.invoke(console, completor);
  15.                 String line;
  16.                 Method readLine = consoleC.getMethod("readLine", String.class);
  17.    // 循环读取命令并执行
  18.                 while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
  19.                     executeLine(line);
  20.                 }
  21.             }
  22.     }
  23.     public void executeLine(String line)throws InterruptedException, IOException, KeeperException {
  24.       if (!line.equals("")) {
  25.         cl.parseCommand(line);
  26.         addToHistory(commandCount,line);
  27.         processCmd(cl); // 第5步:执行命令
  28.         commandCount++;
  29.       }
  30.     }

第6步:处理命令

  1. protected boolean processZKCmd(MyCommandOptions co) throws KeeperException
  2.   IOException, InterruptedException{
  3.         Stat stat = new Stat();
  4.         String[] args = co.getArgArray();
  5.         String cmd = co.getCommand();
  6.         if (args.length < 1) {
  7.             usage();
  8.             return false;
  9.         }
  10.         if (!commandMap.containsKey(cmd)) {
  11.             usage();
  12.             return false;
  13.         }
  14.         boolean watch = args.length > 2;
  15.         String path = null;
  16.         List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
  17.         LOG.debug("Processing " + cmd);
  18.         if (cmd.equals("quit")) {
  19.             System.out.println("Quitting...");
  20.             zk.close();
  21.             System.exit(0);
  22.         } else if (cmd.equals("history")) {
  23.          .........
  24.          // 创建节点命令
  25.         if (cmd.equals("create") && args.length >= 3) {
  26.             int first = 0;
  27.             CreateMode flags = CreateMode.PERSISTENT; // 持久节点
  28.             if ((args[1].equals("-e") && args[2].equals("-s"))
  29.                     || (args[1]).equals("-s") && (args[2].equals("-e"))) {
  30.                 first+=2;
  31.                 flags = CreateMode.EPHEMERAL_SEQUENTIAL; 临时顺序节点
  32.             } else if (args[1].equals("-e")) {
  33.                 first++;
  34.                 flags = CreateMode.EPHEMERAL; // 临时节点
  35.             } else if (args[1].equals("-s")) {
  36.                 first++;
  37.                 flags = CreateMode.PERSISTENT_SEQUENTIAL; // 持久顺序节点
  38.             }
  39.             if (args.length == first + 4) {
  40.                 acl = parseACLs(args[first+3]);
  41.             }
  42.             path = args[first + 1];
  43. // 第7步:解析create命令
  44.             String newPath = zk.create(path, args[first+2].getBytes(), acl,flags);
  45.             System.err.println("Created " + newPath);
  46.         } else if (cmd.equals("delete") && args.length >= 2) {
  47.             path = args[1];
  48.             zk.delete(path, watch ? Integer.parseInt(args[2]) : -1);
  49.         }

// 第8,9步:解析create命令参数


                
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/302370
推荐阅读
相关标签
  

闽ICP备14008679号