当前位置:   article > 正文

zookeeper源码(13)客户端

zookeeper源码(13)客户端

ZooKeeper

概述

ZooKeeper客户端。要使用ZooKeeper服务,必须先实例化ZooKeeper对象。除非另有说明,否则此类的方法是线程安全的。

一旦建立了与服务器的连接,就会为客户端分配会话ID。客户端将定期向服务器发送心跳,以保持会话有效。只要会话ID有效,程序就可以通过客户端调用ZooKeeper方法。

如果由于某种原因,客户端长时间无法向服务器发送心跳,比如超过sessionTimeout值,服务器将使会话过期,会话ID将无效,客户端对象将不再可用。要调用ZooKeeper API,程序必须创建一个新的客户端对象。

如果客户端当前连接的ZooKeeper服务器出现故障或没有响应,客户端将在会话ID到期之前自动尝试连接到另一台服务器,如果成功,程序可以继续使用客户端。

ZooKeeper API方法有同步、异步两种,同步方法会阻塞,直到服务器响应。异步方法只是将请求排队等待发送并立即返回,接受一个回调对象,该对象将在成功执行请求或出现错误时调用,并带有指示错误的返回代码。

一些成功的ZooKeeper API调用可以将Watcher注册ZooKeepper服务器中的znode上,其他成功的ZooKeeper API调用可以触发这些Watcher。一旦Watcher被触发,一个事件将被传递给最初注册Watcher的客户端。每只Watcher只能触发一次,因此每一个Watcher最多会向客户端发送一个事件。

客户端需要Watcher对象来处理传递给客户端的事件,当客户端重新连接服务器时,所有现有watches都被视为已触发,但未传递的事件将丢失。为了模拟这一点,客户端将生成一个特殊事件,告诉事件处理程序连接已被断开,即EventType None和KeeperState Disconnected特殊事件。

核心字段

// 代表一个客户端连接,维护发送、接收队列,使用网络层组件收发数据包
protected final ClientCnxn cnxn;

// 维护服务器地址列表并且提供方法查找可用的服务器地址
protected final HostProvider hostProvider;
  • 1
  • 2
  • 3
  • 4
  • 5

构造方法

重要的参数:

  • connectString - 逗号分隔host:port对, 例如127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002,如果像这样127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a配置,会chroot到/app/a下,之后所有命令的path都会前缀/app/a路径,比如/foo/bar会成为/app/a/foo/bar
  • sessionTimeout - 超时时间,默认30000毫秒
  • watcher - 监听器
  • hostProvider - 用来获取可用的服务器地址
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
   
    this(connectString, sessionTimeout, watcher, false);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 ZKClientConfig conf) throws IOException {
   
    this(connectString, sessionTimeout, watcher, false, conf);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, HostProvider aHostProvider) throws IOException {
   
    this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, HostProvider hostProvider,
                 ZKClientConfig clientConfig) throws IOException {
   
    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    this.hostProvider = hostProvider;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);

    // 创建ClientCnxn
    // 使用connectStringParser从连接串解析chroot路径
    // getClientCnxnSocket默认使用ClientCnxnSocketNIO作为网络层实现
    cnxn = createConnection(connectStringParser.getChrootPath(),
                            hostProvider, sessionTimeout, this.clientConfig,
                            watcher, getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly) throws IOException {
   
    // createDefaultHostProvider方法创建StaticHostProvider对象
    this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString));
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, ZKClientConfig conf) throws IOException {
   
    this(connectString, sessionTimeout, watcher,
         canBeReadOnly, createDefaultHostProvider(connectString), conf);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 long sessionId, byte[] sessionPasswd) throws IOException {
   
    this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 long sessionId, byte[] sessionPasswd, boolean canBeReadOnly,
                 HostProvider aHostProvider) throws IOException {
   
    this(connectString, sessionTimeout, watcher, sessionId,
         sessionPasswd, canBeReadOnly, aHostProvider, null);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId,
                 byte[] sessionPasswd, boolean canBeReadOnly, HostProvider hostProvider,
                 ZKClientConfig clientConfig) throws IOException {
   

    this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    this.hostProvider = hostProvider;

    // 创建ClientCnxn
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider,
                          sessionTimeout, this.clientConfig, watcher,
                          getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
    cnxn.seenRwServerBefore = true; // since user has provided sessionId
    cnxn.start();
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
   
    this(connectString, sessionTimeout, watcher, sessionId,
         sessionPasswd, canBeReadOnly, createDefaultHostProvider(connectString));
}

ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout,
                            ZKClientConfig clientConfig, Watcher defaultWatcher,
                            ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {
   
    return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, clientConfig,
                          defaultWatcher, clientCnxnSocket, canBeReadOnly);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

