赞
踩
ZooKeeper客户端。要使用ZooKeeper服务,必须先实例化ZooKeeper对象。除非另有说明,否则此类的方法是线程安全的。
一旦建立了与服务器的连接,就会为客户端分配会话ID。客户端将定期向服务器发送心跳,以保持会话有效。只要会话ID有效,程序就可以通过客户端调用ZooKeeper方法。
如果由于某种原因,客户端长时间无法向服务器发送心跳,比如超过sessionTimeout值,服务器将使会话过期,会话ID将无效,客户端对象将不再可用。要调用ZooKeeper API,程序必须创建一个新的客户端对象。
如果客户端当前连接的ZooKeeper服务器出现故障或没有响应,客户端将在会话ID到期之前自动尝试连接到另一台服务器,如果成功,程序可以继续使用客户端。
ZooKeeper API方法有同步、异步两种,同步方法会阻塞,直到服务器响应。异步方法只是将请求排队等待发送并立即返回,接受一个回调对象,该对象将在成功执行请求或出现错误时调用,并带有指示错误的返回代码。
一些成功的ZooKeeper API调用可以将Watcher注册ZooKeepper服务器中的znode上,其他成功的ZooKeeper API调用可以触发这些Watcher。一旦Watcher被触发,一个事件将被传递给最初注册Watcher的客户端。每只Watcher只能触发一次,因此每一个Watcher最多会向客户端发送一个事件。
客户端需要Watcher对象来处理传递给客户端的事件,当客户端重新连接服务器时,所有现有watches都被视为已触发,但未传递的事件将丢失。为了模拟这一点,客户端将生成一个特殊事件,告诉事件处理程序连接已被断开,即EventType None和KeeperState Disconnected特殊事件。
// 代表一个客户端连接,维护发送、接收队列,使用网络层组件收发数据包
protected final ClientCnxn cnxn;
// 维护服务器地址列表并且提供方法查找可用的服务器地址
protected final HostProvider hostProvider;
重要的参数:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { this(connectString, sessionTimeout, watcher, false); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig conf) throws IOException { this(connectString, sessionTimeout, watcher, false, conf); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider) throws IOException { this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException { this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig(); this.hostProvider = hostProvider; ConnectStringParser connectStringParser = new ConnectStringParser(connectString); // 创建ClientCnxn // 使用connectStringParser从连接串解析chroot路径 // getClientCnxnSocket默认使用ClientCnxnSocketNIO作为网络层实现 cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { // createDefaultHostProvider方法创建StaticHostProvider对象 this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString)); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf) throws IOException { this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString), conf); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd) throws IOException { this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider) throws IOException { this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, canBeReadOnly, aHostProvider, null); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException { this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig(); ConnectStringParser connectStringParser = new ConnectStringParser(connectString); this.hostProvider = hostProvider; // 创建ClientCnxn cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly); cnxn.seenRwServerBefore = true; // since user has provided sessionId cnxn.start(); } public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException { this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, canBeReadOnly, createDefaultHostProvider(connectString)); } ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, clientConfig, defaultWatcher, clientCnxnSocket, canBeReadOnly); }
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException { this.chrootPath = chrootPath; this.hostProvider = hostProvider; this.sessionTimeout = sessionTimeout; // 默认30000 this.clientConfig = clientConfig; this.sessionId = sessionId; // 默认0 this.sessionPasswd = sessionPasswd; // 默认byte[16] this.readOnly = canBeReadOnly; // 管理本地Watcher this.watchManager = new ZKWatchManager( clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET), defaultWatcher); // 以3个节点为例,值为10000 this.connectTimeout = sessionTimeout / hostProvider.size(); // 默认20000 this.readTimeout = sessionTimeout * 2 / 3; // 使用网络层组件建立连接、收发数据 this.sendThread = new SendThread(clientCnxnSocket); // 用于处理接收到的数据包 this.eventThread = new EventThread(); // 初始化requestTimeout参数,如果请求在该时间内没有收到响应,会以丢包处理,默认0 initRequestTimeout(); }
// 已发送等待响应队列
private final Queue<Packet> pendingQueue = new ArrayDeque<>();
// 等待发送队列
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<>();
SendThread线程用来建立连接、收发数据:
public void run() { // 此处会把发送队列传递给socket层,二者共用一个队列 clientCnxnSocket.introduce(this, sessionId, outgoingQueue); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; while (state.isAlive()) { try { if (!clientCnxnSocket.isConnected()) { // don't re-establish connection if we are closing if (closing) { break; } // 获取一个可用的服务器地址 if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { serverAddress = hostProvider.next(1000); } onConnecting(serverAddress); // do nothing // 使用clientCnxnSocket建立网络层连接 // 使用clientCnxnSocket.connect(addr)方法 startConnect(serverAddress); // Update now to start the connection timer right after we make a connection attempt clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } if (state.isConnected()) { if (zooKeeperSaslClient != null) { // sasl认证 略 } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { // Client session timed out 连接超时 throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { // 1000(1 second) is to prevent race condition missing to send the second ping // also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); // 到了ping时间或者idleSend大于10000 if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); // ping服务器保持连接 clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { // readonly模式略 } // 收发数据 clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); } catch (Throwable e) { if (closing) { // closing so this is expected break; } else { // At this point, t
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。