当前位置:   article > 正文

Zookeeper的Watcher机制及Watcher原理分析_zk监听

zk监听

目录

 

1、什么是Watcher监听机制

2、Zookeeper命令实现

3、Java API实现

4、源码解析

4.1 Watcher接口

4.2 注册全局监听器

4.3 注册监听器(getChildren)

4.4 请求包入列并发送

4.5 服务器端循环监听

4.6 触发Watcher(setData)


1、什么是Watcher监听机制

Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper实现分布式锁,发布订阅(多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者)等功能。

  Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(Watcher 是一次性的操作)。 当然,可以通过循环监听去达到永久监听效果。

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

如何注册事件机制:

  ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher客户端。注册 watcher 有 3 种方式,getData、exists、getChildren

ZK的所有读操作都可以设置watch监视点: getData, getChildren, exists. 写操作则是不能设置监视点的。

  监视有两种类型:数据监视点和子节点监视点。创建、删除或者设置znode都会触发这些监视点。exists,getData 可以设置数据监视点。getChildren 可以设置子节点变化。

  而可能监测的事件类型有: None,NodeCreated, NodeDataChanged, NodeDeleted, NodeChildrenChanged.

  1.   None // 客户端连接状态发生变化的时候 会收到None事件
  2.   NodeCreated // 节点创建事件
  3.   NodeDeleted // 节点删除事件
  4.   NodeDataChanged // 节点数据变化
  5.   NodeChildrenChanged // 子节点被创建 删除触发该事件

  ZK 可以做到,只要数据一发生变化,就会通知相应地注册了监听的客户端。那么,它是怎么做到的呢?

其实原理应该是很简单的,四个步骤:

1. 客户端注册Watcher到服务端;

2. 服务端发生数据变更;

3. 服务端通知客户端数据变更;

4. 客户端回调Watcher处理变更应对逻辑;

2、Zookeeper命令实现

  1. #起两个客户端,都监听同一个节点
  2. [root@ydt1 zookeeper-3.4.6]# ./bin/zkCli.sh
  3. [zk: localhost:2181(CONNECTED) 6] create /watchtest "abc"
  4. Created /watchtest
  5. [zk: localhost:2181(CONNECTED) 0] get /watchtest watch #监听节点变化
  6. "111"
  7. cZxid = 0xf0000000a
  8. ctime = Wed Jul 29 17:00:30 CST 2020
  9. mZxid = 0xf0000000b
  10. mtime = Wed Jul 29 17:01:03 CST 2020
  11. pZxid = 0xf0000000a
  12. cversion = 0
  13. dataVersion = 1
  14. aclVersion = 0
  15. ephemeralOwner = 0x0
  16. dataLength = 5
  17. numChildren = 0
  18. #再起一个客户端,用来修改上一个客户端的节点数据
  19. [root@ydt2 zookeeper-3.4.6]# ./bin/zkCli.sh
  20. [zk: localhost:2181(CONNECTED) 0] set /watchtest "111"
  21. cZxid = 0xf0000000a
  22. ctime = Wed Jul 29 17:00:30 CST 2020
  23. mZxid = 0xf0000000b
  24. mtime = Wed Jul 29 17:01:03 CST 2020
  25. pZxid = 0xf0000000a
  26. cversion = 0
  27. dataVersion = 1
  28. aclVersion = 0
  29. ephemeralOwner = 0x0
  30. dataLength = 5
  31. numChildren = 0
  32. #回过头再看两个监听节点,都可以看大节点数据变化的信息
  33. [zk: localhost:2181(CONNECTED) 1]
  34. WATCHER::
  35. WatchedEvent state:SyncConnected type:NodeDataChanged path:/watchtest
  36. #客户端再次修改该节点数据,监听客户端不会有响应,说明监听是一次性的

 

