当前位置:   article > 正文

进阶分布式系统架构系列(七):Zookeeper 监听机制

zk监听

点击下方名片,设为星标

回复“1024”获取2TB学习资源!

前面介绍了 Zookeeper 安装部署常用管理命令节点选举机制等相关的知识点,今天我将详细的为大家介绍 zookeeper 监听机制相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!

监听器原理

  • 首先要有一个main()线程

  • 在main()线程中创建Zookeeper客户端,这时就会创建两个线程connect线程负责网络连接通信,listen线程负责监听

  • 通过connect线程将注册的监听事件发送给Zookeeper服务器

  • 在Zookeeper的注册监听列表中将注册的监听事件添加到列表中,表示这个服务器中的/path,即根目录这个路径被客户端监听了

  • 一旦被监听的服务器根目录下,数据或路径发生改变,Zookeeper服务器就会将这个消息发送给Listener线程

  • Listener线程内部调用process方法,采取相应的措施

93ac0d4875f264035506ee671c37fde8.png

Watcher监听

watch 监听机制是 zookeeper 的关键技术之一!

watch监听机制的本质

当今时代,发布订阅场景到处可见,像微信中的公众号消息订阅,或者网购场景下库存消息的订阅通知等等,这些都是属于发布订阅的场景。

watch监听机制是zk的一个关键技术,zk通过它来实现发布订阅的功能,通过watch我们可以联想到设计模式中的观察者模式,二者确实有点类似,你可以将其看成是分布式场景下的观察者模式。9b590a943ade48fa9897932afb729909.png

watcher特性
  • 一次性:watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册

  • 客户端顺序回调:watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行

  • 轻量级:watchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容。

  • 时效性: watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知。

watcher 监听机制实现

客户端收到集合数据更改的通知,client可以在读取的指定znode时设置watches,(客户端注册表)watches会向注册的client发送关于znode更改的通知(比如监听节点数据变更、节点删除、子节点状态变更等事件)。

例如:这里新增一个子节点就可以观察watcher监听事件的变化

  1. [zk: localhost:2181(CONNECTED) 1] ls /hello watch
  2. []
  3. [zk: localhost:2181(CONNECTED) 2] create /hello/test abc
  4.  
  5. WATCHER::
  6.  
  7. WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hello
  8. Created /hello/test

通过这个事件机制,可以基于zl实现分布式锁、集群管理等功能。

当节点变更时,zk产生一个watcher事件并发送到client,但是client只会收到一次通知,也就是后续节点再发生变化就不会再次收到通知。(watcher是一次性的),可以通过循环监听去实现永久监听。比如说我再建立一个子节点,此时就没有触发监听事件。

watcher 监听流程

总体分三个过程:客户端注册watcher、服务器处理watcher、客户端回调watcher客户端。

客户端watch的注册和回调
客户端watch注册实现过程

