赞
踩
目录
Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper实现分布式锁,发布订阅(多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者)等功能。
Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(Watcher 是一次性的操作)。 当然,可以通过循环监听去达到永久监听效果。
如何注册事件机制:
ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher客户端。注册 watcher 有 3 种方式,getData、exists、getChildren;
ZK的所有读操作都可以设置watch监视点: getData, getChildren, exists. 写操作则是不能设置监视点的。
监视有两种类型:数据监视点和子节点监视点。创建、删除或者设置znode都会触发这些监视点。exists,getData 可以设置数据监视点。getChildren 可以设置子节点变化。
而可能监测的事件类型有: None,NodeCreated, NodeDataChanged, NodeDeleted, NodeChildrenChanged.
- None // 客户端连接状态发生变化的时候 会收到None事件
- NodeCreated // 节点创建事件
- NodeDeleted // 节点删除事件
- NodeDataChanged // 节点数据变化
- NodeChildrenChanged // 子节点被创建 删除触发该事件
ZK 可以做到,只要数据一发生变化,就会通知相应地注册了监听的客户端。那么,它是怎么做到的呢?
其实原理应该是很简单的,四个步骤:
1. 客户端注册Watcher到服务端;
2. 服务端发生数据变更;
3. 服务端通知客户端数据变更;
4. 客户端回调Watcher处理变更应对逻辑;
#起两个客户端,都监听同一个节点 [root@ydt1 zookeeper-3.4.6]# ./bin/zkCli.sh [zk: localhost:2181(CONNECTED) 6] create /watchtest "abc" Created /watchtest [zk: localhost:2181(CONNECTED) 0] get /watchtest watch #监听节点变化 "111" cZxid = 0xf0000000a ctime = Wed Jul 29 17:00:30 CST 2020 mZxid = 0xf0000000b mtime = Wed Jul 29 17:01:03 CST 2020 pZxid = 0xf0000000a cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0 #再起一个客户端,用来修改上一个客户端的节点数据 [root@ydt2 zookeeper-3.4.6]# ./bin/zkCli.sh [zk: localhost:2181(CONNECTED) 0] set /watchtest "111" cZxid = 0xf0000000a ctime = Wed Jul 29 17:00:30 CST 2020 mZxid = 0xf0000000b mtime = Wed Jul 29 17:01:03 CST 2020 pZxid = 0xf0000000a cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 5 numChildren = 0 #回过头再看两个监听节点,都可以看大节点数据变化的信息 [zk: localhost:2181(CONNECTED) 1] WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest #客户端再次修改该节点数据,监听客户端不会有响应,说明监听是一次性的
package com.ydt.zookeeper.watcher; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; public class WatcherTest { ZooKeeper zk; @Before public void init() throws IOException, KeeperException, InterruptedException { zk= new ZooKeeper("ydt1:2181,ydt2:2181,ydt3:2181" , Integer.MAX_VALUE,new Watcher() { //全局监听 public void process(WatchedEvent watchedEvent) { //客户端回调Watcher System.out.println("-----------------------------------------"); System.out.println("connect state:" + watchedEvent.getState()); System.out.println("event type:" + watchedEvent.getType()); System.out.println("znode path:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); } } ); } /** * exists监听事件: * NodeCreated:节点创建 * NodeDeleted:节点删除 * NodeDataChanged:节点内容 * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test public void test1() throws KeeperException, InterruptedException { //exists注册监听 zk.exists("/watcher-exists", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("-----------------------------------------"); System.out.println("connect state:" + watchedEvent.getState()); System.out.println("event type:" + watchedEvent.getType()); System.out.println("znode path:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); try { zk.exists("/watcher-exists",this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); //不开启ACL,以持久化自动生成序列方式创建 zk.create("/watcher-exists", "watcher-exists".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //通过修改的事务类型操作来触发监听事件 zk.setData("/watcher-exists", "watcher-exists2".getBytes(), -1); //删除节点看看能否触发监听事件 zk.delete("/watcher-exists", -1); } /** * getData监听事件: * NodeDeleted:节点删除 * NodeDataChange:节点内容发生变化 * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test public void test2() throws IOException, KeeperException, InterruptedException { //不开启ACL,以持久化自动生成序列方式创建 zk.create("/watcher-getData", "watcher-getData".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //getData注册监听 zk.getData("/watcher-getData", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("-----------------------------------------"); System.out.println("connect state:" + watchedEvent.getState()); System.out.println("event type:" + watchedEvent.getType()); System.out.println("znode path:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); try { zk.getData("/watcher-getData",this,null); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } },null); //通过修改的事务类型操作来触发监听事件 zk.setData("/watcher-getData", "watcher-getData2".getBytes(), -1); //删除节点看看能否触发监听事件 zk.delete("/watcher-getData", -1); } /** * getChildren监听事件: * NodeChildrenChanged:子节点发生变化 * NodeDeleted:节点删除 * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test public void test3() throws IOException, KeeperException, InterruptedException { zk.create("/watcher-getChildren",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create("/watcher-getChildren/watcher-getChildren01","watcher-getChildren01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //getChildren注册监听 zk.getChildren("/watcher-getChildren", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("-----------------------------------------"); System.out.println("connect state:" + watchedEvent.getState()); System.out.println("event type:" + watchedEvent.getType()); System.out.println("znode path:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); try { zk.getChildren("/watcher-getChildren",this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); zk.setData("/watcher-getChildren/watcher-getChildren01","watcher-getChildren02".getBytes(), -1);//修改子节点 zk.delete("/watcher-getChildren/watcher-getChildren01", -1);//删除子节点 zk.delete("/watcher-getChildren", -1);//删除根节点 } }
原理流程:
客户端首先将 Watcher注册到服务端,同时将Watcher对象保存到客户端的Watcher管理器中。当Zookeeper
服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的 Watcher管理器会触发相关 Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程
public interface Watcher { /** * Event的状态 */ public interface Event { /** * 在事件发生时,ZooKeeper的状态 */ public enum KeeperState { @Deprecated Unknown (-1), Disconnected (0), @Deprecated NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated(6), Expired (-112); private final int intValue; KeeperState( int intValue) { this.intValue = intValue; } ...... } /** * ZooKeeper中的事件 */ public enum EventType { None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4); private final int intValue; // Integer representation of value // for sending over wire EventType( int intValue) { this.intValue = intValue; } ...... } } //Watcher的回调方法 abstract public void process(WatchedEvent event); }
//-------------------------------------ZooKeeper.java---------------------------------------- //初始化zookeeper客户端时会将默认的监听器设置到监听器管理中 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); //设置默认监听器 watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); //事件信息的传输,通知处理,可以从下面方法可以看出来 cnxn.start(); } -------> public void start() { //负责客户端和服务器端的数据通信, 也包括事件信息的传输和心跳 sendThread.start(); //主要在客户端回调注册的 Watchers 进行通知处理 eventThread.start(); }
-
- public List<String> getChildren(final String path, Watcher watcher)
- throws KeeperException, InterruptedException
- {
- final String clientPath = path;
- PathUtils. validatePath(clientPath);
-
- WatchRegistration wcb = null;
- //如果watcher不等于null, 构建WatchRegistration对象,执行回调的时候会用到
- //该对象描述了watcher和path之间的关系
- if (watcher != null) {
- wcb = new ChildWatchRegistration(watcher, clientPath);
- }
-
- //在传入的path加上root path前缀,构成服务器端的绝对路径
- final String serverPath = prependChroot(clientPath);
-
- //构建RequestHeader对象
- RequestHeader h = new RequestHeader();
- //设置操作类型为OpCode. getChildren
- h.setType(ZooDefs.OpCode. getChildren);
- //构建GetChildrenRequest对象
- GetChildrenRequest request = new GetChildrenRequest();
- //设置path
- request.setPath(serverPath);
- //设置是否使用watcher
- request.setWatch(watcher != null);
- //构建GetChildrenResponse对象
- GetChildrenResponse response = new GetChildrenResponse();
- //提交请求,通过客户端的网络处理类去提交请求,并阻塞等待结果
- ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
- if (r.getErr() != 0) {
- throw KeeperException.create(KeeperException.Code. get(r.getErr()),
- clientPath);
- }
- return response.getChildren();
- }
-
-
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader();//响应头 //组装请求入队 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { //阻塞等待请求结果 while (!packet.finished) { packet.wait(); } } return r; } Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; // Note that we do not generate the Xid for the packet yet. It is // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(), // where the packet is actually sent. // 这个队列就是存放我们请求的队列,注意,我们还没有为包生成Xid。 // 它是在发送时生成,通过实现ClientCnxnSocket::doIO(),数据包实际发送的地方 synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } //请求包入队 outgoingQueue.add(packet); } } //请求包入列后,唤醒客户发送线程,将请求包发送出去,进入到发送线程SendThread.run()方法中查看 sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; while (state.isAlive()) { try { //判断当前session会话中socket请求是否还连接着 if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { serverAddress = hostProvider.next(1000); } startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); } //校验判断客户端与服务器连接状态 if (state.isConnected()) { // determine whether we need to send an AuthFailed event. if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent == true) { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, authState,null)); } } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } //心跳维持操作,心跳次数复位 if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server //如果我们处于只读模式节点,请根据情况查找读or写服务器 if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); } //调用ClientCnxnSocketNIO发起socket网络请求 clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } //省略......................................................
- //发送前一些判断
- void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
- ClientCnxn cnxn)
- throws IOException, InterruptedException {
- selector.select(waitTimeOut);
- Set<SelectionKey> selected;
- synchronized (this) {
- selected = selector.selectedKeys();
- }
- // Everything below and until we get back to the select is
- // non blocking, so time is effectively a constant. That is
- // Why we just have to do this once, here
- updateNow();
- for (SelectionKey k : selected) {
- SocketChannel sc = ((SocketChannel) k.channel());
- // readyOps :获取此键上ready操作集合.即在当前通道上已经就绪的事件
- // SelectKey.OP_CONNECT 连接就绪事件,表示客户与服务器的连接已经建立成功
- // 如果全部已经就绪,进行一次复位处理,包括心跳时间和发送时间
- if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
- if (sc.finishConnect()) {
- updateLastSendAndHeard();
- sendThread.primeConnection();
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- //读或者写通道准备完毕,进行IO传输
- doIO(pendingQueue, outgoingQueue, cnxn);
- }
- }
- if (sendThread.getZkState().isConnected()) {
- synchronized(outgoingQueue) {
- if (findSendablePacket(outgoingQueue,
- cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
- enableWrite();
- }
- }
- }
- selected.clear();
- }
- //...................................NIOServerCnxnFactory.java.................................
- public void run() {
- // socket不是关闭状态就进入阻塞
- while (!ss.socket().isClosed()) {
- try {
- selector.select(1000);
- Set<SelectionKey> selected;
- synchronized (this) {
- //获取事件键列表
- selected = selector.selectedKeys();
- }
- ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
- selected);
- Collections.shuffle(selectedList);
- //遍历事件keys
- for (SelectionKey k : selectedList) {
- //就绪等待连接事件那就先创建连接
- if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
- SocketChannel sc = ((ServerSocketChannel) k
- .channel()).accept();
- InetAddress ia = sc.socket().getInetAddress();
- int cnxncount = getClientCnxnCount(ia);
- if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
- LOG.warn("Too many connections from " + ia
- + " - max is " + maxClientCnxns );
- sc.close();
- } else {
- LOG.info("Accepted socket connection from "
- + sc.socket().getRemoteSocketAddress());
- sc.configureBlocking(false);
- SelectionKey sk = sc.register(selector,
- SelectionKey.OP_READ);
- NIOServerCnxn cnxn = createConnection(sc, sk);
- sk.attach(cnxn);
- addCnxn(cnxn);
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- // 就绪读写事件,开始执行(包括处理消息,返回监听)
- NIOServerCnxn c = (NIOServerCnxn) k.attachment();
- c.doIO(k);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unexpected ops in select "
- + k.readyOps());
- }
- }
- }
- selected.clear();
- } catch (RuntimeException e) {
- LOG.warn("Ignoring unexpected runtime exception", e);
- } catch (Exception e) {
- LOG.warn("Ignoring exception", e);
- }
- }
- closeAll();
- LOG.info("NIOServerCnxn factory exited run method");
- }
在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。
如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。
其关SetData请求的时序图如下:
4.6.1 DataTree.setData()方法
根据上面的时序图,一路跟踪到DataTree.setData方法:
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); //根据path, 获得DataNode对象n DataNode n = nodes.get(path); //如果n为null, 则抛出NoNodeException异常 if (n == null) { throw new KeeperException.NoNodeException(); } byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. // 更新数据 String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } //触发Watcher dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
4.6.2 WatchManager触发Watcher
- public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
- WatchedEvent e = new WatchedEvent(type,
- KeeperState.SyncConnected, path);
- HashSet<Watcher> watchers;
- synchronized (this) {
- //从watchTable删除掉path对应的watcher
- watchers = watchTable.remove(path);
- if (watchers == null || watchers.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "No watchers for " + path);
- }
- return null;
- }
- for (Watcher w : watchers) {
- HashSet<String> paths = watch2Paths.get(w);
- if (paths != null) {
- paths.remove(path);
- }
- }
- }
- //循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象
- for (Watcher w : watchers) {
- if (supress != null && supress.contains(w)) {
- continue;
- }
- //ServerCnxn执行监控事件对象
- w.process(e);
- }
- return watchers;
- }
4.6.3 ServerCnxn发送notification
包括NIOServerCnxn和NettyServerCnxn,其实代码逻辑都一样
//NIO synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); //发送notification给Client端 sendResponse(h, e, "notification"); } //Netty public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); try { //发送notification给Client端 sendResponse(h, e, "notification"); } catch (IOException e1) { if (LOG.isDebugEnabled()) { LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1); } close(); } } protected void internalSendBuffer(ByteBuffer bb) { if (bb != ServerCnxnFactory.closeConn) { // We check if write interest here because if it is NOT set, // nothing is queued, so we can try to send the buffer right // away without waking up the selector if(sk.isValid() && ((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) { try { sock.write(bb);//通过socket向客户端写入字节流,调度process } catch (IOException e) { // we are just doing best effort right now } } // if there is nothing left to send, we are done if (bb.remaining() == 0) { packetSent(); return; } } synchronized(this.factory){ sk.selector().wakeup(); if (LOG.isTraceEnabled()) { LOG.trace("Add a buffer to outgoingBuffers, sk " + sk + " is valid: " + sk.isValid()); } outgoingBuffers.add(bb); if (sk.isValid()) { sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } } } //至于远程客户端地址端口怎么知道,初始化ServerCnxn的时候就设置了 public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory) throws IOException { this.zkServer = zk; this.sock = sock;//保存客户端sock对象 this.sk = sk; this.factory = factory; if (this.factory.login != null) { this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login); } if (zk != null) { outstandingLimit = zk.getGlobalOutstandingLimit(); } sock.socket().setTcpNoDelay(true); /* set socket linger to false, so that socket close does not * block */ sock.socket().setSoLinger(false, -1); InetAddress addr = ((InetSocketAddress) sock.socket() .getRemoteSocketAddress()).getAddress(); authInfo.add(new Id("ip", addr.getHostAddress())); sk.interestOps(SelectionKey.OP_READ); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。