3、Java API实现

  1. package com.ydt.zookeeper.watcher;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. import java.io.IOException;
  7. public class WatcherTest {
  8.    ZooKeeper zk;
  9.    @Before
  10.    public void init() throws IOException, KeeperException, InterruptedException {
  11.        zk= new ZooKeeper("ydt1:2181,ydt2:2181,ydt3:2181"
  12.               , Integer.MAX_VALUE,new Watcher() {
  13.            //全局监听
  14.            public void process(WatchedEvent watchedEvent) {
  15.                //客户端回调Watcher
  16.                System.out.println("-----------------------------------------");
  17.                System.out.println("connect state:" + watchedEvent.getState());
  18.                System.out.println("event type:" + watchedEvent.getType());
  19.                System.out.println("znode path:" + watchedEvent.getPath());
  20.                System.out.println("-----------------------------------------");
  21.           }
  22.       }
  23.       );
  24.   }
  25.    /**
  26.     * exists监听事件:
  27.     *     NodeCreated:节点创建
  28.     *     NodeDeleted:节点删除
  29.     *     NodeDataChanged:节点内容
  30.     * @throws IOException
  31.     * @throws KeeperException
  32.     * @throws InterruptedException
  33.     */
  34.    @Test
  35.    public void test1() throws KeeperException, InterruptedException {
  36.        //exists注册监听
  37.        zk.exists("/watcher-exists", new Watcher() {
  38.            public void process(WatchedEvent watchedEvent) {
  39.                System.out.println("-----------------------------------------");
  40.                System.out.println("connect state:" + watchedEvent.getState());
  41.                System.out.println("event type:" + watchedEvent.getType());
  42.                System.out.println("znode path:" + watchedEvent.getPath());
  43.                System.out.println("-----------------------------------------");
  44.                try {
  45.                    zk.exists("/watcher-exists",this);
  46.               } catch (KeeperException e) {
  47.                    e.printStackTrace();
  48.               } catch (InterruptedException e) {
  49.                    e.printStackTrace();
  50.               }
  51.           }
  52.       });
  53.        //不开启ACL,以持久化自动生成序列方式创建
  54.        zk.create("/watcher-exists", "watcher-exists".getBytes()
  55.               , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  56.        //通过修改的事务类型操作来触发监听事件
  57.        zk.setData("/watcher-exists", "watcher-exists2".getBytes(), -1);
  58.        //删除节点看看能否触发监听事件
  59.        zk.delete("/watcher-exists", -1);
  60.   }
  61.    /**
  62.     * getData监听事件:
  63.     *     NodeDeleted:节点删除
  64.     *     NodeDataChange:节点内容发生变化
  65.     * @throws IOException
  66.     * @throws KeeperException
  67.     * @throws InterruptedException
  68.     */
  69.    @Test
  70.    public void test2() throws IOException, KeeperException, InterruptedException {
  71.        //不开启ACL,以持久化自动生成序列方式创建
  72.        zk.create("/watcher-getData", "watcher-getData".getBytes()
  73.               , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  74.        //getData注册监听
  75.        zk.getData("/watcher-getData", new Watcher() {
  76.            public void process(WatchedEvent watchedEvent) {
  77.                System.out.println("-----------------------------------------");
  78.                System.out.println("connect state:" + watchedEvent.getState());
  79.                System.out.println("event type:" + watchedEvent.getType());
  80.                System.out.println("znode path:" + watchedEvent.getPath());
  81.                System.out.println("-----------------------------------------");
  82.                try {
  83.                    zk.getData("/watcher-getData",this,null);
  84.               } catch (KeeperException e) {
  85.                    e.printStackTrace();
  86.               } catch (InterruptedException e) {
  87.                    e.printStackTrace();
  88.               }
  89.           }
  90.       },null);
  91.        //通过修改的事务类型操作来触发监听事件
  92.        zk.setData("/watcher-getData", "watcher-getData2".getBytes(), -1);
  93.        //删除节点看看能否触发监听事件
  94.        zk.delete("/watcher-getData", -1);
  95.   }
  96.    /**
  97.     * getChildren监听事件:
  98.     *     NodeChildrenChanged:子节点发生变化
  99.     *     NodeDeleted:节点删除
  100.     * @throws IOException
  101.     * @throws KeeperException
  102.     * @throws InterruptedException
  103.     */
  104.    @Test
  105.    public void test3() throws IOException, KeeperException, InterruptedException {
  106.        zk.create("/watcher-getChildren",null,
  107.                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  108.        zk.create("/watcher-getChildren/watcher-getChildren01","watcher-getChildren01".getBytes(),
  109.                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  110.        //getChildren注册监听
  111.        zk.getChildren("/watcher-getChildren", new Watcher() {
  112.            public void process(WatchedEvent watchedEvent) {
  113.                System.out.println("-----------------------------------------");
  114.                System.out.println("connect state:" + watchedEvent.getState());
  115.                System.out.println("event type:" + watchedEvent.getType());
  116.                System.out.println("znode path:" + watchedEvent.getPath());
  117.                System.out.println("-----------------------------------------");
  118.                try {
  119.                    zk.getChildren("/watcher-getChildren",this);
  120.               } catch (KeeperException e) {
  121.                    e.printStackTrace();
  122.               } catch (InterruptedException e) {
  123.                    e.printStackTrace();
  124.               }
  125.           }
  126.       });
  127.        zk.setData("/watcher-getChildren/watcher-getChildren01","watcher-getChildren02".getBytes(), -1);//修改子节点
  128.        zk.delete("/watcher-getChildren/watcher-getChildren01", -1);//删除子节点
  129.        zk.delete("/watcher-getChildren", -1);//删除根节点
  130.   }
  131. }

 

4、源码解析

原理流程:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

客户端首先将 Watcher注册到服务端,同时将Watcher对象保存到客户端的Watcher管理器中。当Zookeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的 Watcher管理器会触发相关 Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程

4.1 Watcher接口

  1. public interface Watcher {
  2.    /**
  3.     * Event的状态
  4.     */
  5.    public interface Event {
  6.        /**
  7.         * 在事件发生时,ZooKeeper的状态
  8.         */
  9.        public enum KeeperState {
  10.            @Deprecated
  11.            Unknown (-1),
  12.            Disconnected (0),
  13.            @Deprecated
  14.            NoSyncConnected (1),
  15.            SyncConnected (3),
  16.            AuthFailed (4),
  17.            ConnectedReadOnly (5),
  18.            SaslAuthenticated(6),
  19.            Expired (-112);
  20.            private final int intValue;  
  21.            KeeperState( int intValue) {
  22.                this.intValue = intValue;
  23.           }  
  24.           ......
  25.       }
  26.        /**
  27.         * ZooKeeper中的事件
  28.         */
  29.        public enum EventType {
  30.            None (-1),
  31.            NodeCreated (1),
  32.            NodeDeleted (2),
  33.            NodeDataChanged (3),
  34.            NodeChildrenChanged (4);
  35.            private final int intValue;     // Integer representation of value
  36.                                            // for sending over wire
  37.            EventType( int intValue) {
  38.                this.intValue = intValue;
  39.           }
  40.           ......  
  41.       }
  42.   }
  43.    //Watcher的回调方法
  44.    abstract public void process(WatchedEvent event);
  45. }

4.2 注册全局监听器

  1. //-------------------------------------ZooKeeper.java----------------------------------------
  2. //初始化zookeeper客户端时会将默认的监听器设置到监听器管理中
  3.    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
  4.            boolean canBeReadOnly)
  5.        throws IOException
  6.   {
  7.        LOG.info("Initiating client connection, connectString=" + connectString
  8.                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
  9.        //设置默认监听器
  10.        watchManager.defaultWatcher = watcher;
  11.        ConnectStringParser connectStringParser = new ConnectStringParser(
  12.                connectString);
  13.        HostProvider hostProvider = new StaticHostProvider(
  14.                connectStringParser.getServerAddresses());
  15.        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
  16.                hostProvider, sessionTimeout, this, watchManager,
  17.                getClientCnxnSocket(), canBeReadOnly);
  18.        //事件信息的传输,通知处理,可以从下面方法可以看出来
  19.        cnxn.start();
  20.   }
  21.    
  22.    ------->
  23.    public void start() {
  24.        //负责客户端和服务器端的数据通信, 也包括事件信息的传输和心跳
  25.        sendThread.start();
  26.        //主要在客户端回调注册的 Watchers 进行通知处理
  27.        eventThread.start();
  28.   }

 

4.3 注册监听器(getChildren)

  1. public List<String> getChildren(final String path, Watcher watcher)
  2.    throws KeeperException, InterruptedException
  3. {
  4.    final String clientPath = path;
  5.    PathUtils. validatePath(clientPath);
  6.    WatchRegistration wcb = null;
  7.    //如果watcher不等于null, 构建WatchRegistration对象,执行回调的时候会用到
  8.    //该对象描述了watcher和path之间的关系
  9.    if (watcher != null) {
  10.        wcb = new ChildWatchRegistration(watcher, clientPath);
  11.   }
  12.    
  13.    //在传入的path加上root path前缀,构成服务器端的绝对路径
  14.    final String serverPath = prependChroot(clientPath);
  15.    
  16.    //构建RequestHeader对象
  17.    RequestHeader h = new RequestHeader();
  18.    //设置操作类型为OpCode. getChildren
  19.    h.setType(ZooDefs.OpCode. getChildren);
  20.    //构建GetChildrenRequest对象
  21.    GetChildrenRequest request = new GetChildrenRequest();
  22.    //设置path
  23.    request.setPath(serverPath);
  24.    //设置是否使用watcher
  25.    request.setWatch(watcher != null);
  26.    //构建GetChildrenResponse对象
  27.    GetChildrenResponse response = new GetChildrenResponse();
  28.    //提交请求,通过客户端的网络处理类去提交请求,并阻塞等待结果
  29.    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
  30.    if (r.getErr() != 0) {
  31.        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
  32.                clientPath);
  33.   }
  34.    return response.getChildren();
  35. }

4.4 请求包入列并发送

  1. public ReplyHeader submitRequest(RequestHeader h, Record request,
  2.            Record response, WatchRegistration watchRegistration)
  3.            throws InterruptedException {
  4.        ReplyHeader r = new ReplyHeader();//响应头
  5.   //组装请求入队
  6.        Packet packet = queuePacket(h, r, request, response, null, null, null,
  7.                    null, watchRegistration);
  8.        synchronized (packet) {
  9.            //阻塞等待请求结果
  10.            while (!packet.finished) {
  11.                packet.wait();
  12.           }
  13.       }
  14.        return r;
  15.   }
  16.    
  17. Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
  18.            Record response, AsyncCallback cb, String clientPath,
  19.            String serverPath, Object ctx, WatchRegistration watchRegistration)
  20.   {
  21.        Packet packet = null;
  22.        // Note that we do not generate the Xid for the packet yet. It is
  23.        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
  24.        // where the packet is actually sent.
  25.        // 这个队列就是存放我们请求的队列,注意,我们还没有为包生成Xid。
  26.        // 它是在发送时生成,通过实现ClientCnxnSocket::doIO(),数据包实际发送的地方
  27.        synchronized (outgoingQueue) {
  28.            packet = new Packet(h, r, request, response, watchRegistration);
  29.            packet.cb = cb;
  30.            packet.ctx = ctx;
  31.            packet.clientPath = clientPath;
  32.            packet.serverPath = serverPath;
  33.            if (!state.isAlive() || closing) {
  34.                conLossPacket(packet);
  35.           } else {
  36.                // If the client is asking to close the session then
  37.                // mark as closing
  38.                if (h.getType() == OpCode.closeSession) {
  39.                    closing = true;
  40.               }
  41.                //请求包入队
  42.                outgoingQueue.add(packet);
  43.           }
  44.       }
  45.        //请求包入列后,唤醒客户发送线程,将请求包发送出去,进入到发送线程SendThread.run()方法中查看
  46.        sendThread.getClientCnxnSocket().wakeupCnxn();
  47.        return packet;
  48.   }
  1. public void run() {
  2.            clientCnxnSocket.introduce(this,sessionId);
  3.            clientCnxnSocket.updateNow();
  4.            clientCnxnSocket.updateLastSendAndHeard();
  5.            int to;
  6.            long lastPingRwServer = Time.currentElapsedTime();
  7.            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
  8.            InetSocketAddress serverAddress = null;
  9.            while (state.isAlive()) {
  10.                try {
  11.                    //判断当前session会话中socket请求是否还连接着
  12.                    if (!clientCnxnSocket.isConnected()) {
  13.                        if(!isFirstConnect){
  14.                            try {
  15.                                Thread.sleep(r.nextInt(1000));
  16.                           } catch (InterruptedException e) {
  17.                                LOG.warn("Unexpected exception", e);
  18.                           }
  19.                       }
  20.                        // don't re-establish connection if we are closing
  21.                        if (closing || !state.isAlive()) {
  22.                            break;
  23.                       }
  24.                        if (rwServerAddress != null) {
  25.                            serverAddress = rwServerAddress;
  26.                            rwServerAddress = null;
  27.                       } else {
  28.                            serverAddress = hostProvider.next(1000);
  29.                       }
  30.                        startConnect(serverAddress);
  31.                        clientCnxnSocket.updateLastSendAndHeard();
  32.                   }
  33.                    //校验判断客户端与服务器连接状态
  34.                    if (state.isConnected()) {
  35.                        // determine whether we need to send an AuthFailed event.
  36.                        if (zooKeeperSaslClient != null) {
  37.                            boolean sendAuthEvent = false;
  38.                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
  39.                                try {
  40.                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
  41.                               } catch (SaslException e) {
  42.                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
  43.                                    state = States.AUTH_FAILED;
  44.                                    sendAuthEvent = true;
  45.                               }
  46.                           }
  47.                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
  48.                            if (authState != null) {
  49.                                if (authState == KeeperState.AuthFailed) {
  50.                                    // An authentication error occurred during authentication with the Zookeeper Server.
  51.                                    state = States.AUTH_FAILED;
  52.                                    sendAuthEvent = true;
  53.                               } else {
  54.                                    if (authState == KeeperState.SaslAuthenticated) {
  55.                                        sendAuthEvent = true;
  56.                                   }
  57.                               }
  58.                           }
  59.                            if (sendAuthEvent == true) {
  60.                                eventThread.queueEvent(new WatchedEvent(
  61.                                      Watcher.Event.EventType.None,
  62.                                      authState,null));
  63.                           }
  64.                       }
  65.                        to = readTimeout - clientCnxnSocket.getIdleRecv();
  66.                   } else {
  67.                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
  68.                   }
  69.                    
  70.                    if (to <= 0) {
  71.                        String warnInfo;
  72.                        warnInfo = "Client session timed out, have not heard from server in "
  73.                            + clientCnxnSocket.getIdleRecv()
  74.                            + "ms"
  75.                            + " for sessionid 0x"
  76.                            + Long.toHexString(sessionId);
  77.                        LOG.warn(warnInfo);
  78.                        throw new SessionTimeoutException(warnInfo);
  79.                   }
  80.                    //心跳维持操作,心跳次数复位
  81.                    if (state.isConnected()) {
  82.                   //1000(1 second) is to prevent race condition missing to send the second ping
  83.                   //also make sure not to send too many pings when readTimeout is small
  84.                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
  85.                       ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
  86.                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
  87.                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
  88.                            sendPing();
  89.                            clientCnxnSocket.updateLastSend();
  90.                       } else {
  91.                            if (timeToNextPing < to) {
  92.                                to = timeToNextPing;
  93.                           }
  94.                       }
  95.                   }
  96.                    // If we are in read-only mode, seek for read/write server
  97.                    //如果我们处于只读模式节点,请根据情况查找读or写服务器
  98.                    if (state == States.CONNECTEDREADONLY) {
  99.                        long now = Time.currentElapsedTime();
  100.                        int idlePingRwServer = (int) (now - lastPingRwServer);
  101.                        if (idlePingRwServer >= pingRwTimeout) {
  102.                            lastPingRwServer = now;
  103.                            idlePingRwServer = 0;
  104.                            pingRwTimeout =
  105.                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
  106.                            pingRwServer();
  107.                       }
  108.                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
  109.                   }
  110.                    //调用ClientCnxnSocketNIO发起socket网络请求
  111.                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
  112.               }
  113.                //省略......................................................
  1. //发送前一些判断
  2. void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
  3.                     ClientCnxn cnxn)
  4.            throws IOException, InterruptedException {
  5.        selector.select(waitTimeOut);
  6.        Set<SelectionKey> selected;
  7.        synchronized (this) {
  8.            selected = selector.selectedKeys();
  9.       }
  10.        // Everything below and until we get back to the select is
  11.        // non blocking, so time is effectively a constant. That is
  12.        // Why we just have to do this once, here
  13.        updateNow();
  14.        for (SelectionKey k : selected) {
  15.            SocketChannel sc = ((SocketChannel) k.channel());
  16.            // readyOps :获取此键上ready操作集合.即在当前通道上已经就绪的事件
  17.            // SelectKey.OP_CONNECT 连接就绪事件,表示客户与服务器的连接已经建立成功
  18.            // 如果全部已经就绪,进行一次复位处理,包括心跳时间和发送时间
  19.            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
  20.                if (sc.finishConnect()) {
  21.                    updateLastSendAndHeard();
  22.                    sendThread.primeConnection();
  23.               }
  24.           } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
  25.                //读或者写通道准备完毕,进行IO传输
  26.                doIO(pendingQueue, outgoingQueue, cnxn);
  27.           }
  28.       }
  29.        if (sendThread.getZkState().isConnected()) {
  30.            synchronized(outgoingQueue) {
  31.                if (findSendablePacket(outgoingQueue,
  32.                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
  33.                    enableWrite();
  34.               }
  35.           }
  36.       }
  37.        selected.clear();
  38.   }

