赞
踩
ZooKeeper的客户端主要由以下几个核心组件组成。
private final ZKWatchManager watchManager = new ZKWatchManager(); public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
根据事件类型,materialize方法会执行以下操作:
private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
private final String chrootPath;
在ConnectStringParser解析器中会对服务器地址做一个简单的处理,并将服务器地址和相应的端口封装成一个InetSocketAddress对象,以ArrayList形式保存在ConnectStringParser.serverAddresses属性中,然后,经过处理的地址列表会被进一步封装到StaticHostProvider类中。
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId; this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath; connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); }
public void start() {
sendThread.start();
eventThread.start();
}
启动SendThread和EventThread
SendThread首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送“会话创建”请求做准备。
在开始创建TCP连接之前,SendThread首先需要获取一个ZooKeeper服务器的目标地址,这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP连接。
private void startConnect() throws IOException { state = States.CONNECTING; InetSocketAddress addr; if (rwServerAddress != null) { addr = rwServerAddress; rwServerAddress = null; } else { addr = hostProvider.next(1000); } setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr); clientCnxnSocket.connect(addr); }
获取到一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
这里有两个实现类,默认第一个,封装连接请求,ConnectRequest,最后在封装成Packet对象,放入请求发送队列outgoingQueue中去。
当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。
void sendPacket(Packet p) throws IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
p.createBB();
ByteBuffer pbb = p.bb;
sock.write(pbb);
}
首先,方法通过instanceof关键字检查传入的事件对象是否是WatcherSetEventPair类型。如果是,这意味着事件包含了一组Watcher对象,每个Watcher都会处理这个事件。代码遍历这些Watcher对象,并调用它们的process方法来处理事件。如果在处理过程中抛出异常,将会记录错误日志。
如果事件对象不是WatcherSetEventPair类型,那么它将被当作Packet类型来处理。Packet对象包含了客户端路径、回复头信息以及回调接口(cb)。代码首先检查回复头中的错误码,如果有错误,就将错误码保存在局部变量rc中。
接下来,代码根据响应的类型(例如ExistsResponse、GetDataResponse等)来调用相应的回调接口方法。这些回调接口是Zookeeper客户端用来接收操作结果的机制。例如,如果响应是GetDataResponse类型,那么代码会调用DataCallback接口的processResult方法,并传入操作结果码、客户端路径、上下文信息、节点数据和状态信息。
此外,代码还处理了MultiResponse类型,这是一种特殊的情况,表示一个请求包含了多个操作,每个操作都有自己的结果。在这种情况下,代码会遍历结果列表,并在所有操作都成功的情况下调用MultiCallback接口的processResult方法。
最后,如果回调接口是VoidCallback类型,那么代码会调用processResult方法,但不会传入任何数据,因为VoidCallback不关心操作结果。
ClientCnxnSocket接收到服务端的响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。
EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出步骤2中存储的默认Watcher,然后将其放到EventThread的waitingEvents队列中去。
EventThread不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。
Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体
Packet中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)和注册的Watcher(watchRegistration)等信息。
Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。
public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); // append "am-I-allowed-to-be-readonly" flag boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray()); this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); } }
outgoingQueue和pendingQueueClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。Outgoing队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。