赞
踩
ClientCnxn类的内部类SendThread的readResponse()方法
- //客户端读取返回值
- void readResponse(ByteBuffer incomingBuffer) throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream(
- incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ReplyHeader replyHdr = new ReplyHeader();
-
- replyHdr.deserialize(bbia, "header");
- if (replyHdr.getXid() == -2) {
- // -2 is the xid for pings
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got ping response for sessionid: 0x"
- + Long.toHexString(sessionId)
- + " after "
- + ((System.nanoTime() - lastPingSentNs) / 1000000)
- + "ms");
- }
- return;
- }
- if (replyHdr.getXid() == -4) {
- // -4 is the xid for AuthPacket
- if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
- state = States.AUTH_FAILED;
- eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.AuthFailed, null) );
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got auth sessionid:0x"
- + Long.toHexString(sessionId));
- }
- return;
- }
- //-1代表是通知
- if (replyHdr.getXid() == -1) {
- // -1 means notification
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got notification sessionid:0x"
- + Long.toHexString(sessionId));
- }
- //从服务器返回读取事件数据
- WatcherEvent event = new WatcherEvent();
- event.deserialize(bbia, "response");
- //转换server到client
- // convert from a server path to a client path
- if (chrootPath != null) {
- String serverPath = event.getPath();
- if(serverPath.compareTo(chrootPath)==0)
- event.setPath("/");
- else if (serverPath.length() > chrootPath.length())
- event.setPath(serverPath.substring(chrootPath.length()));
- else {
- LOG.warn("Got server path " + event.getPath()
- + " which is too short for chroot path "
- + chrootPath);
- }
- }
- //触发watch事件,并将事件放进队列
- WatchedEvent we = new WatchedEvent(event);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got " + we + " for sessionid 0x"
- + Long.toHexString(sessionId));
- }
-
- eventThread.queueEvent( we );
- return;
- }
-
- // If SASL authentication is currently in progress, construct and
- // send a response packet immediately, rather than queuing a
- // response as with other packets.
- if (clientTunneledAuthenticationInProgress()) {
- GetSASLRequest request = new GetSASLRequest();
- request.deserialize(bbia,"token");
- zooKeeperSaslClient.respondToServer(request.getToken(),
- ClientCnxn.this);
- return;
- }
-
- Packet packet;
- synchronized (pendingQueue) {
- if (pendingQueue.size() == 0) {
- throw new IOException("Nothing in the queue, but got "
- + replyHdr.getXid());
- }
- packet = pendingQueue.remove();
- }
- /*
- * Since requests are processed in order, we better get a response
- * to the first request!
- */
- try {
- if (packet.requestHeader.getXid() != replyHdr.getXid()) {
- packet.replyHeader.setErr(
- KeeperException.Code.CONNECTIONLOSS.intValue());
- throw new IOException("Xid out of order. Got Xid "
- + replyHdr.getXid() + " with err " +
- + replyHdr.getErr() +
- " expected Xid "
- + packet.requestHeader.getXid()
- + " for a packet with details: "
- + packet );
- }
-
- packet.replyHeader.setXid(replyHdr.getXid());
- packet.replyHeader.setErr(replyHdr.getErr());
- packet.replyHeader.setZxid(replyHdr.getZxid());
- if (replyHdr.getZxid() > 0) {
- lastZxid = replyHdr.getZxid();
- }
- if (packet.response != null && replyHdr.getErr() == 0) {
- packet.response.deserialize(bbia, "response");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reading reply sessionid:0x"
- + Long.toHexString(sessionId) + ", packet:: " + packet);
- }
- } finally {
- finishPacket(packet);
- }
- }
queueEvent()方法属于ClientCnxn类的内部类EventThread
- class EventThread extends ZooKeeperThread {
- private final LinkedBlockingQueue<Object> waitingEvents =
- new LinkedBlockingQueue<Object>();
-
- /** This is really the queued session state until the event
- * thread actually processes the event and hands it to the watcher.
- * But for all intents and purposes this is the state.
- */
- private volatile KeeperState sessionState = KeeperState.Disconnected;
-
- private volatile boolean wasKilled = false;
- private volatile boolean isRunning = false;
-
/**
 * Builds the event thread: the thread name is produced by
 * {@code makeThreadName} with the "-EventThread" suffix, and the thread is
 * marked as a daemon.
 */
EventThread() {
    super(makeThreadName("-EventThread"));
    setDaemon(true);
}
-
-
-
- private final LinkedBlockingQueue<Object> waitingEvents =
- new LinkedBlockingQueue<Object>();
- //初始是关闭
- private volatile KeeperState sessionState = KeeperState.Disconnected;
-
- public void queueEvent(WatchedEvent event) {
- //如果KeeperState状态关闭,并且Event状态是-1,则直接退出
- if (event.getType() == EventType.None
- && sessionState == event.getState()) {
- return;
- }
- sessionState = event.getState();
- //这是Event参数
- // materialize the watchers based on the event
- WatcherSetEventPair pair = new WatcherSetEventPair(
- //触发materialize方法,返回对应的目录文件数据
- watcher.materialize(event.getState(), event.getType(),
- event.getPath()),
- event);
- // queue the pair (watch set & event) for later processing
- //放进waiting事件
- waitingEvents.add(pair);
- }
materialize方法
- @Override
- public Set<Watcher> materialize(Watcher.Event.KeeperState state,
- Watcher.Event.EventType type,
- String clientPath)
- {
- Set<Watcher> result = new HashSet<Watcher>();
-
- switch (type) {
- //当事件类型为None时,也就是客户端和服务端的租约到期,或者认证失败或者断开连接或者建立连接时(建立连接时不会清除watcher,客户端收到反馈后会返回所有事件并触发,当事件类型是节点数据变化或者节点创建时则触发dataWatches和existWatches,当事件类型是子节点变化时childWatches被触发,当事件类型是删除节点时三种watcher都会触发。
- case None:
- result.add(defaultWatcher);
- //满足两种情况则清空dataWatches,existWatches,childWatches
- //1.状态不是SyncConnected(客户端与服务器端已建立链接)
- //2.设置了System property:zookeepr.disableAutoWatchReset 为true
- boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
- state != Watcher.Event.KeeperState.SyncConnected;
- synchronized(dataWatches) {
- for(Set<Watcher> ws: dataWatches.values()) {
- result.addAll(ws);
- }
- if (clear) {
- dataWatches.clear();
- }
- }
-
- synchronized(existWatches) {
- for(Set<Watcher> ws: existWatches.values()) {
- result.addAll(ws);
- }
- if (clear) {
- existWatches.clear();
- }
- }
-
- synchronized(childWatches) {
- for(Set<Watcher> ws: childWatches.values()) {
- result.addAll(ws);
- }
- if (clear) {
- childWatches.clear();
- }
- }
-
- return result;
- case NodeDataChanged:
- //当为创建时候,根据子类事件类型触发相应事件
- case NodeCreated:
- synchronized (dataWatches) {
- addTo(dataWatches.remove(clientPath), result);
- }
- synchronized (existWatches) {
- addTo(existWatches.remove(clientPath), result);
- }
- break;
- //当为子目录事件时候,根据子类事件类型触发相应事件
- case NodeChildrenChanged:
- synchronized (childWatches) {
- addTo(childWatches.remove(clientPath), result);
- }
- break;
- //当为删除事件时候,根据子类事件类型触发相应事件
- case NodeDeleted:
- synchronized (dataWatches) {
- addTo(dataWatches.remove(clientPath), result);
- }
- // XXX This shouldn't be needed, but just in case
- synchronized (existWatches) {
- Set<Watcher> list = existWatches.remove(clientPath);
- if (list != null) {
- addTo(existWatches.remove(clientPath), result);
- LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
- }
- }
- synchronized (childWatches) {
- addTo(childWatches.remove(clientPath), result);
- }
- break;
- default:
- String msg = "Unhandled watch event type " + type
- + " with state " + state + " on path " + clientPath;
- LOG.error(msg);
- throw new RuntimeException(msg);
- }
-
- return result;
- }
-
-
- //将form添加至to
- final private void addTo(Set<Watcher> from, Set<Watcher> to) {
- if (from != null) {
- to.addAll(from);
- }
- }
EventThread线程的run方法
- @Override
- public void run() {
- try {
- isRunning = true;
- while (true) {
- //从队列中获取事件数据
- Object event = waitingEvents.take();
- if (event == eventOfDeath) {
- //是否关闭线程
- wasKilled = true;
- } else {
- //处理事件
- processEvent(event);
- }
- if (wasKilled)
- synchronized (waitingEvents) {
- if (waitingEvents.isEmpty()) {
- isRunning = false;
- break;
- }
- }
- }
- } catch (InterruptedException e) {
- LOG.error("Event thread exiting due to interruption", e);
- }
-
- LOG.info("EventThread shut down for session: 0x{}",
- Long.toHexString(getSessionId()));
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。