4.5 服务器端循环监听

  1. //...................................NIOServerCnxnFactory.java.................................
  2. public void run() {
  3.        // socket不是关闭状态就进入阻塞
  4.        while (!ss.socket().isClosed()) {
  5.            try {
  6.                selector.select(1000);
  7.                Set<SelectionKey> selected;
  8.                synchronized (this) {
  9.                    //获取事件键列表
  10.                    selected = selector.selectedKeys();
  11.               }
  12.                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
  13.                        selected);
  14.                Collections.shuffle(selectedList);
  15.                //遍历事件keys
  16.                for (SelectionKey k : selectedList) {
  17.                    //就绪等待连接事件那就先创建连接
  18.                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
  19.                        SocketChannel sc = ((ServerSocketChannel) k
  20.                               .channel()).accept();
  21.                        InetAddress ia = sc.socket().getInetAddress();
  22.                        int cnxncount = getClientCnxnCount(ia);
  23.                        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
  24.                            LOG.warn("Too many connections from " + ia
  25.                                     + " - max is " + maxClientCnxns );
  26.                            sc.close();
  27.                       } else {
  28.                            LOG.info("Accepted socket connection from "
  29.                                     + sc.socket().getRemoteSocketAddress());
  30.                            sc.configureBlocking(false);
  31.                            SelectionKey sk = sc.register(selector,
  32.                                    SelectionKey.OP_READ);
  33.                            NIOServerCnxn cnxn = createConnection(sc, sk);
  34.                            sk.attach(cnxn);
  35.                            addCnxn(cnxn);
  36.                       }
  37.                   } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
  38.                        // 就绪读写事件,开始执行(包括处理消息,返回监听)
  39.                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
  40.                        c.doIO(k);
  41.                   } else {
  42.                        if (LOG.isDebugEnabled()) {
  43.                            LOG.debug("Unexpected ops in select "
  44.                                      + k.readyOps());
  45.                       }
  46.                   }
  47.               }
  48.                selected.clear();
  49.           } catch (RuntimeException e) {
  50.                LOG.warn("Ignoring unexpected runtime exception", e);
  51.           } catch (Exception e) {
  52.                LOG.warn("Ignoring exception", e);
  53.           }
  54.       }
  55.        closeAll();
  56.        LOG.info("NIOServerCnxn factory exited run method");
  57.   }

 

