赞
踩
在 1.1.2.1.2 ClientCnxn.start 里创建了两个线程,这两个线程是干什么用的呢?
改变状态为 connecting
其父类继承自 Thread 类,这里不贴图了,重点看一下 SendThread 里的 run 方法:
@Override public void run() { //初始化clientCnxnSocket,sessionId用于 Log and Exception messages clientCnxnSocket.introduce(this,sessionId); //初始化当前时间 now = Time.currentElapsedTime() = System.nanoTime() / 1000000; clientCnxnSocket.updateNow(); //初始化最后发信时间和最后收信时间this.lastSend = now;this.lastHeard = now; clientCnxnSocket.updateLastSendAndHeard(); int to; //最后ping读写服务器时间 long lastPingRwServer = Time.currentElapsedTime(); //最大发ping信号间隔 final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; //如果状态是活的:this != CLOSED && this != AUTH_FAILED; while (state.isAlive()) { try { //如果有Channel注册到Selector了(有SelectionKey, //如果这些不太懂,需要了解 NIO,可以看看我这方面的博客),isConnected返回true //这里 if 里的意思是:没有连接(其实就是表面意思=。=) if (!clientCnxnSocket.isConnected()) { //这个isFirstConnect默认值是true //这里 if 里的意思是:不是第一次连接。 //证明之前是连接过的,走到这里说明是重连 if(!isFirstConnect){ try { //睡 1 s,为什么?重连间隔 Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing //如果正在关闭或者状态是关闭的或认证失败的 if (closing || !state.isAlive()) { break; } //如果读写服务器地址不为空,用读写服务器地址 if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { //挨个访问服务器地址列表里的地址,间隔 1 s serverAddress = hostProvider.next(1000); } //开始连接 startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); } if (state.isConnected()) { // determine whether we need to send an AuthFailed event. if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent == true) { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, authState,null)); } } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); } clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } catch (Throwable e) { if (closing) { if (LOG.isDebugEnabled()) { // closing so this is expected LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage()); } break; } else { // this is ugly, you have a better way speak up if (e instanceof SessionExpiredException) { LOG.info(e.getMessage() + ", closing socket connection"); } else if (e instanceof SessionTimeoutException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof EndOfStreamException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof RWServerFoundException) { LOG.info(e.getMessage()); } else if (e instanceof SocketException) { LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage()); } else { LOG.warn("Session 0x{} for server {}, unexpected error{}", Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e); } cleanup(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent( Event.EventType.None, Event.KeeperState.Disconnected, null)); } clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } } } cleanup(); clientCnxnSocket.close(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); }
private void startConnect(InetSocketAddress addr) throws IOException { // initializing it for new connection saslLoginFailed = false; state = States.CONNECTING; setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr); //在这里连接 clientCnxnSocket.connect(addr); }
void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; } initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; }
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
//注册连接时间
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
//阻塞,等待连接
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
在这里插入图片描述
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。