当前位置:   article > 正文

ZooKeeper架构篇 - 监听器机制_zk监听器

zk监听器

ZooKeeper监听器原理

ZooKeeper 的监听器原理如下:

  • 客户端注册监听器:客户端调用注册监听器的方法(比如 exists、getData、getChildren、addWatch),传递节点路径和监听器,然后组装成一个 Packet 对象发送给服务端。
  • 服务端存储监听器:服务端根据客户端的请求判断是否需要注册监听器,需要的话将节点路径和 ServerCnxn(表示客户端与服务端的连接)存储到服务端本地的 WatchManager 中,然后向客户端发送响应。
  • 客户端接收服务端的响应:客户端接收到服务端的响应后,将监听器注册到客户端本地的 ZKWatcherManager。
  • 服务端触发事件通知:对于客户端的 create、delete、setData 方法的调用会触发服务端向客户端发送事件通知。
  • 客户端接收服务端的事件通知:客户端接收到服务端的事件通知后执行监听器的回调方法。

ZooKeeper监听器特点

exists 方法监听节点创建、删除、数据内容变化。

getData 方法监听节点删除、数据内容变化。

getChildren 方法监听节点的删除、子节点的创建或者删除。

这三个方法注册的监听器具有如下三个特点:

  • 一次性 : (对于 Standard 类型的监听器,即默认类型)无论是服务端还是客户端,一旦监听器被触发,都会从存储中删除该监听器。因此需要反复注册,可以有效减轻服务器的压力。
  • 客户端串行执行监听器的回调 : 因为是从阻塞队列中取出,保证了顺序性。
  • 轻量 : 客户端向服务端注册监听器的时候,并不会把客户端的监听器对象传递给服务端,仅仅是在客户端请求中使用boolean类型的属性进行标记。服务端的监听器事件通知非常简单,只会通知客户端发生了事件,不会说明具体的事件内容。

从 3.6.0 版本开始,增加了 addWatch 方法,支持注册持久监听器(监听节点的创建、删除、数据内容变化)、持久递归监听器(监听节点/子节点的创建、删除、数据内容变化)。

源码分析

源码分析基于 ZooKeeper 3.8.0 版本。

客户端注册监听器

概述:客户端调用 exists、getData、getChildren 方法,传递节点路径和监听器,然后组装成一个 Packet 对象发送给服务端。

ZooKeeper

public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
   
    final String clientPath = path;
  	// 校验路径的合法性
    PathUtils.validatePath(clientPath);
    WatchRegistration wcb = null;
    if (watcher != null) {
   
      	// 封装监听器、节点路径到一个WatchRegistration实例
        wcb = new ExistsWatchRegistration(watcher, clientPath);
    }
	// 如果指定了chrootPath,则在路径的前面拼装chrootPath
    final String serverPath = prependChroot(clientPath);
	// 构建请求头
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);
  	// 构建请求体
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath);
    // 标识是否有监听器
    request.setWatch(watcher != null);
    SetDataResponse response = new SetDataResponse();
  	// 提交请求
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
   
        if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
   
            return null;
        }
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }

    return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

ClientCnxn

public ReplyHeader submitRequest(
    RequestHeader h,
    Record request,
    Record response,
    WatchRegistration watchRegistration) throws InterruptedException {
   
    // 传递的WatchDeregistration参数为空
    return submitRequest(h, request, response, watchRegistration, null);
}

public ReplyHeader submitRequest(
    RequestHeader h,
    Record request,
    Record response,
    WatchRegistration watchRegistration,
    WatchDeregistration watchDeregistration) throws InterruptedException {
   
    ReplyHeader r = new ReplyHeader();
  	// 构造Packet实例
    Packet packet = queuePacket(
        h,
        r,
        request,
        response,
        null,
        null,
        null,
        null,
        watchRegistration,
        watchDeregistration);
    synchronized (packet) {
   
        if (requestTimeout > 0) {
   
            waitForPacketFinish(r, packet);
        } else {
   
            while (!packet.finished) {
   
                packet.wait();
            }
        }
    }
    if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
   
        sendThread.cleanAndNotifyState();
    }
    return r;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