4.6 触发Watcher(setData)

在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。

如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。

其关SetData请求的时序图如下:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

4.6.1 DataTree.setData()方法

根据上面的时序图,一路跟踪到DataTree.setData方法:

  1. public Stat setData(String path, byte data[], int version, long zxid,
  2.            long time) throws KeeperException.NoNodeException {
  3.        Stat s = new Stat();
  4.        //根据path, 获得DataNode对象n
  5.        DataNode n = nodes.get(path);
  6.        //如果n为null, 则抛出NoNodeException异常
  7.        if (n == null) {
  8.            throw new KeeperException.NoNodeException();
  9.       }
  10.        byte lastdata[] = null;
  11.        synchronized (n) {
  12.            lastdata = n.data;
  13.            n.data = data;
  14.            n.stat.setMtime(time);
  15.            n.stat.setMzxid(zxid);
  16.            n.stat.setVersion(version);
  17.            n.copyStat(s);
  18.       }
  19.        // now update if the path is in a quota subtree.
  20.   // 更新数据
  21.        String lastPrefix;
  22.        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
  23.          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
  24.              - (lastdata == null ? 0 : lastdata.length));
  25.       }
  26.        //触发Watcher
  27.        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
  28.        return s;
  29.   }

4.6.2 WatchManager触发Watcher

  1. public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
  2.        WatchedEvent e = new WatchedEvent(type,
  3.                KeeperState.SyncConnected, path);
  4.        HashSet<Watcher> watchers;
  5.        synchronized (this) {
  6.            //从watchTable删除掉path对应的watcher
  7.            watchers = watchTable.remove(path);
  8.            if (watchers == null || watchers.isEmpty()) {
  9.                if (LOG.isTraceEnabled()) {
  10.                    ZooTrace.logTraceMessage(LOG,
  11.                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  12.                            "No watchers for " + path);
  13.               }
  14.                return null;
  15.           }
  16.            for (Watcher w : watchers) {
  17.                HashSet<String> paths = watch2Paths.get(w);
  18.                if (paths != null) {
  19.                    paths.remove(path);
  20.               }
  21.           }
  22.       }
  23.        //循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象
  24.        for (Watcher w : watchers) {
  25.            if (supress != null && supress.contains(w)) {
  26.                continue;
  27.           }
  28.            //ServerCnxn执行监控事件对象
  29.            w.process(e);
  30.       }
  31.        return watchers;
  32.   }