发送一个带有watch事件的请求——>DataWatchRegistration保存watch事件——>将请求封装成Packet并放入一个队列等待发送——>调用SendThread中的readResponse——>ZKWatchManager将该watch事件进行存储

  1. //Zookeeper.java
  2.     public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
  3.         PathUtils.validatePath(path);
  4.         ZooKeeper.WatchRegistration wcb = null;
  5.         if (watcher != null) {
  6.          //注册watch
  7.             wcb = new ZooKeeper.DataWatchRegistration(watcher, path);
  8.         }
  9.         String serverPath = this.prependChroot(path);
  10.         RequestHeader h = new RequestHeader();
  11.         h.setType(4);
  12.         GetDataRequest request = new GetDataRequest();
  13.         request.setPath(serverPath);
  14.         request.setWatch(watcher != null);
  15.         GetDataResponse response = new GetDataResponse();
  16.         ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
  17.         if (r.getErr() != 0) {
  18.             throw KeeperException.create(Code.get(r.getErr()), path);
  19.         } else {
  20.             if (stat != null) {
  21.                 DataTree.copyStat(response.getStat(), stat);
  22.             }
  23.             return response.getData();
  24.         }
  25.     }
  1. public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
  2.         ReplyHeader r = new ReplyHeader();
  3.         ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration, watchDeregistration);
  4.         synchronized(packet) {
  5.             while(!packet.finished) {
  6.                 packet.wait();
  7.             }
  8.             return r;
  9.         }
  10.     }
  1. public ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
  2.         ClientCnxn.Packet packet = null;
  3.         packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
  4.         packet.cb = cb;
  5.         packet.ctx = ctx;
  6.         packet.clientPath = clientPath;
  7.         packet.serverPath = serverPath;
  8.         packet.watchDeregistration = watchDeregistration;
  9.         synchronized(this.state) {
  10.             if (this.state.isAlive() && !this.closing) {
  11.                 if (h.getType() == -11) {
  12.                     this.closing = true;
  13.                 }
  14.                 this.outgoingQueue.add(packet);
  15.             } else {
  16.                 this.conLossPacket(packet);
  17.             }
  18.         }
  19.         this.sendThread.getClientCnxnSocket().packetAdded();
  20.         return packet;
  21.     }
  1. class SendThread extends ZooKeeperThread {
  2.         .....
  3.         void readResponse(ByteBuffer incomingBuffer) throws IOException {
  4.             ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
  5.             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  6.             ReplyHeader replyHdr = new ReplyHeader();
  7.             replyHdr.deserialize(bbia, "header");
  8.             if (replyHdr.getXid() == -2) {
  9.                 if (ClientCnxn.LOG.isDebugEnabled()) {
  10.                     ClientCnxn.LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " after " + (System.nanoTime() - this.lastPingSentNs) / 1000000L + "ms");
  11.                 }
  12.             } else if (replyHdr.getXid() == -4) {
  13.                 if (replyHdr.getErr() == Code.AUTHFAILED.intValue()) {
  14.                     ClientCnxn.this.state = States.AUTH_FAILED;
  15.                     ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
  16.                 }
  17.                 if (ClientCnxn.LOG.isDebugEnabled()) {
  18.                     ClientCnxn.LOG.debug("Got auth sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
  19.                 }
  20.             } else if (replyHdr.getXid() == -1) {
  21.                 if (ClientCnxn.LOG.isDebugEnabled()) {
  22.                     ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
  23.                 }
  24.                 WatcherEvent event = new WatcherEvent();
  25.                 event.deserialize(bbia, "response");
  26.                 if (ClientCnxn.this.chrootPath != null) {
  27.                     String serverPath = event.getPath();
  28.                     if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
  29.                         event.setPath("/");
  30.                     } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
  31.                         event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
  32.                     } else {
  33.                         ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
  34.                     }
  35.                 }
  36.                 WatchedEvent we = new WatchedEvent(event);
  37.                 if (ClientCnxn.LOG.isDebugEnabled()) {
  38.                     ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
  39.                 }
  40.                 ClientCnxn.this.eventThread.queueEvent(we);
  41.             } else if (this.tunnelAuthInProgress()) {
  42.                 GetSASLRequest request = new GetSASLRequest();
  43.                 request.deserialize(bbia, "token");
  44.                 ClientCnxn.this.zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
  45.             } else {
  46.                 ClientCnxn.Packet packet;
  47.                 synchronized(ClientCnxn.this.pendingQueue) {
  48.                     if (ClientCnxn.this.pendingQueue.size() == 0) {
  49.                         throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
  50.                     }
  51.                     packet = (ClientCnxn.Packet)ClientCnxn.this.pendingQueue.remove();
  52.                 }
  53.                 try {
  54.                     if (packet.requestHeader.getXid() != replyHdr.getXid()) {
  55.                         packet.replyHeader.setErr(Code.CONNECTIONLOSS.intValue());
  56.                         throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
  57.                     }
  58.                     packet.replyHeader.setXid(replyHdr.getXid());
  59.                     packet.replyHeader.setErr(replyHdr.getErr());
  60.                     packet.replyHeader.setZxid(replyHdr.getZxid());
  61.                     if (replyHdr.getZxid() > 0L) {
  62.                         ClientCnxn.this.lastZxid = replyHdr.getZxid();
  63.                     }
  64.                     if (packet.response != null && replyHdr.getErr() == 0) {
  65.                         packet.response.deserialize(bbia, "response");
  66.                     }
  67.                     if (ClientCnxn.LOG.isDebugEnabled()) {
  68.                         ClientCnxn.LOG.debug("Reading reply sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", packet:: " + packet);
  69.                     }
  70.                 } finally {
  71.                     ClientCnxn.this.finishPacket(packet);
  72.                 }
  73.             }
  74.         }
  1. private void finishPacket(ClientCnxn.Packet p) {
  2.         int err = p.replyHeader.getErr();
  3.         if (p.watchRegistration != null) {
  4.             p.watchRegistration.register(err);
  5.         }
  6.         if (p.watchDeregistration != null) {
  7.             Map materializedWatchers = null;
  8.             try {
  9.                 materializedWatchers = p.watchDeregistration.unregister(err);
  10.                 Iterator i$ = materializedWatchers.entrySet().iterator();
  11.                 while(i$.hasNext()) {
  12.                     Entry<EventType, Set<Watcher>> entry = (Entry)i$.next();
  13.                     Set<Watcher> watchers = (Set)entry.getValue();
  14.                     if (watchers.size() > 0) {
  15.                         this.queueEvent(p.watchDeregistration.getClientPath(), err, watchers, (EventType)entry.getKey());
  16.                         p.replyHeader.setErr(Code.OK.intValue());
  17.                     }
  18.                 }
  19.             } catch (NoWatcherException var9) {
  20.                 LOG.error("Failed to find watcher!", var9);
  21.                 p.replyHeader.setErr(var9.code().intValue());
  22.             } catch (KeeperException var10) {
  23.                 LOG.error("Exception when removing watcher", var10);
  24.                 p.replyHeader.setErr(var10.code().intValue());
  25.             }
  26.         }
  27.         if (p.cb == null) {
  28.             synchronized(p) {
  29.                 p.finished = true;
  30.                 p.notifyAll();
  31.             }
  32.         } else {
  33.             p.finished = true;
  34.             this.eventThread.queuePacket(p);
  35.         }
  36.     }
