赞
踩
再上一遍文章的最后提到了两个问题。
1.eventThread怎么处理事件的。
2.eventThread怎么实现异步回调。
下面一块分析一下两个问题,以及watch机制及异步回调的原理。
主要分为以下三部分内容:
在readResponse
中,当ReplyHeader
的xid
是NOTIFICATION_XID = -1
时,代表客户端接收到的是来自服务端的一个事件通知。
LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId)); // 反序列化watch事件 WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if (serverPath.compareTo(chrootPath) == 0) { event.setPath("/"); } else if (serverPath.length() > chrootPath.length()) { event.setPath(serverPath.substring(chrootPath.length())); } else { LOG.warn("Got server path {} which is too short for chroot path {}.", event.getPath(), chrootPath); } } // 封装事件对象 WatchedEvent we = new WatchedEvent(event); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); // 将事件加入到事件队列。 eventThread.queueEvent(we);
EventThread用来处理客户端事件的线程。
// 存储事件的阻塞队列 private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>(); public void queueEvent(WatchedEvent event) { queueEvent(event, null); } private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); final Set<Watcher> watchers; if (materializedWatchers == null) { // materialize the watchers based on the event //寻找应该被通知的观察者 watchers = watcher.materialize(event.getState(), event.getType(), event.getPath()); } else { watchers = new HashSet<Watcher>(); watchers.addAll(materializedWatchers); } WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); } private static class WatcherSetEventPair { private final Set<Watcher> watchers; private final WatchedEvent event; public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) { this.watchers = watchers; this.event = event; } }
public Set<Watcher> materialize( Watcher.Event.KeeperState state,Watcher.Event.EventType type,String clientPath) { Set<Watcher> result = new HashSet<Watcher>(); switch (type) { case None: result.add(defaultWatcher); /** The client is in the connected state - it is connected * to a server in the ensemble (one of the servers specified * in the host connection parameter during ZooKeeper client * creation). */ // 如果是state是不是已经连接的状态,则触发临时和永久事件,并清除临时事件。 boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected; synchronized (dataWatches) { for (Set<Watcher> ws : dataWatches.values()) { result.addAll(ws); } dataWatches.clear(); } } synchronized (existWatches) { for (Set<Watcher> ws : existWatches.values()) { result.addAll(ws); } if (clear) { existWatches.clear(); } } synchronized (childWatches) { for (Set<Watcher> ws : childWatches.values()) { result.addAll(ws); } if (clear) { childWatches.clear(); } } synchronized (persistentWatches) { for (Set<Watcher> ws: persistentWatches.values()) { result.addAll(ws); } } synchronized (persistentRecursiveWatches) { for (Set<Watcher> ws: persistentRecursiveWatches.values()) { result.addAll(ws); } } return result; // 根据不同的时间,触发不同的观察者。并remove // private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); case NodeDataChanged: case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } addPersistentWatches(clientPath, result); break; case NodeChildrenChanged: synchronized (childWatches) { addTo(childWatches.remove(clientPath), result); } addPersistentWatches(clientPath, result); break; case NodeDeleted: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } // TODO This shouldn't be needed, but just in case synchronized (existWatches) { Set<Watcher> list = existWatches.remove(clientPath); if (list != null) { addTo(list, result); LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!"); } } synchronized (childWatches) { addTo(childWatches.remove(clientPath), result); } addPersistentWatches(clientPath, result); break; default: String errorMsg = String.format( "Unhandled watch event type %s with state %s on path %s", type, state, clientPath); LOG.error(errorMsg); throw new RuntimeException(errorMsg); } return result; }
至此,已经将要触发的观察者和事件的Pair(WatcherSetEventPair
)加入到waitingEvents
中。下面看一下具体被触发的过程。
public void run() { try { isRunning = true; while (true) { // 不断的在持剑队列中获取要处理的事件Pair Object event = waitingEvents.take(); // 在授权失败或者链接断开后,会在waitingEvents放入eventOfDeath对象 if (event == eventOfDeath) { wasKilled = true; } else { processEvent(event); } // 确保在退出前waitingEvents里的事件都已经被处理。 if (wasKilled)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。