赞
踩
先看一下客户端源码的流程图
总体流程
总体流程
开启SendThread线程
开启EventThread
总结
下面根据源码讲解,大家整合源码和流程图一起看最好,本篇内容比较多建议收藏起来看。
从ZkCli.sh脚本中可以看到zk源码客户端入口类为ZooKeeperMain
public static void main(String args[]) throws KeeperException, IOException, InterruptedException { // 初始化 ZooKeeperMain main = new ZooKeeperMain(args); // 读命令执行命令,也就是我们的创建节点更新节点等等命令的读取执行 main.run(); }
跟踪 ZooKeeperMain main = new ZooKeeperMain(args);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException { //向private Map options = new HashMap(); 设置参数 cl.parseOptions(args); System.out.println("Connecting to " + cl.getOption("server")); // 连接zk connectToZK(cl.getOption("server")); //zk = new ZooKeeper(cl.getOption("server"),// Integer.parseInt(cl.getOption("timeout")), new MyWatcher()); }
protected void connectToZK(String newHost) throws InterruptedException, IOException { // zk已连接,先close if (zk != null && zk.getState().isAlive()) { zk.close(); } host = newHost; boolean readOnly = cl.getOption("readonly") != null; //new 出 ZooKeeper zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly); }
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 创建客户端上下文 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); // 开启上下文设置的线程 /**public void start() { ** sendThread.start(); ** eventThread.start(); ** } **/ cnxn.start(); }
SendThread(ClientCnxnSocket clientCnxnSocket) { super(makeThreadName("-SendThread()")); // 连接状态 CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,CLOSED, AUTH_FAILED, NOT_CONNECTED state = States.CONNECTING; this.clientCnxnSocket = clientCnxnSocket; setDaemon(true);// 守护线程 }
EventThread() { super(makeThreadName("-EventThread")); setDaemon(true); // 守护线程 }
void run() throws KeeperException, IOException, InterruptedException { if (cl.getCommand() == null) { System.out.println("Welcome to ZooKeeper!"); boolean jlinemissing = false; // only use jline if it's in the classpath try { Class> consoleC = Class.forName("jline.ConsoleReader"); Class> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor"); System.out.println("JLine support is enabled"); Object console = consoleC.getConstructor().newInstance(); Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk); Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor")); addCompletor.invoke(console, completor); String line; Method readLine = consoleC.getMethod("readLine", String.class); while ((line = (String)readLine.invoke(console, getPrompt())) != null) { // 执行 executeLine(line); } } catch (ClassNotFoundException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (NoSuchMethodException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InvocationTargetException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (IllegalAccessException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InstantiationException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } if (jlinemissing) { System.out.println("JLine support is disabled"); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line; while ((line = br.readLine()) != null) { executeLine(line); } } } else { // Command line args non-null. Run what was passed. processCmd(cl); } }
public void executeLine(String line) throws InterruptedException, IOException, KeeperException { if (!line.equals("")) { cl.parseCommand(line); // 添加历史命令 addToHistory(commandCount,line); // 执行命令核心方法 processCmd(cl); // 命令计数 commandCount++; } }
protected boolean processCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { try { // 执行zk命令 return processZKCmd(co); } catch (IllegalArgumentException e) { System.err.println("Command failed: " + e); } catch (KeeperException.NoNodeException e) { System.err.println("Node does not exist: " + e.getPath()); } catch (KeeperException.NoChildrenForEphemeralsException e) { System.err.println("Ephemerals cannot have children: " + e.getPath()); } catch (KeeperException.NodeExistsException e) { System.err.println("Node already exists: " + e.getPath()); } catch (KeeperException.NotEmptyException e) { System.err.println("Node not empty: " + e.getPath()); } catch (KeeperException.NotReadOnlyException e) { System.err.println("Not a read-only call: " + e.getPath()); }catch (KeeperException.InvalidACLException e) { System.err.println("Acl is not valid : "+e.getPath()); }catch (KeeperException.NoAuthException e) { System.err.println("Authentication is not valid : "+e.getPath()); }catch (KeeperException.BadArgumentsException e) { System.err.println("Arguments are not valid : "+e.getPath()); }catch (KeeperException.BadVersionException e) { System.err.println("version No is not valid : "+e.getPath()); } return false; }
/** 根据字符串判断属于什么命令然后发送请求,最终会调用到zookeeper类中的对应方法 **/protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { Stat stat = new Stat(); String[] args = co.getArgArray(); String cmd = co.getCommand(); if (args.length < 1) { usage(); return false; } if (!commandMap.containsKey(cmd)) { usage(); return false; } boolean watch = args.length > 2; String path = null; List acl = Ids.OPEN_ACL_UNSAFE; LOG.debug("Processing " + cmd); if (cmd.equals("quit")) { System.out.println("Quitting..."); zk.close(); System.exit(0); } else if (cmd.equals("redo") && args.length >= 2) { Integer i = Integer.decode(args[1]); if (commandCount <= i || i < 0){ // don't allow redoing this redo System.out.println("Command index out of range"); return false; } cl.parseCommand(history.get(i)); if (cl.getCommand().equals( "redo" )){ System.out.println("No redoing redos"); return false; } history.put(commandCount, history.get(i)); processCmd( cl); } else if (cmd.equals("history")) { for (int i=commandCount - 10;i<=commandCount;++i) { if (i < 0) continue; System.out.println(i + " - " + history.get(i)); } } else if (cmd.equals("printwatches")) { if (args.length == 1) { System.out.println("printwatches is " + (printWatches ? "on" : "off")); } else { printWatches = args[1].equals("on"); } } else if (cmd.equals("connect")) { if (args.length >=2) { connectToZK(args[1]); } else { connectToZK(host); } } // Below commands all need a live connection if (zk == null || !zk.getState().isAlive()) { System.out.println("Not connected"); return false; } if (cmd.equals("create") && args.length >= 3) { int first = 0; CreateMode flags = CreateMode.PERSISTENT; if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { first+=2; flags = CreateMode.EPHEMERAL_SEQUENTIAL; } else if (args[1].equals("-e")) { first++; flags = CreateMode.EPHEMERAL; } else if (args[1].equals("-s")) { first++; flags = CreateMode.PERSISTENT_SEQUENTIAL; } if (args.length == first + 4) { acl = parseACLs(args[first+3]); } path = args[first + 1]; String newPath = zk.create(path, args[first+2].getBytes(), acl, flags); System.err.println("Created " + newPath); } else if (cmd.equals("delete") && args.length >= 2) { path = args[1]; zk.delete(path, watch ? Integer.parseInt(args[2]) : -1); } else if (cmd.equals("rmr") && args.length >= 2) { path = args[1]; ZKUtil.deleteRecursive(zk, path); } else if (cmd.equals("set") && args.length >= 3) { path = args[1]; stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); printStat(stat); } else if (cmd.equals("aget") && args.length >= 2) { path = args[1]; zk.getData(path, watch, dataCallback, path); } else if (cmd.equals("get") && args.length >= 2) { path = args[1]; byte data[] = zk.getData(path, watch, stat); data = (data == null)? "null".getBytes() : data; System.out.println(new String(data)); printStat(stat); } else if (cmd.equals("ls") && args.length >= 2) { path = args[1]; List children = zk.getChildren(path, watch); System.out.println(children); } else if (cmd.equals("ls2") && args.length >= 2) { path = args[1]; List children = zk.getChildren(path, watch, stat); System.out.println(children); printStat(stat); } else if (cmd.equals("getAcl") && args.length >= 2) { path = args[1]; acl = zk.getACL(path, stat); for (ACL a : acl) { System.out.println(a.getId() + ": " + getPermString(a.getPerms())); } } else if (cmd.equals("setAcl") && args.length >= 3) { path = args[1]; stat = zk.setACL(path, parseACLs(args[2]), args.length > 4 ? Integer.parseInt(args[3]) : -1); printStat(stat); } else if (cmd.equals("stat") && args.length >= 2) { path = args[1]; stat = zk.exists(path, watch); if (stat == null) { throw new KeeperException.NoNodeException(path); } printStat(stat); } else if (cmd.equals("listquota") && args.length >= 2) { path = args[1]; String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode; byte[] data = null; try { System.err.println("absolute path is " + absolutePath); data = zk.getData(absolutePath, false, stat); StatsTrack st = new StatsTrack(new String(data)); System.out.println("Output quota for " + path + " " + st.toString()); data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat); System.out.println("Output stat for " + path + " " + new StatsTrack(new String(data)).toString()); } catch(KeeperException.NoNodeException ne) { System.err.println("quota for " + path + " does not exist."); } } else if (cmd.equals("setquota") && args.length >= 4) { String option = args[1]; String val = args[2]; path = args[3]; System.err.println("Comment: the parts are " + "option " + option + " val " + val + " path " + path); if ("-b".equals(option)) { // we are setting the bytes quota createQuota(zk, path, Long.parseLong(val), -1); } else if ("-n".equals(option)) { // we are setting the num quota createQuota(zk, path, -1L, Integer.parseInt(val)); } else { usage(); } } else if (cmd.equals("delquota") && args.length >= 2) { //if neither option -n or -b is specified, we delete // the quota node for thsi node. if (args.length == 3) { //this time we have an option String option = args[1]; path = args[2]; if ("-b".equals(option)) { delQuota(zk, path, true, false); } else if ("-n".equals(option)) { delQuota(zk, path, false, true); } } else if (args.length == 2) { path = args[1]; // we dont have an option specified. // just delete whole quota node delQuota(zk, path, true, true); } else if (cmd.equals("help")) { usage(); } } else if (cmd.equals("close")) { zk.close(); } else if (cmd.equals("sync") && args.length >= 2) { path = args[1]; zk.sync(path, new AsyncCallback.VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("Sync returned " + rc); } }, null ); } else if (cmd.equals("addauth") && args.length >=2 ) { byte[] b = null; if (args.length >= 3) b = args[2].getBytes(); zk.addAuthInfo(args[1], b); } else if (!commandMap.containsKey(cmd)) { usage(); } return watch; }
Zookeeper类中的方法
public String create(final String path, byte data[], List acl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); // 请求参数 CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } // 设置acl权限 request.setAcl(acl); // 提交请求 ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }
public ReplyHeader submitRequest(RequestHeader h, Record request,Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // 封装成packet,任务放到outgoingQueue队列里面 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); // 同步等待结果 synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
之前提到上下文会创建sendThread与evenThread两个线程,接下来再来看一下这两个线程都做了什么,既然是线程那么就需要看run方法。
@Override public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; // 客户端alive时 while (state.isAlive()) { try { // 未连接 if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { serverAddress = hostProvider.next(1000); } // 开始创建连接 startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); }// 已连接 if (state.isConnected()) { // determine whether we need to send an AuthFailed event. if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent == true) { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, authState,null)); } } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); }// 传输方法 pendingQueue 等待结果的队列 outgoingQueue 需要发送的队列 clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } catch (Throwable e) { if (closing) { if (LOG.isDebugEnabled()) { // closing so this is expected LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage()); } break; } else { // this is ugly, you have a better way speak up if (e instanceof SessionExpiredException) { LOG.info(e.getMessage() + ", closing socket connection"); } else if (e instanceof SessionTimeoutException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof EndOfStreamException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof RWServerFoundException) { LOG.info(e.getMessage()); } else if (e instanceof SocketException) { LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage()); } else { LOG.warn("Session 0x{} for server {}, unexpected error{}", Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e); } cleanup(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent( Event.EventType.None, Event.KeeperState.Disconnected, null)); } clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } } } cleanup(); clientCnxnSocket.close(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); }
开始连接startConnect()方法
private void startConnect(InetSocketAddress addr) throws IOException { // initializing it for new connection saslLoginFailed = false; // 设置state为CONNECTING state = States.CONNECTING; setName(getName().replaceAll("(.*)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr);// 连接 clientCnxnSocket.connect(addr); }
@Override void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { // 注册连接 registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; } initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; }
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); // 创建socket连接 boolean immediateConnect = sock.connect(addr); if (immediateConnect) { sendThread.primeConnection(); } }
// 次方法最终也会封装成package放到outgoingQueue队列中void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); synchronized (outgoingQueue) { // We add backwards since we are pushing into the front // Only send if there's a pending watch // TODO: here we have the only remaining use of zooKeeper in // this class. It's to be eliminated! if (!disableAutoWatchReset) { List dataWatches = zooKeeper.getDataWatches(); List existWatches = zooKeeper.getExistWatches(); List childWatches = zooKeeper.getChildWatches(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) { Iterator dataWatchesIter = prependChroot(dataWatches).iterator(); Iterator existWatchesIter = prependChroot(existWatches).iterator(); Iterator childWatchesIter = prependChroot(childWatches).iterator(); long setWatchesLastZxid = lastZxid; while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()) { List dataWatchesBatch = new ArrayList(); List existWatchesBatch = new ArrayList(); List childWatchesBatch = new ArrayList(); int batchLength = 0; // Note, we may exceed our max length by a bit when we add the last // watch in the batch. This isn't ideal, but it makes the code simpler. while (batchLength < SET_WATCHES_MAX_LENGTH) { final String watch; if (dataWatchesIter.hasNext()) { watch = dataWatchesIter.next(); dataWatchesBatch.add(watch); } else if (existWatchesIter.hasNext()) { watch = existWatchesIter.next(); existWatchesBatch.add(watch); } else if (childWatchesIter.hasNext()) { watch = childWatchesIter.next(); childWatchesBatch.add(watch); } else { break; } batchLength += watch.length(); } SetWatches sw = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.setWatches); h.setXid(-8); // 封装为package Packet packet = new Packet(h, new ReplyHeader(), sw, null, null); // 添加到队列中 outgoingQueue.addFirst(packet); } } } for (AuthData id : authInfo) { outgoingQueue.addFirst(new Packet(new RequestHeader(-4, OpCode.auth), null, new AuthPacket(0, id.scheme, id.data), null, null)); } outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); } clientCnxnSocket.enableReadWriteOnly(); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request sent on " + clientCnxnSocket.getRemoteSocketAddress()); } }
返回到run方法查看调用的doTransport传输方法
@Override void doTransport(int waitTimeOut, List pendingQueue, LinkedList outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, outgoingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { synchronized(outgoingQueue) { if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { enableWrite(); } } } selected.clear(); }
void doIO(List pendingQueue, LinkedList outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } // 处理读请求 if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } // 处理写请求 if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } } } }
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); }// 根据是否有回调方法,判断是同步请求还是异步请求 if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; // 异步请求放到waitingEvents 队列中,现在就要用到eventThread eventThread.queuePacket(p); } }
下面跟踪一下eventThread线程,当然也是先看run方法:
@Override public void run() { try { isRunning = true; while (true) { // take 方法取数据 Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { // event处理 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId())); }
private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { Packet p = (Packet) event; int rc = 0; String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { LOG.warn("Somehow a null cb got to EventThread!"); } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { StatCallback cb = (StatCallback) p.cb; if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof MultiResponse) { MultiCallback cb = (MultiCallback) p.cb; MultiResponse rsp = (MultiResponse) p.response; if (rc == 0) { List results = rsp.getResultList(); int newRc = rc; for (OpResult result : results) { if (result instanceof ErrorResult && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result).getErr())) { break; } } cb.processResult(newRc, clientPath, p.ctx, results); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } } }
整片内容比较多建议收藏起来看,源码方面的知识也比较枯燥一些。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。