4.6.3 ServerCnxn发送notification

包括NIOServerCnxn和NettyServerCnxn,其实代码逻辑都一样

  1. //NIO
  2. synchronized public void process(WatchedEvent event) {
  3.        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
  4.        if (LOG.isTraceEnabled()) {
  5.            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  6.                                     "Deliver event " + event + " to 0x"
  7.                                     + Long.toHexString(this.sessionId)
  8.                                     + " through " + this);
  9.       }
  10.        // Convert WatchedEvent to a type that can be sent over the wire
  11.        WatcherEvent e = event.getWrapper();
  12.        //发送notification给Client端
  13.        sendResponse(h, e, "notification");
  14.   }
  15.    
  16.    //Netty
  17.    public void process(WatchedEvent event) {
  18.        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
  19.        if (LOG.isTraceEnabled()) {
  20.            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  21.                                     "Deliver event " + event + " to 0x"
  22.                                     + Long.toHexString(this.sessionId)
  23.                                     + " through " + this);
  24.       }
  25.        // Convert WatchedEvent to a type that can be sent over the wire
  26.        WatcherEvent e = event.getWrapper();
  27.        try {
  28.            //发送notification给Client端
  29.            sendResponse(h, e, "notification");
  30.       } catch (IOException e1) {
  31.            if (LOG.isDebugEnabled()) {
  32.                LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
  33.           }
  34.            close();
  35.       }
  36.   }
  37. protected void internalSendBuffer(ByteBuffer bb) {
  38.        if (bb != ServerCnxnFactory.closeConn) {
  39.            // We check if write interest here because if it is NOT set,
  40.            // nothing is queued, so we can try to send the buffer right
  41.            // away without waking up the selector
  42.            if(sk.isValid() &&
  43.                   ((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) {
  44.                try {
  45.                    sock.write(bb);//通过socket向客户端写入字节流,调度process
  46.               } catch (IOException e) {
  47.                    // we are just doing best effort right now
  48.               }
  49.           }
  50.            // if there is nothing left to send, we are done
  51.            if (bb.remaining() == 0) {
  52.                packetSent();
  53.                return;
  54.           }
  55.       }
  56.        synchronized(this.factory){
  57.            sk.selector().wakeup();
  58.            if (LOG.isTraceEnabled()) {
  59.                LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
  60.                        + " is valid: " + sk.isValid());
  61.           }
  62.            outgoingBuffers.add(bb);
  63.            if (sk.isValid()) {
  64.                sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
  65.           }
  66.       }
  67.   }
  68.    //至于远程客户端地址端口怎么知道,初始化ServerCnxn的时候就设置了
  69. public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
  70.            SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
  71.        this.zkServer = zk;
  72.        this.sock = sock;//保存客户端sock对象
  73.        this.sk = sk;
  74.        this.factory = factory;
  75.        if (this.factory.login != null) {
  76.            this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
  77.       }
  78.        if (zk != null) {
  79.            outstandingLimit = zk.getGlobalOutstandingLimit();
  80.       }
  81.        sock.socket().setTcpNoDelay(true);
  82.        /* set socket linger to false, so that socket close does not
  83.         * block */
  84.        sock.socket().setSoLinger(false, -1);
  85.        InetAddress addr = ((InetSocketAddress) sock.socket()
  86.               .getRemoteSocketAddress()).getAddress();
  87.        authInfo.add(new Id("ip", addr.getHostAddress()));
  88.        sk.interestOps(SelectionKey.OP_READ);
  89.   }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/974843
推荐阅读
相关标签
  

闽ICP备14008679号