当前位置:   article > 正文

ZooKeeper源码分析之EventThread线程_zookeeper中eventthread线程 java.lang.thread.state: wa

zookeeper中eventthread线程 java.lang.thread.state: waiting (parking)

        EventThread线程不断的从waitingEvents这个队列中取出Object,识别出其具体类型Watcher或者AsyncCallback,并分别调用process和processResult接口方法来实现对事件的触发和回调。

  • watcher就是数据变更通知
  • AsyncCallback是ZooKeeper客户端的API命令中的异步API,我们首先看下ZooKeeper客户端的API,创建节点、删除节点、检测节点是否存在、权限控制、获取子节点、获取数据内容、设置权限、设置数据内容都有相应的异步方式,有StringCallback、VoidCallback、StatCallback、ACLCallback、ChildrenCallback、Children2Callback、DataCallback七种
  1. public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  2. public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
  3. public void delete(final String path, int version)
  4. public void delete(final String path, int version, VoidCallback cb, Object ctx)
  5. public Stat exists(String path, boolean watch)
  6. public void exists(String path, boolean watch, StatCallback cb, Object ctx)
  7. public List<ACL> getACL(final String path, Stat stat)
  8. public void getACL(final String path, Stat stat, ACLCallback cb,Object ctx)
  9. public List<String> getChildren(String path, boolean watch)
  10. public List<String> getChildren(String path, boolean watch, Stat stat)
  11. public void getChildren(String path, boolean watch, Children2Callback cb,Object ctx)
  12. public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
  13. public byte[] getData(String path, boolean watch, Stat stat)
  14. public void getData(String path, boolean watch, DataCallback cb, Object ctx)
  15. public Stat setACL(final String path, List<ACL> acl, int version)
  16. public void setACL(final String path, List<ACL> acl, int version, StatCallback cb, Object ctx)
  17. public Stat setData(final String path, byte data[], int version)
  18. public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)