客户端回调处理过程:

在SendThread.readResponse()中的xid=-1来进行处理——>调用 eventThread.queueEvent()进行处理

  1. class SendThread extends ZooKeeperThread {
  2.         .....
  3.         void readResponse(ByteBuffer incomingBuffer) throws IOException {
  4.             ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
  5.             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  6.             ReplyHeader replyHdr = new ReplyHeader();
  7.             replyHdr.deserialize(bbia, "header");
  8.             if (replyHdr.getXid() == -2) {
  9.       ....
  10.             } else if (replyHdr.getXid() == -1) {
  11.                 if (ClientCnxn.LOG.isDebugEnabled()) {
  12.                     ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
  13.                 }
  14.                 WatcherEvent event = new WatcherEvent();
  15.                 event.deserialize(bbia, "response");
  16.                 if (ClientCnxn.this.chrootPath != null) {
  17.                     String serverPath = event.getPath();
  18.                     if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
  19.                         event.setPath("/");
  20.                     } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
  21.                         event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
  22.                     } else {
  23.                         ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
  24.                     }
  25.                 }
  26.                 WatchedEvent we = new WatchedEvent(event);
  27.                 if (ClientCnxn.LOG.isDebugEnabled()) {
  28.                     ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
  29.                 }
  30.                 ClientCnxn.this.eventThread.queueEvent(we);
  31.             }
  32.             ....
  33.         }
服务端watch的注册和触发
服务端watch注册实现过程

