当前位置:   article > 正文

zookeeper源码-watcher监听_eventthread shut down for session: 0x188f1a94ad724

eventthread shut down for session: 0x188f1a94ad724db

ClientCnxn类的readResponse()方法 

  1. //客户端读取返回值
  2. void readResponse(ByteBuffer incomingBuffer) throws IOException {
  3. ByteBufferInputStream bbis = new ByteBufferInputStream(
  4. incomingBuffer);
  5. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  6. ReplyHeader replyHdr = new ReplyHeader();
  7. replyHdr.deserialize(bbia, "header");
  8. if (replyHdr.getXid() == -2) {
  9. // -2 is the xid for pings
  10. if (LOG.isDebugEnabled()) {
  11. LOG.debug("Got ping response for sessionid: 0x"
  12. + Long.toHexString(sessionId)
  13. + " after "
  14. + ((System.nanoTime() - lastPingSentNs) / 1000000)
  15. + "ms");
  16. }
  17. return;
  18. }
  19. if (replyHdr.getXid() == -4) {
  20. // -4 is the xid for AuthPacket
  21. if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
  22. state = States.AUTH_FAILED;
  23. eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
  24. Watcher.Event.KeeperState.AuthFailed, null) );
  25. }
  26. if (LOG.isDebugEnabled()) {
  27. LOG.debug("Got auth sessionid:0x"
  28. + Long.toHexString(sessionId));
  29. }
  30. return;
  31. }
  32. //-1代表是通知
  33. if (replyHdr.getXid() == -1) {
  34. // -1 means notification
  35. if (LOG.isDebugEnabled()) {
  36. LOG.debug("Got notification sessionid:0x"
  37. + Long.toHexString(sessionId));
  38. }
  39. //从服务器返回读取事件数据
  40. WatcherEvent event = new WatcherEvent();
  41. event.deserialize(bbia, "response");
  42. //转换server到client
  43. // convert from a server path to a client path
  44. if (chrootPath != null) {
  45. String serverPath = event.getPath();
  46. if(serverPath.compareTo(chrootPath)==0)
  47. event.setPath("/");
  48. else if (serverPath.length() > chrootPath.length())
  49. event.setPath(serverPath.substring(chrootPath.length()));
  50. else {
  51. LOG.warn("Got server path " + event.getPath()
  52. + " which is too short for chroot path "
  53. + chrootPath);
  54. }
  55. }
  56. //触发watch事件,并将事件放进队列
  57. WatchedEvent we = new WatchedEvent(event);
  58. if (LOG.isDebugEnabled()) {
  59. LOG.debug("Got " + we + " for sessionid 0x"
  60. + Long.toHexString(sessionId));
  61. }
  62. eventThread.queueEvent( we );
  63. return;
  64. }
  65. // If SASL authentication is currently in progress, construct and
  66. // send a response packet immediately, rather than queuing a
  67. // response as with other packets.
  68. if (clientTunneledAuthenticationInProgress()) {
  69. GetSASLRequest request = new GetSASLRequest();
  70. request.deserialize(bbia,"token");
  71. zooKeeperSaslClient.respondToServer(request.getToken(),
  72. ClientCnxn.this);
  73. return;
  74. }
  75. Packet packet;
  76. synchronized (pendingQueue) {
  77. if (pendingQueue.size() == 0) {
  78. throw new IOException("Nothing in the queue, but got "
  79. + replyHdr.getXid());
  80. }
  81. packet = pendingQueue.remove();
  82. }
  83. /*
  84. * Since requests are processed in order, we better get a response
  85. * to the first request!
  86. */
  87. try {
  88. if (packet.requestHeader.getXid() != replyHdr.getXid()) {
  89. packet.replyHeader.setErr(
  90. KeeperException.Code.CONNECTIONLOSS.intValue());
  91. throw new IOException("Xid out of order. Got Xid "
  92. + replyHdr.getXid() + " with err " +
  93. + replyHdr.getErr() +
  94. " expected Xid "
  95. + packet.requestHeader.getXid()
  96. + " for a packet with details: "
  97. + packet );
  98. }
  99. packet.replyHeader.setXid(replyHdr.getXid());
  100. packet.replyHeader.setErr(replyHdr.getErr());
  101. packet.replyHeader.setZxid(replyHdr.getZxid());
  102. if (replyHdr.getZxid() > 0) {
  103. lastZxid = replyHdr.getZxid();
  104. }
  105. if (packet.response != null && replyHdr.getErr() == 0) {
  106. packet.response.deserialize(bbia, "response");
  107. }
  108. if (LOG.isDebugEnabled()) {
  109. LOG.debug("Reading reply sessionid:0x"
  110. + Long.toHexString(sessionId) + ", packet:: " + packet);
  111. }
  112. } finally {
  113. finishPacket(packet);
  114. }
  115. }
