赞
踩
EventThread线程不断地从waitingEvents这个队列中取出Object,识别出其具体类型(Watcher或者AsyncCallback),并分别调用process和processResult接口方法来实现对事件的触发和回调。
- public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
- public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
-
- public void delete(final String path, int version)
- public void delete(final String path, int version, VoidCallback cb, Object ctx)
-
- public Stat exists(String path, boolean watch)
- public void exists(String path, boolean watch, StatCallback cb, Object ctx)
-
- public List<ACL> getACL(final String path, Stat stat)
- public void getACL(final String path, Stat stat, ACLCallback cb,Object ctx)
-
- public List<String> getChildren(String path, boolean watch)
- public List<String> getChildren(String path, boolean watch, Stat stat)
- public void getChildren(String path, boolean watch, Children2Callback cb,Object ctx)
- public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
-
- public byte[] getData(String path, boolean watch, Stat stat)
- public void getData(String path, boolean watch, DataCallback cb, Object ctx)
-
- public Stat setACL(final String path, List<ACL> acl, int version)
- public void setACL(final String path, List<ACL> acl, int version, StatCallback cb, Object ctx)
-
- public Stat setData(final String path, byte data[], int version)
- public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
EventThread源码分析
- class EventThread extends Thread { // Daemon thread that drains waitingEvents and hands each entry to its watchers or to its async callback.
- // Work queue. Entries are WatcherSetEventPair (added by queueEvent), Packet (added by queuePacket), or the eventOfDeath sentinel (added by queueEventOfDeath).
- private final LinkedBlockingQueue<Object> waitingEvents =
- new LinkedBlockingQueue<Object>();
-
- // Last session state recorded by queueEvent. This is really the "queued" session state: it is ahead of what watchers have seen until the event thread actually processes the event and hands it to the watcher.
- private volatile KeeperState sessionState = KeeperState.Disconnected;
-
- private volatile boolean wasKilled = false; // set by run() once the eventOfDeath sentinel has been taken from the queue
- private volatile boolean isRunning = false; // true while run() is inside its loop; cleared under the waitingEvents lock just before exit
-
- EventThread() {
- super(makeThreadName("-EventThread"));
- setUncaughtExceptionHandler(uncaughtExceptionHandler);
- setDaemon(true);
- }
- // Wraps a WatchedEvent with its watcher set and adds the pair to the waitingEvents queue.
- // Per the original author's note (caller not visible here): SendThread.readResponse(ByteBuffer incomingBuffer) reads the response from the server;
- // if the response is an event, it is deserialized back into a WatchedEvent and this method is called.
- public void queueEvent(WatchedEvent event) {
- if (event.getType() == EventType.None // drop a type-None event whose state equals the already-recorded sessionState
- && sessionState == event.getState()) {
- return;
- }
- sessionState = event.getState();
-
- // Obtain the set of watchers for this event via watcher.materialize(state, type, path).
- WatcherSetEventPair pair = new WatcherSetEventPair(
- watcher.materialize(event.getState(), event.getType(),event.getPath()),
- event);
- // queue the pair (watch set & event) for later processing
- // Put the watcher set and the event on the queue together; they are processed later by run().
- waitingEvents.add(pair);
- }
-
- // Per the original author's note (caller not visible here): SendThread.readResponse(ByteBuffer incomingBuffer) reads the response from the server;
- // for a normal reply it takes a Packet from the pendingQueue, validates it, and then calls this method.
- // Normally the packet is appended to waitingEvents. Once wasKilled is set, it is appended only while run() is still looping; otherwise it is processed inline on the calling thread.
- public void queuePacket(Packet packet) {
- if (wasKilled) {
- synchronized (waitingEvents) { // same lock run() holds when it clears isRunning, so the check below cannot race with thread exit
- if (isRunning) waitingEvents.add(packet);
- else processEvent(packet);
- }
- } else {
- waitingEvents.add(packet);
- }
- }
-
- public void queueEventOfDeath() { // enqueues the shutdown sentinel; entries already queued ahead of it are still processed first
- waitingEvents.add(eventOfDeath);
- }
-
- @Override
- public void run() {
- try {
- isRunning = true;
- // Loop, blocking on take() for the next queued entry (watcher/event pair, Packet, or the eventOfDeath sentinel).
- while (true) {
- Object event = waitingEvents.take();
- if (event == eventOfDeath) { // sentinel: mark the thread as killed but keep draining the queue
- wasKilled = true;
- } else {
- // Dispatch the entry to its watchers or callback.
- processEvent(event);
- }
- if (wasKilled) // after the sentinel, exit only once the queue is observed empty under the lock
- synchronized (waitingEvents) {
- if (waitingEvents.isEmpty()) {
- isRunning = false;
- break;
- }
- }
- }
- } catch (InterruptedException e) { // interruption of take() ends the loop; isRunning is not reset on this path
- LOG.error("Event thread exiting due to interruption", e);
- }
-
- LOG.info("EventThread shut down");
- }
-
- private void processEvent(Object event) { // handles one queue entry; any Throwable is logged and not rethrown
- try {
- // Watcher notification.
- if (event instanceof WatcherSetEventPair) {
- // each watcher will process the event
- WatcherSetEventPair pair = (WatcherSetEventPair) event;
- // Iterate over the watcher set and invoke each watcher's process(WatchedEvent) callback for this event.
- for (Watcher watcher : pair.watchers) {
- try {
- // This is the process(WatchedEvent event) method that, per the original note, every Watcher implementation must provide.
- watcher.process(pair.event);
- } catch (Throwable t) { // a failing watcher is logged and the remaining watchers are still notified
- LOG.error("Error while calling watcher ", t);
- }
- }
- }
- // Asynchronous callback: every non-WatcherSetEventPair entry is cast to Packet.
- else {
- Packet p = (Packet) event;
- // rc is the result code taken from the server's reply header; common values per the original note:
- // 0: call succeeded; -4: connection between client and server lost;
- // -110: the specified node already exists; -112: session expired.
- int rc = 0;
- String clientPath = p.clientPath;
- if (p.replyHeader.getErr() != 0) {
- rc = p.replyHeader.getErr();
- }
- if (p.cb == null) {
- LOG.warn("Somehow a null cb got to EventThread!");
- }
- // Server response to an asynchronous exists/setData/setACL call; all three report through a StatCallback.
- else if (p.response instanceof ExistsResponse
- || p.response instanceof SetDataResponse
- || p.response instanceof SetACLResponse) {
- StatCallback cb = (StatCallback) p.cb;
-
- // rc == 0 means the call succeeded, so the Stat from the response is passed on; otherwise the callback receives null.
- if (rc == 0) {
- if (p.response instanceof ExistsResponse) {
- cb.processResult(rc, clientPath, p.ctx,
- ((ExistsResponse) p.response)
- .getStat());
- } else if (p.response instanceof SetDataResponse) {
- cb.processResult(rc, clientPath, p.ctx,
- ((SetDataResponse) p.response)
- .getStat());
- } else if (p.response instanceof SetACLResponse) {
- cb.processResult(rc, clientPath, p.ctx,
- ((SetACLResponse) p.response)
- .getStat());
- }
- } else {
- cb.processResult(rc, clientPath, p.ctx, null);
- }
- }
- // Server response to an asynchronous getData call: passes data and Stat on success, nulls on error.
- else if (p.response instanceof GetDataResponse) {
- DataCallback cb = (DataCallback) p.cb;
- GetDataResponse rsp = (GetDataResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, clientPath, p.ctx, rsp
- .getData(), rsp.getStat());
- } else {
- cb.processResult(rc, clientPath, p.ctx, null,
- null);
- }
- }
- // Server response to an asynchronous getACL call: passes ACL list and Stat on success, nulls on error.
- else if (p.response instanceof GetACLResponse) {
- ACLCallback cb = (ACLCallback) p.cb;
- GetACLResponse rsp = (GetACLResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, clientPath, p.ctx, rsp
- .getAcl(), rsp.getStat());
- } else {
- cb.processResult(rc, clientPath, p.ctx, null,
- null);
- }
- }
- // Server response to a getChildren call (ChildrenCallback variant): passes the child list on success, null on error.
- else if (p.response instanceof GetChildrenResponse) {
- ChildrenCallback cb = (ChildrenCallback) p.cb;
- GetChildrenResponse rsp = (GetChildrenResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, clientPath, p.ctx, rsp
- .getChildren());
- } else {
- cb.processResult(rc, clientPath, p.ctx, null);
- }
- }
- // Server response to an asynchronous getChildren call (Children2Callback variant); differs from the branch above in that the Stat is also passed to the callback.
- else if (p.response instanceof GetChildren2Response) {
- Children2Callback cb = (Children2Callback) p.cb;
- GetChildren2Response rsp = (GetChildren2Response) p.response;
- if (rc == 0) {
- cb.processResult(rc, clientPath, p.ctx, rsp
- .getChildren(), rsp.getStat());
- } else {
- cb.processResult(rc, clientPath, p.ctx, null, null);
- }
- }
- // Server response to an asynchronous create call: passes the created path on success, null on error.
- else if (p.response instanceof CreateResponse) {
- StringCallback cb = (StringCallback) p.cb;
- CreateResponse rsp = (CreateResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, clientPath, p.ctx,
- (chrootPath == null // with a chroot set, drop the first chrootPath.length() characters of the returned path
- ? rsp.getPath()
- : rsp.getPath()
- .substring(chrootPath.length())));
- } else {
- cb.processResult(rc, clientPath, p.ctx, null);
- }
- }
- else if (p.cb instanceof VoidCallback) { // no response type matched above; dispatch on the callback type and report only the result code
- VoidCallback cb = (VoidCallback) p.cb;
- cb.processResult(rc, clientPath, p.ctx);
- }
- }
- } catch (Throwable t) {
- LOG.error("Caught unexpected throwable", t);
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。