赞
踩
这里是通过getData请求注册一个默认监听器
这里将监听器对象个监听的节点封装起来,并且存放在客户端中,同时也会把监听器的信息封装到request对象中,但是注意这里只是把是否存在监听器发送过去。
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath); // the watch contains the un-chroot path WatchRegistration wcb = null; // 注册监听器 // 这里会将监听器对象和节点路径封装起来 if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); // 这里会将监听器信息放入request中 // 这里只是将是否注册监听器传进去,并没有把监听器对象传进去 request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); // 这个方法就是将请求向服务端发送过去 // 并阻塞等待响应结果 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (stat != null) { DataTree.copyStat(response.getStat(), stat); } // 返回响应结果 return response.getData(); }
这里直接从处理器链中分析,至于服务端通过NIO发送请求就不再分析了,可以参考下面的博文
这个处理器并没有处理关于事件逻辑,下面是调用链
1.org.apache.zookeeper.server.RequestProcessor#processRequest
2.java.util.AbstractQueue#add
3.org.apache.zookeeper.server.PrepRequestProcessor#run
4.org.apache.zookeeper.server.PrepRequestProcessor#pRequest
pRequest方法就是这个处理器的真正的处理方法,这里getData请求只是作了ACL安全验证
然后就是进行下一个处理器
这个处理器主要是对数据持久化和日志的处理,也没有对事件进行处理,下面是调用链
1.org.apache.zookeeper.server.RequestProcessor#processRequest
2.java.util.AbstractQueue#add
3.org.apache.zookeeper.server.SyncRequestProcessor#run
接着是调用下一个处理器
这个处理器对事件做了真正的处理
1.org.apache.zookeeper.server.RequestProcessor#processRequest
2.org.apache.zookeeper.server.FinalRequestProcessor#processRequest
在处理请求的响应信息时会将其中的监听信息拿出来并存储在服务端的一个WatchManager对象中,调用链如下
3.org.apache.zookeeper.server.ZKDatabase#getData
4.org.apache.zookeeper.server.DataTree#getData
到最后就是将响应信息发送回客户端
接收了服务端响应这里并没有什么特殊的地方
假设发送了set请求,直接分析服务端的FinalRequestProcessor处理器,这里会触发事件,触发事件的调用链如下
1.org.apache.zookeeper.server.FinalRequestProcessor#processRequest
2.org.apache.zookeeper.server.ZooKeeperServer#processTxn
3.org.apache.zookeeper.server.ZKDatabase#processTxn
4.org.apache.zookeeper.server.DataTree#processTxn
5.org.apache.zookeeper.server.DataTree#setData
6.org.apache.zookeeper.server.WatchManager#triggerWatch(java.lang.String, org.apache.zookeeper.Watcher.Event.EventType)
7.org.apache.zookeeper.server.WatchManager#triggerWatch(java.lang.String, org.apache.zookeeper.Watcher.Event.EventType, java.util.Set<org.apache.zookeeper.Watcher>)
这里会见触发的监听器对象拿出来,并且移除,这也解释了为什么原生的客户端注册的监听器时一次性的。
这里在process方法中将触发的事件发送到客户端中去了
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { // 这里会通过remove方法拿到监听器 // 同时会将监听器从map中移除 // 这也解释了为什么原生的客户端注册的监听器时一次性的 watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { // 移除路径 paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } // 这里调用了NIOServerCnxn的process方法 // 这里将触发的事件发送到客户端去 w.process(e); } // 返回触发的监听器 return watchers; }
这里主要分析客户端的EventThread线程的run方法,至于接收响应就不再分析了,调用链如下
1.org.apache.zookeeper.ClientCnxn.EventThread#run
2.org.apache.zookeeper.ClientCnxn.EventThread#processEvent
/** * 下面是EventThread的run方法 * 也就是线程启动的执行方法 * 它是在创建原生客户端时启动的 */ @Override public void run() { try { isRunning = true; while (true) { // 从队列中拿到触发的事件 Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { // 处理事件 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId())); }
在processEvent
方法中直接调用了监听器的process
方法触发事件
这里是通过客户端的命令行的quit命令模拟的CloseSession操作
客户端命令行发送quit命令
1.org.apache.zookeeper.ZooKeeperMain#processZKCmd
这里先是在processZKCmd方法中判断命令类型并调用相关的函数进行处理
2.org.apache.zookeeper.ZooKeeper#close
3.org.apache.zookeeper.ClientCnxn#close
最后在close方法中将closeSession请求发送给服务端
同样我们直接分析处理器链到底做了什么
这里直接找到对于closeSession的处理,将临时节点封装成一个记录对象放到一个改变记录集合中去,会在后面的处理器中处理
这里并没有对CloseSession有什么处理
1.org.apache.zookeeper.server.FinalRequestProcessor#processRequest
2.org.apache.zookeeper.server.ZooKeeperServer#processTxn
3.org.apache.zookeeper.server.ZKDatabase#processTxn
4.org.apache.zookeeper.server.DataTree#processTxn
5.org.apache.zookeeper.server.DataTree#killSession
6.org.apache.zookeeper.server.DataTree#deleteNode
下面再DataTree中会删除临时节点
最后也会移除session
具体介绍请看ACL详细介绍
假设在客户端发送了如下命令
addauth digest zhangsan:12345
在客户端是用的addAuthInfo
方法处理的命令
服务端得从主线程开始分析,就是NIOServerCnxnFactory得run
方法
1.org.apache.zookeeper.server.NIOServerCnxnFactory#run
2.org.apache.zookeeper.server.NIOServerCnxn#doIO
3.org.apache.zookeeper.server.NIOServerCnxn#readPayload
4.org.apache.zookeeper.server.NIOServerCnxn#readRequest
5.org.apache.zookeeper.server.ZooKeeperServer#processPacket
在processPacket
方法中对这个权限操作做了处理
6.org.apache.zookeeper.server.auth.DigestAuthenticationProvider#handleAuthentication
这个方法将用户真正的保存在了服务端中了
public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) { String id = new String(authData); try { // 将获得的id生成一个签名 // id包含名字和密码或者ip String digest = generateDigest(id); if (digest.equals(superDigest)) { // 超级管理员 cnxn.addAuthInfo(new Id("super", "")); } // 将用户对象添加到一个集合authInfo中去(前提是不存在这个用户) // 这样服务端也就保存了用户的信息 cnxn.addAuthInfo(new Id(getScheme(), digest)); return KeeperException.Code.OK; } catch (NoSuchAlgorithmException e) { LOG.error("Missing algorithm",e); } return KeeperException.Code.AUTHFAILED; }
假设客户端发送如下的命令
setAcl /parent auth:zhangsan:123456:rdwca
在setAcl的时候其实也会验证权限,所以我们还是用上面那一条命令
只看分析服务端
checkACL
方法的具体验证过程如下
/** * * @param zks * @param acl:当前节点存在的用户及其权限 * @param perm:当前操作需要的权限 * @param ids:客户端当前的用户 * @throws KeeperException.NoAuthException */ static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm, List<Id> ids) throws KeeperException.NoAuthException { // 是否跳过验证 if (skipACL) { return; } // 如果当前节点没有任何权限用户就会直接跳过验证 if (acl == null || acl.size() == 0) { return; } // 有没有超级管理员 for (Id authId : ids) { if (authId.getScheme().equals("super")) { return; } } // 遍历当前结点所有权限用户 for (ACL a : acl) { Id id = a.getId(); /** * 首先会查看当前操作的权限在所有权限用户中是否有拥有权限的 * 这里是通过与操作判断的 * 权限的种类共有下面几种 * * int READ = 1 << 0; * int WRITE = 1 << 1; * int CREATE = 1 << 2; * int DELETE = 1 << 3; * int ADMIN = 1 << 4; * int ALL = READ | WRITE | CREATE | DELETE | ADMIN; * * 这样的话通过与操作需要的权限进行与操作就可以得出是否拥有操作的权限了 */ if ((a.getPerms() & perm) != 0) { if (id.getScheme().equals("world") && id.getId().equals("anyone")) { return; } AuthenticationProvider ap = ProviderRegistry.getProvider(id .getScheme()); if (ap != null) { // 遍历客户端的用户中是否存在拥有权限的用户 for (Id authId : ids) { if (authId.getScheme().equals(id.getScheme()) && ap.matches(authId.getId(), id.getId())) { return; } } } } } // 如果没有权限就会抛出异常 throw new KeeperException.NoAuthException(); }
下面是描述权限的结构
跳过验证的设置
这个验证权限的方法基本作了如下的判断:
如果最后没有权限用户就会抛出异常
需要注意的是更改权限时会有问题,客户端在更改一个节点中的一个用户的权限时,会更改其他的用户的权限。
这样会发现此节点的所有用户的权限都会变成刚刚设置的权限。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。