ClientCnxn

构造方法

public ClientCnxn(String chrootPath, HostProvider hostProvider,
       int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher,
       ClientCnxnSocket clientCnxnSocket, long sessionId,
       byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
   
    this.chrootPath = chrootPath;
    this.hostProvider = hostProvider;
    this.sessionTimeout = sessionTimeout; // 默认30000
    this.clientConfig = clientConfig;
    this.sessionId = sessionId; // 默认0
    this.sessionPasswd = sessionPasswd; // 默认byte[16]
    this.readOnly = canBeReadOnly;

    // 管理本地Watcher
    this.watchManager = new ZKWatchManager(
            clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
            defaultWatcher);

    // 以3个节点为例,值为10000
    this.connectTimeout = sessionTimeout / hostProvider.size();
    // 默认20000
    this.readTimeout = sessionTimeout * 2 / 3;

    // 使用网络层组件建立连接、收发数据
    this.sendThread = new SendThread(clientCnxnSocket);
    // 用于处理接收到的数据包
    this.eventThread = new EventThread();
    // 初始化requestTimeout参数,如果请求在该时间内没有收到响应,会以丢包处理,默认0
    initRequestTimeout();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

发送队列

// 已发送等待响应队列
private final Queue<Packet> pendingQueue = new ArrayDeque<>();

// 等待发送队列
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<>();
  • 1
  • 2
  • 3
  • 4
  • 5

建立连接

SendThread线程用来建立连接、收发数据:

public void run() {
   
    // 此处会把发送队列传递给socket层,二者共用一个队列
    clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    while (state.isAlive()) {
   
        try {
   
            if (!clientCnxnSocket.isConnected()) {
   
                // don't re-establish connection if we are closing
                if (closing) {
   
                    break;
                }
                // 获取一个可用的服务器地址
                if (rwServerAddress != null) {
   
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
   
                    serverAddress = hostProvider.next(1000);
                }
                onConnecting(serverAddress); // do nothing
                // 使用clientCnxnSocket建立网络层连接
                // 使用clientCnxnSocket.connect(addr)方法
                startConnect(serverAddress);
                // Update now to start the connection timer right after we make a connection attempt
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) {
   
                if (zooKeeperSaslClient != null) {
   
                    // sasl认证 略
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
   
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }

            if (to <= 0) {
   
                // Client session timed out 连接超时
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
   
                // 1000(1 second) is to prevent race condition missing to send the second ping
                // also make sure not to send too many pings when readTimeout is small
                int timeToNextPing = readTimeout / 2
                                     - clientCnxnSocket.getIdleSend()
                                     - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                // 到了ping时间或者idleSend大于10000
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
   
                    sendPing(); // ping服务器保持连接
                    clientCnxnSocket.updateLastSend();
                } else {
   
                    if (timeToNextPing < to) {
   
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
   
                // readonly模式略
            }

            // 收发数据
            clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
        } catch (Throwable e) {
   
            if (closing) {
   
                // closing so this is expected
                break;
            } else {
   
                // At this point, t
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/603233
推荐阅读
相关标签
  

闽ICP备14008679号