赞
踩
ZooKeeper 的监听器原理如下:
exists 方法监听节点创建、删除、数据内容变化。
getData 方法监听节点删除、数据内容变化。
getChildren 方法监听节点的删除、子节点的创建或者删除。
这三个方法注册的监听器具有如下三个特点:
从 3.6.0 版本开始,增加了 addWatch 方法,支持注册持久监听器(监听节点的创建、删除、数据内容变化)、持久递归监听器(监听节点/子节点的创建、删除、数据内容变化)。
源码分析基于 ZooKeeper 3.8.0 版本。
概述:客户端调用 exists、getData、getChildren 方法,传递节点路径和监听器,然后组装成一个 Packet 对象发送给服务端。
ZooKeeper
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException { final String clientPath = path; // 校验路径的合法性 PathUtils.validatePath(clientPath); WatchRegistration wcb = null; if (watcher != null) { // 封装监听器、节点路径到一个WatchRegistration实例 wcb = new ExistsWatchRegistration(watcher, clientPath); } // 如果指定了chrootPath,则在路径的前面拼装chrootPath final String serverPath = prependChroot(clientPath); // 构建请求头 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.exists); // 构建请求体 ExistsRequest request = new ExistsRequest(); request.setPath(serverPath); // 标识是否有监听器 request.setWatch(watcher != null); SetDataResponse response = new SetDataResponse(); // 提交请求 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { if (r.getErr() == KeeperException.Code.NONODE.intValue()) { return null; } throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } return response.getStat().getCzxid() == -1 ? null : response.getStat(); }
ClientCnxn
public ReplyHeader submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { // 传递的WatchDeregistration参数为空 return submitRequest(h, request, response, watchRegistration, null); } public ReplyHeader submitRequest( RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // 构造Packet实例 Packet packet = queuePacket( h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration); synchronized (packet) { if (requestTimeout > 0) { waitForPacketFinish(r, packet); } else { while (!packet.finished) { packet.wait(); } } } if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) { sendThread.cleanAndNotifyState(); } return r; }
public Packet queuePacket( RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) { Packet packet = null; // 构造Packet实例 packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; packet.watchDeregistration = watchDeregistration; synchronized (state) { if (!state.isAlive() || closing) { conLossPacket(packet); } else { if (h.getType() == OpCode.closeSession) { closing = true; } // 将Packet实例加入到outgoingQueue阻塞队列中 outgoingQueue.add(packet); } } // 打开ClientCnxnSocketNIO的Selector sendThread.getClientCnxnSocket().packetAdded(); return packet; }
SendThread
ZooKeeper 的构造器方法中会启动 SendThread 线程,接下来看下 SendThread 的 run 方法。
@Override public void run() { clientCnxnSocket.introduce(this, sessionId, outgoingQueue); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; InetSocketAddress serverAddress = null; while (state.isAlive()) { try { if (!clientCnxnSocket.isConnected()) { if (closing) { break; } if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { serverAddress = hostProvider.next(1000); } onConnecting(serverAddress); startConnect(serverAddress); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } if (state.isConnected()) { if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed.", e); changeZkState(States.AUTH_FAILED); sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { changeZkState(States.AUTH_FAILED); sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent) { eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null)); if (state == States.AUTH_FAILED) { eventThread.queueEventOfDeath(); } } } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { String warnInfo = String.format( "Client session timed out, have not heard from server in %dms for session id 0x%s", clientCnxnSocket.getIdleRecv(), Long.toHexString(sessionId)); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); } // 将数据传输到服务端 clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); } catch (Throwable e) { if (closing) { LOG.warn( "An exception was thrown while closing send thread for session 0x{}.", Long.toHexString(getSessionId()), e); break; } else { LOG.warn( "Session 0x{} for server {}, Closing socket connection. " + "Attempting reconnect except it is a SessionExpiredException.", Long.toHexString(getSessionId()), serverAddress, e); cleanAndNotifyState(); } } } synchronized (state) { cleanup(); } clientCnxnSocket.close(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null)); if (zooKeeperSaslClient != null) { zooKeeperSaslClient.shutdown(); } ZooTrace.logTraceMessage( LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); }
ClientCnxnSocketNIO
@Override void doTransport( int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<SelectionKey> selected; synchronized (this) { selected = selector.selectedKeys(); } updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); updateSocketAddresses(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // IO交互 doIO(pendingQueue, cnxn); } } // 如果服务器状态是已连接状态 if (sendThread.getZkState().isConnected()) { // 如果可以从outgoingQueue阻塞队列中取出Packet实例 if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // 将通道设置为可写状态 enableWrite(); } } selected.clear(); }
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } // 如果通道是可写状态 if (sockKey.isWritable()) { // 从outgoingQueue阻塞队列中取出Packet实例 Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()); // 如果该Packet实例非空 if (p != null) { updateLastSend(); if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } // 将Packet实例的ByteBuffer写到通道中 sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount.getAndIncrement(); // 从outgoingQueue阻塞队列中移除该Packet实例 outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { // 将该Packet实例加入到pendingQueue队列中 pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { disableWrite(); } else { // 将通道设置为可写状态 enableWrite(); } } }
概述:服务端根据客户端的请求判断是否需要注册监听器,需要的话将节点路径和 ServerCnxn(表示客户端与服务端的连接)存储到服务端本地的 WatchManager 中,然后向客户端发送响应。
NIOServerCnxnFactory
QuorumPeerMain 在执行 main 方法时,会初始化一个具体的 ServerCnxnFactory 实例,默认是 NIOServerCnxnFactory 类型。
@Override public void start() { stopped = false; if (workerPool == null) { // 实例化WorkerService workerPool = new WorkerService("NIOWorker", numWorkerThreads, false); } for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { // 启动SelectorThread线程 thread.start(); } } if (acceptThread.getState() == Thread.State.NEW) { // 启动AcceptThread线程 acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }
AcceptThread
public void run
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。