public Packet queuePacket(
    RequestHeader h,
    ReplyHeader r,
    Record request,
    Record response,
    AsyncCallback cb,
    String clientPath,
    String serverPath,
    Object ctx,
    WatchRegistration watchRegistration,
    WatchDeregistration watchDeregistration) {
   
    Packet packet = null;
  	// 构造Packet实例
    packet = new Packet(h, r, request, response, watchRegistration);
    packet.cb = cb;
    packet.ctx = ctx;
    packet.clientPath = clientPath;
    packet.serverPath = serverPath;
    packet.watchDeregistration = watchDeregistration;
    synchronized (state) {
   
        if (!state.isAlive() || closing) {
   
            conLossPacket(packet);
        } else {
   
            if (h.getType() == OpCode.closeSession) {
   
                closing = true;
            }
          	// 将Packet实例加入到outgoingQueue阻塞队列中
            outgoingQueue.add(packet);
        }
    }
  	// 打开ClientCnxnSocketNIO的Selector
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

SendThread

ZooKeeper 的构造器方法中会启动 SendThread 线程,接下来看下 SendThread 的 run 方法。

@Override
public void run() {
   
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000;
    InetSocketAddress serverAddress = null;
    while (state.isAlive()) {
   
        try {
   
            if (!clientCnxnSocket.isConnected()) {
   
                if (closing) {
   
                    break;
                }
                if (rwServerAddress != null) {
   
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
   
                    serverAddress = hostProvider.next(1000);
                }
                onConnecting(serverAddress);
                startConnect(serverAddress);
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) {
   
                if (zooKeeperSaslClient != null) {
   
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
   
                        try {
   
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
   
                            LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                            changeZkState(States.AUTH_FAILED);
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
   
                        if (authState == KeeperState.AuthFailed) {
   
                            changeZkState(States.AUTH_FAILED);
                            sendAuthEvent = true;
                        } else {
   
                            if (authState == KeeperState.SaslAuthenticated) {
   
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent) {
   
                        eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                        if (state == States.AUTH_FAILED) {
   
                            eventThread.queueEventOfDeath();
                        }
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
   
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            if (to <= 0) {
   
                String warnInfo = String.format(
                    "Client session timed out, have not heard from server in %dms for session id 0x%s",
                    clientCnxnSocket.getIdleRecv(),
                    Long.toHexString(sessionId));
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
   
                int timeToNextPing = readTimeout / 2
                                     - clientCnxnSocket.getIdleSend()
                                     - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
   
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
   
                    if (timeToNextPing < to) {
   
                        to = timeToNextPing;
                    }
                }
            }

            if (state == States.CONNECTEDREADONLY) {
   
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
   
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
			// 将数据传输到服务端
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
   
            if (closing) {
   
                LOG.warn(
                    "An exception was thrown while closing send thread for session 0x{}.",
                    Long.toHexString(getSessionId()),
                    e);
                break;
            } else {
   
                LOG.warn(
                    "Session 0x{} for server {}, Closing socket connection. "
                        + "Attempting reconnect except it is a SessionExpiredException.",
                    Long.toHexString(getSessionId()),
                    serverAddress,
                    e);
                cleanAndNotifyState();
            }
        }
    }

    synchronized (state) {
   
        cleanup();
    }
    clientCnxnSocket.close();
    if (state.isAlive()) {
   
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
    }
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));

    if (zooKeeperSaslClient != null) {
   
        zooKeeperSaslClient.shutdown();
    }
    ZooTrace.logTraceMessage(
        LOG,
        ZooTrace.getTextTraceLevel(),
        "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166

ClientCnxnSocketNIO

@Override
void doTransport(
    int waitTimeOut,
    Queue<Packet> pendingQueue,
    ClientCnxn cnxn) throws IOException, InterruptedException {
   
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
   
        selected = selector.selectedKeys();
    }
    updateNow();
    for (SelectionKey k : selected) {
   
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
   
            if (sc.finishConnect()) {
   
                updateLastSendAndHeard();
                updateSocketAddresses();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
   
            // IO交互
          	doIO(pendingQueue, cnxn);
        }
    }
  	// 如果服务器状态是已连接状态
    if (sendThread.getZkState().isConnected()) {
   
      	// 如果可以从outgoingQueue阻塞队列中取出Packet实例
        if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
   
            // 将通道设置为可写状态
          	enableWrite();
        }
    }
    selected.clear();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
   
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
   
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
   
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
   
            throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                           + Long.toHexString(sessionId)
                                           + ", likely server has closed socket");
        }
        if (!incomingBuffer.hasRemaining()) {
   
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
   
                recvCount.getAndIncrement();
                readLength();
            } else if (!initialized) {
   
                readConnectResult();
                enableRead();
                if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
   
                    enableWrite();
                }
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                initialized = true;
            } else {
   
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
  	// 如果通道是可写状态
    if (sockKey.isWritable()) {
   
      	// 从outgoingQueue阻塞队列中取出Packet实例
        Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
		// 如果该Packet实例非空
        if (p != null) {
   
            updateLastSend();
            if (p.bb == null) {
   
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != OpCode.ping)
                    && (p.requestHeader.getType() != OpCode.auth)) {
   
                    p.requestHeader.setXid(cnxn.getXid());
                }
                p.createBB();
            }
          	// 将Packet实例的ByteBuffer写到通道中
            sock.write(p.bb);
            if (!p.bb.hasRemaining()) {
   
                sentCount.getAndIncrement();
              	// 从outgoingQueue阻塞队列中移除该Packet实例
                outgoingQueue.removeFirstOccurrence(p);
                if (p.requestHeader != null
                    && p.requestHeader.getType() != OpCode.ping
                    && p.requestHeader.getType() != OpCode.auth) {
   
                    synchronized (pendingQueue) {
   
                      	// 将该Packet实例加入到pendingQueue队列中
                        pendingQueue.add(p);
                    }
                }
            }
        }
        if (outgoingQueue.isEmpty()) {
   
            disableWrite();
        } else if (!initialized && p != null && !p.bb.hasRemaining()) {
   
            disableWrite();
        } else {
   
        	// 将通道设置为可写状态
            enableWrite();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95

服务端存储监听器

概述:服务端根据客户端的请求判断是否需要注册监听器,需要的话将节点路径和 ServerCnxn(表示客户端与服务端的连接)存储到服务端本地的 WatchManager 中,然后向客户端发送响应。

NIOServerCnxnFactory

QuorumPeerMain 在执行 main 方法时,会初始化一个具体的 ServerCnxnFactory 实例,默认是 NIOServerCnxnFactory 类型。

@Override
public void start() {
   
    stopped = false;
    if (workerPool == null) {
   
    	// 实例化WorkerService
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
   
        if (thread.getState() == Thread.State.NEW) {
   
          	// 启动SelectorThread线程
            thread.start();
        }
    }
    if (acceptThread.getState() == Thread.State.NEW) {
   
      	// 启动AcceptThread线程
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
   
        expirerThread.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

AcceptThread

public void run
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/974777
    推荐阅读
    相关标签
      

    闽ICP备14008679号