判断请求是否带有watch注册事件——>通过FinalRequestProcessor中的processRequest解析请求——>若有watch,则调用zks.getZKDatabase().getData将事件注册到WatchManager

  1. //FinalRequestProcessor.java
  2.     public void processRequest(Request request) {
  3.   ....
  4.         ProcessTxnResult rc = null;
  5.         synchronized(this.zks.outstandingChanges) {
  6.             rc = this.zks.processTxn(request);
  7.             if (request.getHdr() != null) {
  8.                 TxnHeader hdr = request.getHdr();
  9.                 Record txn = request.getTxn();
  10.                 long zxid = hdr.getZxid();
  11.                 while(!this.zks.outstandingChanges.isEmpty() && ((ChangeRecord)this.zks.outstandingChanges.get(0)).zxid <= zxid) {
  12.                     ChangeRecord cr = (ChangeRecord)this.zks.outstandingChanges.remove(0);
  13.                     if (cr.zxid < zxid) {
  14.                         LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid);
  15.                     }
  16.                     if (this.zks.outstandingChangesForPath.get(cr.path) == cr) {
  17.                         this.zks.outstandingChangesForPath.remove(cr.path);
  18.                     }
  19.                 }
  20.             }
  21.             if (request.isQuorum()) {
  22.                 this.zks.getZKDatabase().addCommittedProposal(request);
  23.             }
  24.         }
  25.         if (request.type != -11 || !this.connClosedByClient(request) || !this.closeSession(this.zks.serverCnxnFactory, request.sessionId) && !this.closeSession(this.zks.secureServerCnxnFactory, request.sessionId)) {
  26.             if (request.cnxn != null) {
  27.                 ServerCnxn cnxn = request.cnxn;
  28.                 String lastOp = "NA";
  29.                 this.zks.decInProcess();
  30.                 Code err = Code.OK;
  31.                 Object rsp = null;
  32.                 try {
  33.                     if (request.getHdr() != null && request.getHdr().getType() == -1) {
  34.                         if (request.getException() != null) {
  35.                             throw request.getException();
  36.                         }
  37.                         throw KeeperException.create(Code.get(((ErrorTxn)request.getTxn()).getErr()));
  38.                     }
  39.                     KeeperException ke = request.getException();
  40.                     if (ke != null && request.type != 14) {
  41.                         throw ke;
  42.                     }
  43.                     if (LOG.isDebugEnabled()) {
  44.                         LOG.debug("{}", request);
  45.                     }
  46.                     boolean removed;
  47.                     String msg;
  48.                     WatcherType type;
  49.                     Stat stat;
  50.                     DataNode n;
  51.                     List children;
  52.                     Stat stat;
  53.                     label170:
  54.                     switch(request.type) {
  55.                     ...
  56.                     case 4:
  57.                         lastOp = "GETD";
  58.                         GetDataRequest getDataRequest = new GetDataRequest();
  59.                         ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
  60.                         n = this.zks.getZKDatabase().getNode(getDataRequest.getPath());
  61.                         if (n == null) {
  62.                             throw new NoNodeException();
  63.                         }
  64.                         PrepRequestProcessor.checkACL(this.zks, this.zks.getZKDatabase().aclForNode(n), 1, request.authInfo);
  65.                         stat = new Stat();
  66.                         byte[] b = this.zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
  67.                         rsp = new GetDataResponse(b, stat);
  68.                         break;
  69.                     ....
  70.                 } catch (SessionMovedException var15) {
  71.                     ....
  72.                 }
  73.                 long lastZxid = this.zks.getZKDatabase().getDataTreeLastProcessedZxid();
  74.                 ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
  75.                 this.zks.serverStats().updateLatency(request.createTime);
  76.                 cnxn.updateStatsForResponse((long)request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
  77.                 try {
  78.                     cnxn.sendResponse(hdr, (Record)rsp, "response");
  79.                     if (request.type == -11) {
  80.                         cnxn.sendCloseSession();
  81.                     }
  82.                 } catch (IOException var14) {
  83.                     LOG.error("FIXMSG", var14);
  84.                 }
  85.             }
  86.         }
  87.     }
服务端watch事件的触发过程

setData()对节点数据变更——>调用 WatchManager.triggerWatch 触发事件——>触发之后删除(客户端watch机制是一次性的)

  1. //DataTree.java
  2.     public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
  3.         Stat s = new Stat();
  4.         DataNode n = (DataNode)this.nodes.get(path);
  5.         if (n == null) {
  6.             throw new NoNodeException();
  7.         } else {
  8.             byte[] lastdata = null;
  9.             byte[] lastdata;
  10.             synchronized(n) {
  11.                 lastdata = n.data;
  12.                 n.data = data;
  13.                 n.stat.setMtime(time);
  14.                 n.stat.setMzxid(zxid);
  15.                 n.stat.setVersion(version);
  16.                 n.copyStat(s);
  17.             }
  18.             String lastPrefix = this.getMaxPrefixWithQuota(path);
  19.             if (lastPrefix != null) {
  20.                 this.updateBytes(lastPrefix, (long)((data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)));
  21.             }
  22.             this.dataWatches.triggerWatch(path, EventType.NodeDataChanged);
  23.             return s;
  24.         }
  25.     }
  1. //WatchManager.java
  2.     Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
  3.         WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
  4.         HashSet watchers;
  5.         synchronized(this) {
  6.             watchers = (HashSet)this.watchTable.remove(path);
  7.             if (watchers == null || watchers.isEmpty()) {
  8.                 if (LOG.isTraceEnabled()) {
  9.                     ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path);
  10.                 }
  11.                 return null;
  12.             }
  13.             Iterator i$ = watchers.iterator();
  14.             while(i$.hasNext()) {
  15.                 Watcher w = (Watcher)i$.next();
  16.                 HashSet<String> paths = (HashSet)this.watch2Paths.get(w);
  17.                 if (paths != null) {
  18.                     paths.remove(path);
  19.                 }
  20.             }
  21.         }
  22.         Iterator i$ = watchers.iterator();
  23.         while(true) {
  24.             Watcher w;
  25.             do {
  26.                 if (!i$.hasNext()) {
  27.                     return watchers;
  28.                 }
  29.                 w = (Watcher)i$.next();
  30.             } while(supress != null && supress.contains(w));
  31.             w.process(e);
  32.         }
  33.     }
  1. //DataTree.java
  2.     public void deleteNode(String path, long zxid) throws NoNodeException {
  3.         int lastSlash = path.lastIndexOf(47);
  4.         String parentName = path.substring(0, lastSlash);
  5.         String childName = path.substring(lastSlash + 1);
  6.         DataNode node = (DataNode)this.nodes.get(path);
  7.         if (node == null) {
  8.             throw new NoNodeException();
  9.         } else {
  10.             this.nodes.remove(path);
  11.             synchronized(node) {
  12.                 this.aclCache.removeUsage(node.acl);
  13.             }
  14.             DataNode parent = (DataNode)this.nodes.get(parentName);
  15.             if (parent == null) {
  16.                 throw new NoNodeException();
  17.             } else {
  18.                 synchronized(parent) {
  19.                     parent.removeChild(childName);
  20.                     parent.stat.setPzxid(zxid);
  21.                     long eowner = node.stat.getEphemeralOwner();
  22.                     EphemeralType ephemeralType = EphemeralType.get(eowner);
  23.                     if (ephemeralType == EphemeralType.CONTAINER) {
  24.                         this.containers.remove(path);
  25.                     } else if (ephemeralType == EphemeralType.TTL) {
  26.                         this.ttls.remove(path);
  27.                     } else if (eowner != 0L) {
  28.                         HashSet<String> nodes = (HashSet)this.ephemerals.get(eowner);
  29.                         if (nodes != null) {
  30.                             synchronized(nodes) {
  31.                                 nodes.remove(path);
  32.                             }
  33.                         }
  34.                     }
  35.                 }
  36.                 if (parentName.startsWith("/zookeeper") && "zookeeper_limits".equals(childName)) {
  37.                     this.pTrie.deletePath(parentName.substring("/zookeeper/quota".length()));
  38.                 }
  39.                 String lastPrefix = this.getMaxPrefixWithQuota(path);
  40.                 if (lastPrefix != null) {
  41.                     this.updateCount(lastPrefix, -1);
  42.                     int bytes = false;
  43.                     int bytes;
  44.                     synchronized(node) {
  45.                         bytes = node.data == null ? 0 : -node.data.length;
  46.                     }
  47.                     this.updateBytes(lastPrefix, (long)bytes);
  48.                 }
  49.                 if (LOG.isTraceEnabled()) {
  50.                     ZooTrace.logTraceMessage(LOG, 64L, "dataWatches.triggerWatch " + path);
  51.                     ZooTrace.logTraceMessage(LOG, 64L, "childWatches.triggerWatch " + parentName);
  52.                 }
  53.                 Set<Watcher> processed = this.dataWatches.triggerWatch(path, EventType.NodeDeleted);
  54.                 this.childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
  55.                 this.childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
  56.             }
  57.         }
  58.     }
