赞
踩
zk-client EventThread 通过eventOfDeath 停止
class EventThread extends ZooKeeperThread { private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>(); /** This is really the queued session state until the event * thread actually processes the event and hands it to the watcher. * But for all intents and purposes this is the state. */ private volatile KeeperState sessionState = KeeperState.Disconnected; private volatile boolean wasKilled = false; private volatile boolean isRunning = false; EventThread() { super(makeThreadName("-EventThread")); setDaemon(true); } public void queueEvent(WatchedEvent event) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); } public void queuePacket(Packet packet) { if (wasKilled) { synchronized (waitingEvents) { if (isRunning) waitingEvents.add(packet); else processEvent(packet); } } else { waitingEvents.add(packet); } } public void queueEventOfDeath() { waitingEvents.add(eventOfDeath); } @Override public void run() { try { isRunning = true; while (true) { Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId())); } ```
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。