watch 监听机制是 zookeeper 的关键技术之一!
watch 监听机制是 zookeeper 的关键技术之一!
时效性: watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知。
- [zk: localhost:2181(CONNECTED) 1] ls /hello watch
- []
- [zk: localhost:2181(CONNECTED) 2] create /hello/test abc
- WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/hello
- Created /hello/test
- //Zookeeper.java
- public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
- PathUtils.validatePath(path);
- ZooKeeper.WatchRegistration wcb = null;
- if (watcher != null) {
- //注册watch
- wcb = new ZooKeeper.DataWatchRegistration(watcher, path);
- }
- String serverPath = this.prependChroot(path);
- RequestHeader h = new RequestHeader();
- h.setType(4);
- GetDataRequest request = new GetDataRequest();
- request.setPath(serverPath);
- request.setWatch(watcher != null);
- GetDataResponse response = new GetDataResponse();
- ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
- if (r.getErr() != 0) {
- throw KeeperException.create(Code.get(r.getErr()), path);
- } else {
- if (stat != null) {
- DataTree.copyStat(response.getStat(), stat);
- }
- return response.getData();
- }
- }
- public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException {
- ReplyHeader r = new ReplyHeader();
- ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration, watchDeregistration);
- synchronized(packet) {
- while(!packet.finished) {
- packet.wait();
- }
- return r;
- }
- }
- public ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
- ClientCnxn.Packet packet = null;
- packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
- packet.cb = cb;
- packet.ctx = ctx;
- packet.clientPath = clientPath;
- packet.serverPath = serverPath;
- packet.watchDeregistration = watchDeregistration;
- synchronized(this.state) {
- if (this.state.isAlive() && !this.closing) {
- if (h.getType() == -11) {
- this.closing = true;
- }
- this.outgoingQueue.add(packet);
- } else {
- this.conLossPacket(packet);
- }
- }
- this.sendThread.getClientCnxnSocket().packetAdded();
- return packet;
- }
- class SendThread extends ZooKeeperThread {
- .....
- void readResponse(ByteBuffer incomingBuffer) throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ReplyHeader replyHdr = new ReplyHeader();
- replyHdr.deserialize(bbia, "header");
- if (replyHdr.getXid() == -2) {
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " after " + (System.nanoTime() - this.lastPingSentNs) / 1000000L + "ms");
- }
- } else if (replyHdr.getXid() == -4) {
- if (replyHdr.getErr() == Code.AUTHFAILED.intValue()) {
- ClientCnxn.this.state = States.AUTH_FAILED;
- ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
- }
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Got auth sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
- }
- } else if (replyHdr.getXid() == -1) {
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
- }
- WatcherEvent event = new WatcherEvent();
- event.deserialize(bbia, "response");
- if (ClientCnxn.this.chrootPath != null) {
- String serverPath = event.getPath();
- if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
- event.setPath("/");
- } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
- event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
- } else {
- ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
- }
- }
- WatchedEvent we = new WatchedEvent(event);
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
- }
- ClientCnxn.this.eventThread.queueEvent(we);
- } else if (this.tunnelAuthInProgress()) {
- GetSASLRequest request = new GetSASLRequest();
- request.deserialize(bbia, "token");
- ClientCnxn.this.zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
- } else {
- ClientCnxn.Packet packet;
- synchronized(ClientCnxn.this.pendingQueue) {
- if (ClientCnxn.this.pendingQueue.size() == 0) {
- throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
- }
- packet = (ClientCnxn.Packet)ClientCnxn.this.pendingQueue.remove();
- }
- try {
- if (packet.requestHeader.getXid() != replyHdr.getXid()) {
- packet.replyHeader.setErr(Code.CONNECTIONLOSS.intValue());
- throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
- }
- packet.replyHeader.setXid(replyHdr.getXid());
- packet.replyHeader.setErr(replyHdr.getErr());
- packet.replyHeader.setZxid(replyHdr.getZxid());
- if (replyHdr.getZxid() > 0L) {
- ClientCnxn.this.lastZxid = replyHdr.getZxid();
- }
- if (packet.response != null && replyHdr.getErr() == 0) {
- packet.response.deserialize(bbia, "response");
- }
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Reading reply sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", packet:: " + packet);
- }
- } finally {
- ClientCnxn.this.finishPacket(packet);
- }
- }
- }
- private void finishPacket(ClientCnxn.Packet p) {
- int err = p.replyHeader.getErr();
- if (p.watchRegistration != null) {
- p.watchRegistration.register(err);
- }
- if (p.watchDeregistration != null) {
- Map materializedWatchers = null;
- try {
- materializedWatchers = p.watchDeregistration.unregister(err);
- Iterator i$ = materializedWatchers.entrySet().iterator();
- while(i$.hasNext()) {
- Entry<EventType, Set<Watcher>> entry = (Entry)i$.next();
- Set<Watcher> watchers = (Set)entry.getValue();
- if (watchers.size() > 0) {
- this.queueEvent(p.watchDeregistration.getClientPath(), err, watchers, (EventType)entry.getKey());
- p.replyHeader.setErr(Code.OK.intValue());
- }
- }
- } catch (NoWatcherException var9) {
- LOG.error("Failed to find watcher!", var9);
- p.replyHeader.setErr(var9.code().intValue());
- } catch (KeeperException var10) {
- LOG.error("Exception when removing watcher", var10);
- p.replyHeader.setErr(var10.code().intValue());
- }
- }
- if (p.cb == null) {
- synchronized(p) {
- p.finished = true;
- p.notifyAll();
- }
- } else {
- p.finished = true;
- this.eventThread.queuePacket(p);
- }
- }
在SendThread.readResponse()中的xid=-1来进行处理——>调用 eventThread.queueEvent()进行处理
- class SendThread extends ZooKeeperThread {
- .....
- void readResponse(ByteBuffer incomingBuffer) throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ReplyHeader replyHdr = new ReplyHeader();
- replyHdr.deserialize(bbia, "header");
- if (replyHdr.getXid() == -2) {
- ....
- } else if (replyHdr.getXid() == -1) {
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
- }
- WatcherEvent event = new WatcherEvent();
- event.deserialize(bbia, "response");
- if (ClientCnxn.this.chrootPath != null) {
- String serverPath = event.getPath();
- if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0) {
- event.setPath("/");
- } else if (serverPath.length() > ClientCnxn.this.chrootPath.length()) {
- event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
- } else {
- ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
- }
- }
- WatchedEvent we = new WatchedEvent(event);
- if (ClientCnxn.LOG.isDebugEnabled()) {
- ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
- }
- ClientCnxn.this.eventThread.queueEvent(we);
- }
- ....
- }
- //FinalRequestProcessor.java
- public void processRequest(Request request) {
- ....
- ProcessTxnResult rc = null;
- synchronized(this.zks.outstandingChanges) {
- rc = this.zks.processTxn(request);
- if (request.getHdr() != null) {
- TxnHeader hdr = request.getHdr();
- Record txn = request.getTxn();
- long zxid = hdr.getZxid();
- while(!this.zks.outstandingChanges.isEmpty() && ((ChangeRecord)this.zks.outstandingChanges.get(0)).zxid <= zxid) {
- ChangeRecord cr = (ChangeRecord)this.zks.outstandingChanges.remove(0);
- if (cr.zxid < zxid) {
- LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + zxid);
- }
- if (this.zks.outstandingChangesForPath.get(cr.path) == cr) {
- this.zks.outstandingChangesForPath.remove(cr.path);
- }
- }
- }
- if (request.isQuorum()) {
- this.zks.getZKDatabase().addCommittedProposal(request);
- }
- }
- if (request.type != -11 || !this.connClosedByClient(request) || !this.closeSession(this.zks.serverCnxnFactory, request.sessionId) && !this.closeSession(this.zks.secureServerCnxnFactory, request.sessionId)) {
- if (request.cnxn != null) {
- ServerCnxn cnxn = request.cnxn;
- String lastOp = "NA";
- this.zks.decInProcess();
- Code err = Code.OK;
- Object rsp = null;
- try {
- if (request.getHdr() != null && request.getHdr().getType() == -1) {
- if (request.getException() != null) {
- throw request.getException();
- }
- throw KeeperException.create(Code.get(((ErrorTxn)request.getTxn()).getErr()));
- }
- KeeperException ke = request.getException();
- if (ke != null && request.type != 14) {
- throw ke;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}", request);
- }
- boolean removed;
- String msg;
- WatcherType type;
- Stat stat;
- DataNode n;
- List children;
- Stat stat;
- label170:
- switch(request.type) {
- ...
- case 4:
- lastOp = "GETD";
- GetDataRequest getDataRequest = new GetDataRequest();
- ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
- n = this.zks.getZKDatabase().getNode(getDataRequest.getPath());
- if (n == null) {
- throw new NoNodeException();
- }
- PrepRequestProcessor.checkACL(this.zks, this.zks.getZKDatabase().aclForNode(n), 1, request.authInfo);
- stat = new Stat();
- byte[] b = this.zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
- rsp = new GetDataResponse(b, stat);
- break;
- ....
- } catch (SessionMovedException var15) {
- ....
- }
- long lastZxid = this.zks.getZKDatabase().getDataTreeLastProcessedZxid();
- ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
- this.zks.serverStats().updateLatency(request.createTime);
- cnxn.updateStatsForResponse((long)request.cxid, lastZxid, lastOp, request.createTime, Time.currentElapsedTime());
- try {
- cnxn.sendResponse(hdr, (Record)rsp, "response");
- if (request.type == -11) {
- cnxn.sendCloseSession();
- }
- } catch (IOException var14) {
- LOG.error("FIXMSG", var14);
- }
- }
- }
- }
setData()对节点数据变更——>调用 WatchManager.triggerWatch 触发事件——>触发之后删除(客户端watch机制是一次性的)
- //DataTree.java
- public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
- Stat s = new Stat();
- DataNode n = (DataNode)this.nodes.get(path);
- if (n == null) {
- throw new NoNodeException();
- } else {
- byte[] lastdata = null;
- byte[] lastdata;
- synchronized(n) {
- lastdata = n.data;
- n.data = data;
- n.stat.setMtime(time);
- n.stat.setMzxid(zxid);
- n.stat.setVersion(version);
- n.copyStat(s);
- }
- String lastPrefix = this.getMaxPrefixWithQuota(path);
- if (lastPrefix != null) {
- this.updateBytes(lastPrefix, (long)((data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)));
- }
- this.dataWatches.triggerWatch(path, EventType.NodeDataChanged);
- return s;
- }
- }
- //WatchManager.java
- Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
- WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
- HashSet watchers;
- synchronized(this) {
- watchers = (HashSet)this.watchTable.remove(path);
- if (watchers == null || watchers.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path);
- }
- return null;
- }
- Iterator i$ = watchers.iterator();
- while(i$.hasNext()) {
- Watcher w = (Watcher)i$.next();
- HashSet<String> paths = (HashSet)this.watch2Paths.get(w);
- if (paths != null) {
- paths.remove(path);
- }
- }
- }
- Iterator i$ = watchers.iterator();
- while(true) {
- Watcher w;
- do {
- if (!i$.hasNext()) {
- return watchers;
- }
- w = (Watcher)i$.next();
- } while(supress != null && supress.contains(w));
- w.process(e);
- }
- }
- //DataTree.java
- public void deleteNode(String path, long zxid) throws NoNodeException {
- int lastSlash = path.lastIndexOf(47);
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- DataNode node = (DataNode)this.nodes.get(path);
- if (node == null) {
- throw new NoNodeException();
- } else {
- this.nodes.remove(path);
- synchronized(node) {
- this.aclCache.removeUsage(node.acl);
- }
- DataNode parent = (DataNode)this.nodes.get(parentName);
- if (parent == null) {
- throw new NoNodeException();
- } else {
- synchronized(parent) {
- parent.removeChild(childName);
- parent.stat.setPzxid(zxid);
- long eowner = node.stat.getEphemeralOwner();
- EphemeralType ephemeralType = EphemeralType.get(eowner);
- if (ephemeralType == EphemeralType.CONTAINER) {
- this.containers.remove(path);
- } else if (ephemeralType == EphemeralType.TTL) {
- this.ttls.remove(path);
- } else if (eowner != 0L) {
- HashSet<String> nodes = (HashSet)this.ephemerals.get(eowner);
- if (nodes != null) {
- synchronized(nodes) {
- nodes.remove(path);
- }
- }
- }
- }
- if (parentName.startsWith("/zookeeper") && "zookeeper_limits".equals(childName)) {
- this.pTrie.deletePath(parentName.substring("/zookeeper/quota".length()));
- }
- String lastPrefix = this.getMaxPrefixWithQuota(path);
- if (lastPrefix != null) {
- this.updateCount(lastPrefix, -1);
- int bytes = false;
- int bytes;
- synchronized(node) {
- bytes = node.data == null ? 0 : -node.data.length;
- }
- this.updateBytes(lastPrefix, (long)bytes);
- }
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, 64L, "dataWatches.triggerWatch " + path);
- ZooTrace.logTraceMessage(LOG, 64L, "childWatches.triggerWatch " + parentName);
- }
- Set<Watcher> processed = this.dataWatches.triggerWatch(path, EventType.NodeDeleted);
- this.childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
- this.childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
- }
- }
- }
既然Watcher监听器是一次性的,在开发过程中需要反复注册Watcher,比较繁琐。Curator引入了Cache来监听ZooKeeper服务端的事件。Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程,简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。Cache对ZooKeeper事件监听进行了封装,能够自动处理反复注册监听,主要有以下三类:NodeCache使用的第一步,就是构造一个NodeCache缓存实例,有两个构造方法NodeCache(CuratorFramework client, String path) 和NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) 第一个参数就是传入创建的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed 表示是否对数据进行压缩。
NodeCache使用的第二步,就是构造一个NodeCacheListener监听器实例,NodeCacheListener监听器接口,只定义了一个简单的方法 nodeChanged,当节点变化时,这个方法就会被回调。
再创建完NodeCacheListener的实例之后,需要将这个实例注册到NodeCache缓存实例,使用缓存实例的addListener方法,然后使用缓存实例nodeCache的start方法,启动节点的事件监听。PathChildrenCache 子节点监听,只能监听子节点,监听不到当前节点,不能递归监听,子节点下的子节点不能递归监控。Tree Cache节点树缓存可以看做是上两种的合体,Tree Cache观察的是当前ZNode节点的所有数据。而TreeCache节点树缓存是PathChildrenCache的增强,不光能监听子节点,也能监听节点自身,使用步骤和NodeCache类似。