EventThread源码分析 

  1. class EventThread extends Thread {
  2. //队列
  3. private final LinkedBlockingQueue<Object> waitingEvents =
  4. new LinkedBlockingQueue<Object>();
  5. //这实际上是队列中的会话状态,直到EventThread事件线程实际处理该事件并将其交给watcher为止。
  6. private volatile KeeperState sessionState = KeeperState.Disconnected;
  7. private volatile boolean wasKilled = false;
  8. private volatile boolean isRunning = false;
  9. EventThread() {
  10. super(makeThreadName("-EventThread"));
  11. setUncaughtExceptionHandler(uncaughtExceptionHandler);
  12. setDaemon(true);
  13. }
  14. //将WatchedEvent加入到waitingEvents队列中
  15. //在SendThread线程的readResponse(ByteBuffer incomingBuffer)读取从服务器端的response
  16. //如果是事件,则解析反序列化还原成WatchedEvent,调用该方法
  17. public void queueEvent(WatchedEvent event) {
  18. if (event.getType() == EventType.None
  19. && sessionState == event.getState()) {
  20. return;
  21. }
  22. sessionState = event.getState();
  23. //根据event获取针对该event监听的watcher集合
  24. WatcherSetEventPair pair = new WatcherSetEventPair(
  25. watcher.materialize(event.getState(), event.getType(),event.getPath()),
  26. event);
  27. // queue the pair (watch set & event) for later processing
  28. //将watcher集合和事件组放入到队列中排队,之后进行处理
  29. waitingEvents.add(pair);
  30. }
  31. //在SendThread线程的readResponse(ByteBuffer incomingBuffer)读取从服务器端的response
  32. //如果收到的是正常的响应请求,则从pendingQueue队列中取出一个Packet,校验之后调用该方法
  33. //放入到waitingEvents队列中
  34. public void queuePacket(Packet packet) {
  35. if (wasKilled) {
  36. synchronized (waitingEvents) {
  37. if (isRunning) waitingEvents.add(packet);
  38. else processEvent(packet);
  39. }
  40. } else {
  41. waitingEvents.add(packet);
  42. }
  43. }
  44. public void queueEventOfDeath() {
  45. waitingEvents.add(eventOfDeath);
  46. }
  47. @Override
  48. public void run() {
  49. try {
  50. isRunning = true;
  51. //一直循环,从waitingEvents队列中取出WatchedEvent或者Packet
  52. while (true) {
  53. Object event = waitingEvents.take();
  54. if (event == eventOfDeath) {
  55. wasKilled = true;
  56. } else {
  57. //处理event
  58. processEvent(event);
  59. }
  60. if (wasKilled)
  61. synchronized (waitingEvents) {
  62. if (waitingEvents.isEmpty()) {
  63. isRunning = false;
  64. break;
  65. }
  66. }
  67. }
  68. } catch (InterruptedException e) {
  69. LOG.error("Event thread exiting due to interruption", e);
  70. }
  71. LOG.info("EventThread shut down");
  72. }
  73. private void processEvent(Object event) {
  74. try {
  75. //watcher监听事件
  76. if (event instanceof WatcherSetEventPair) {
  77. // each watcher will process the event
  78. WatcherSetEventPair pair = (WatcherSetEventPair) event;
  79. //循环watcher集合,调用watcher的回调方法(WatchedEvent event)处理该事件
  80. for (Watcher watcher : pair.watchers) {
  81. try {
  82. //还记得Watcher接口中对于watcher实现类都要实现的一个process(WatchedEvent event)方法,就是这个
  83. watcher.process(pair.event);
  84. } catch (Throwable t) {
  85. LOG.error("Error while calling watcher ", t);
  86. }
  87. }
  88. }
  89. //异步回调
  90. else {
  91. Packet p = (Packet) event;
  92. //ResultCode服务器端响应码,常见码如下
  93. //0:接口调用成功 -4客户端与服务器端连接已断开
  94. //-110指定节点已存在 -112会话已过期
  95. int rc = 0;
  96. String clientPath = p.clientPath;
  97. if (p.replyHeader.getErr() != 0) {
  98. rc = p.replyHeader.getErr();
  99. }
  100. if (p.cb == null) {
  101. LOG.warn("Somehow a null cb got to EventThread!");
  102. }
  103. //如果是exists/setData/setAcl命令异步方式的服务器端响应
  104. else if (p.response instanceof ExistsResponse
  105. || p.response instanceof SetDataResponse
  106. || p.response instanceof SetACLResponse) {
  107. StatCallback cb = (StatCallback) p.cb;
  108. //rc==0表示接口调用成功
  109. if (rc == 0) {
  110. if (p.response instanceof ExistsResponse) {
  111. cb.processResult(rc, clientPath, p.ctx,
  112. ((ExistsResponse) p.response)
  113. .getStat());
  114. } else if (p.response instanceof SetDataResponse) {
  115. cb.processResult(rc, clientPath, p.ctx,
  116. ((SetDataResponse) p.response)
  117. .getStat());
  118. } else if (p.response instanceof SetACLResponse) {
  119. cb.processResult(rc, clientPath, p.ctx,
  120. ((SetACLResponse) p.response)
  121. .getStat());
  122. }
  123. } else {
  124. cb.processResult(rc, clientPath, p.ctx, null);
  125. }
  126. }
  127. //getData命令异步方式的服务器异步响应
  128. else if (p.response instanceof GetDataResponse) {
  129. DataCallback cb = (DataCallback) p.cb;
  130. GetDataResponse rsp = (GetDataResponse) p.response;
  131. if (rc == 0) {
  132. cb.processResult(rc, clientPath, p.ctx, rsp
  133. .getData(), rsp.getStat());
  134. } else {
  135. cb.processResult(rc, clientPath, p.ctx, null,
  136. null);
  137. }
  138. }
  139. //getACL命令异步方式的服务器异步响应
  140. else if (p.response instanceof GetACLResponse) {
  141. ACLCallback cb = (ACLCallback) p.cb;
  142. GetACLResponse rsp = (GetACLResponse) p.response;
  143. if (rc == 0) {
  144. cb.processResult(rc, clientPath, p.ctx, rsp
  145. .getAcl(), rsp.getStat());
  146. } else {
  147. cb.processResult(rc, clientPath, p.ctx, null,
  148. null);
  149. }
  150. }
  151. //getChildren命令的服务器响应
  152. else if (p.response instanceof GetChildrenResponse) {
  153. ChildrenCallback cb = (ChildrenCallback) p.cb;
  154. GetChildrenResponse rsp = (GetChildrenResponse) p.response;
  155. if (rc == 0) {
  156. cb.processResult(rc, clientPath, p.ctx, rsp
  157. .getChildren());
  158. } else {
  159. cb.processResult(rc, clientPath, p.ctx, null);
  160. }
  161. }
  162. //getChildren命令异步方式的服务器异步响应,与上面的区别在与stat信息更新
  163. else if (p.response instanceof GetChildren2Response) {
  164. Children2Callback cb = (Children2Callback) p.cb;
  165. GetChildren2Response rsp = (GetChildren2Response) p.response;
  166. if (rc == 0) {
  167. cb.processResult(rc, clientPath, p.ctx, rsp
  168. .getChildren(), rsp.getStat());
  169. } else {
  170. cb.processResult(rc, clientPath, p.ctx, null, null);
  171. }
  172. }
  173. //create命令异步方式的服务器异步响应
  174. else if (p.response instanceof CreateResponse) {
  175. StringCallback cb = (StringCallback) p.cb;
  176. CreateResponse rsp = (CreateResponse) p.response;
  177. if (rc == 0) {
  178. cb.processResult(rc, clientPath, p.ctx,
  179. (chrootPath == null
  180. ? rsp.getPath()
  181. : rsp.getPath()
  182. .substring(chrootPath.length())));
  183. } else {
  184. cb.processResult(rc, clientPath, p.ctx, null);
  185. }
  186. }
  187. else if (p.cb instanceof VoidCallback) {
  188. VoidCallback cb = (VoidCallback) p.cb;
  189. cb.processResult(rc, clientPath, p.ctx);
  190. }
  191. }
  192. } catch (Throwable t) {
  193. LOG.error("Caught unexpected throwable", t);
  194. }
  195. }
  196. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/94746
推荐阅读
相关标签
  

闽ICP备14008679号