queueEvent()方法属于ClientCnxn类的内部类EventThread
  1. class EventThread extends ZooKeeperThread {
  /** Queue of items waiting to be taken by this thread's run loop. */
  private final LinkedBlockingQueue<Object> waitingEvents =
          new LinkedBlockingQueue<Object>();

  /**
   * This is really the queued session state until the event thread
   * actually processes the event and hands it to the watcher.
   * But for all intents and purposes this is the state.
   * Starts out as Disconnected.
   */
  private volatile KeeperState sessionState = KeeperState.Disconnected;

  /** Set by the run loop once the shutdown marker has been taken from the queue. */
  private volatile boolean wasKilled = false;

  /** True while the run loop is active. */
  private volatile boolean isRunning = false;

  // NOTE: the original snippet declared waitingEvents and sessionState a second
  // time after the constructor, which does not compile; each is declared once here.

  /** Creates the event thread as a daemon thread with an "-EventThread" name suffix. */
  EventThread() {
      super(makeThreadName("-EventThread"));
      setDaemon(true);
  }
  19. public void queueEvent(WatchedEvent event) {
  20. //如果KeeperState状态关闭,并且Event状态是-1,则直接退出
  21. if (event.getType() == EventType.None
  22. && sessionState == event.getState()) {
  23. return;
  24. }
  25. sessionState = event.getState();
  26. //这是Event参数
  27. // materialize the watchers based on the event
  28. WatcherSetEventPair pair = new WatcherSetEventPair(
  29. //触发materialize方法,返回对应的目录文件数据
  30. watcher.materialize(event.getState(), event.getType(),
  31. event.getPath()),
  32. event);
  33. // queue the pair (watch set & event) for later processing
  34. //放进waiting事件
  35. waitingEvents.add(pair);
  36. }

materialize方法

  1. @Override
  2. public Set<Watcher> materialize(Watcher.Event.KeeperState state,
  3. Watcher.Event.EventType type,
  4. String clientPath)
  5. {
  6. Set<Watcher> result = new HashSet<Watcher>();
  7. switch (type) {
  8. //当事件类型为None时,也就是客户端和服务端的租约到期,或者认证失败或者断开连接或者建立连接时(建立连接时不会清除watcher,客户端收到反馈后会返回所有事件并触发,当事件类型是节点数据变化或者节点创建时则触发dataWatches和existWatches,当事件类型是子节点变化时childWatches被触发,当事件类型是删除节点时三种watcher都会触发。
  9. case None:
  10. result.add(defaultWatcher);
  11. //满足两种情况则清空dataWatches,existWatches,childWatches
  12. //1.状态不是SyncConnected(客户端与服务器端已建立链接)
  13. //2.设置了System property:zookeepr.disableAutoWatchReset 为true
  14. boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
  15. state != Watcher.Event.KeeperState.SyncConnected;
  16. synchronized(dataWatches) {
  17. for(Set<Watcher> ws: dataWatches.values()) {
  18. result.addAll(ws);
  19. }
  20. if (clear) {
  21. dataWatches.clear();
  22. }
  23. }
  24. synchronized(existWatches) {
  25. for(Set<Watcher> ws: existWatches.values()) {
  26. result.addAll(ws);
  27. }
  28. if (clear) {
  29. existWatches.clear();
  30. }
  31. }
  32. synchronized(childWatches) {
  33. for(Set<Watcher> ws: childWatches.values()) {
  34. result.addAll(ws);
  35. }
  36. if (clear) {
  37. childWatches.clear();
  38. }
  39. }
  40. return result;
  41. case NodeDataChanged:
  42. //当为创建时候,根据子类事件类型触发相应事件
  43. case NodeCreated:
  44. synchronized (dataWatches) {
  45. addTo(dataWatches.remove(clientPath), result);
  46. }
  47. synchronized (existWatches) {
  48. addTo(existWatches.remove(clientPath), result);
  49. }
  50. break;
  51. //当为子目录事件时候,根据子类事件类型触发相应事件
  52. case NodeChildrenChanged:
  53. synchronized (childWatches) {
  54. addTo(childWatches.remove(clientPath), result);
  55. }
  56. break;
  57. //当为删除事件时候,根据子类事件类型触发相应事件
  58. case NodeDeleted:
  59. synchronized (dataWatches) {
  60. addTo(dataWatches.remove(clientPath), result);
  61. }
  62. // XXX This shouldn't be needed, but just in case
  63. synchronized (existWatches) {
  64. Set<Watcher> list = existWatches.remove(clientPath);
  65. if (list != null) {
  66. addTo(existWatches.remove(clientPath), result);
  67. LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
  68. }
  69. }
  70. synchronized (childWatches) {
  71. addTo(childWatches.remove(clientPath), result);
  72. }
  73. break;
  74. default:
  75. String msg = "Unhandled watch event type " + type
  76. + " with state " + state + " on path " + clientPath;
  77. LOG.error(msg);
  78. throw new RuntimeException(msg);
  79. }
  80. return result;
  81. }
  82. //将form添加至to
  83. final private void addTo(Set<Watcher> from, Set<Watcher> to) {
  84. if (from != null) {
  85. to.addAll(from);
  86. }
  87. }

EventThread线程的run方法

  1. @Override
  2. public void run() {
  3. try {
  4. isRunning = true;
  5. while (true) {
  6. //从队列中获取事件数据
  7. Object event = waitingEvents.take();
  8. if (event == eventOfDeath) {
  9. //是否关闭线程
  10. wasKilled = true;
  11. } else {
  12. //处理事件
  13. processEvent(event);
  14. }
  15. if (wasKilled)
  16. synchronized (waitingEvents) {
  17. if (waitingEvents.isEmpty()) {
  18. isRunning = false;
  19. break;
  20. }
  21. }
  22. }
  23. } catch (InterruptedException e) {
  24. LOG.error("Event thread exiting due to interruption", e);
  25. }
  26. LOG.info("EventThread shut down for session: 0x{}",
  27. Long.toHexString(getSessionId()));
  28. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/94747
推荐阅读
相关标签
  

闽ICP备14008679号