总结

客户端和服务端各自处理watch事件,并将所需信息分别在两端,这样一来,能够减少彼此之间的通信内容和频率,大大提升了服务的处理性能。

Cache事件监听

既然Watcher监听器是一次性的,在开发过程中需要反复注册Watcher,比较繁琐。Curator引入了Cache来监听ZooKeeper服务端的事件。Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程,简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。Cache对ZooKeeper事件监听进行了封装,能够自动处理反复注册监听,主要有以下三类:f072276a585eb2a683afbeffbe081f8b.pngNodeCache使用的第一步,就是构造一个NodeCache缓存实例,有两个构造方法NodeCache(CuratorFramework client, String path) 和NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) 第一个参数就是传入创建的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed 表示是否对数据进行压缩。

NodeCache使用的第二步,就是构造一个NodeCacheListener监听器实例,NodeCacheListener监听器接口,只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调。

再创建完NodeCacheListener的实例之后,需要将这个实例注册到NodeCache缓存实例,使用缓存实例的addListener方法,然后使用缓存实例nodeCache的start方法,启动节点的事件监听。026898df9de4b62c5126b94e79bec60b.pngPathChildrenCache 子节点监听,只能监听子节点,监听不到当前节点,不能递归监听,子节点下的子节点不能递归监控。Tree Cache节点树缓存可以看做是上两种的合体,Tree Cache观察的是当前ZNode节点的所有数据。而TreeCache节点树缓存是PathChildrenCache的增强,不光能监听子节点,也能监听节点自身,使用步骤和NodeCache类似。

参考文章:https://blog.csdn.net/Z_HHHH_Z/article/details

/121750230  https://blog.csdn.net/weixin_44827241/article

/details/126092900

读者专属技术群

构建高质量的技术交流社群,欢迎从事后端开发、运维技术进群(备注岗位,已在技术交流群的请勿重复添加)。主要以技术交流、内推、行业探讨为主,请文明发言。广告人士勿入,切勿轻信私聊,防止被骗。

扫码加我好友,拉你进群

70150a95d63b15cc882388c98e133157.jpeg

推荐阅读 点击标题可跳转

一款超牛 X 的手机端 SSH 工具!吹爆

又一知名互联网大厂员工猝死!打工人务必警醒啊

CentOS 搭建 OpenVPN 服务,一次性成功!收藏了

免翻!加速!一招解决国内用户访问Github速度慢的问题

北京西二旗和上海张江程序员的终极悲惨宿命。。。

面试官:如何构建自定义的 Docker 镜像?问倒一大片

c5943a9efc0125a9cff6da54cdcbe299.png

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下在看,加个星标,这样每次新文章推送才会第一时间出现在你的订阅列表里。点在看支持我们吧!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/974759
推荐阅读
相关标签
  

闽ICP备14008679号