赞
踩
Zookeeper:客户端的入口。
ClientWatchManager:客户端watcher管理器。
HostProvider:客户端地址列表管理器。
ClientCnxn:客户端连接核心类,包含SendThread和EventThread两个线程。SendThread为I/O线程,主要负责Zookeeper客户端和服务器之间的网络I/O通信;EventThread为事件线程,主要负责对服务端事件进行处理。
上图主要描述了ZK Client和Server端互动的过程:
ZK Client和Server端建立连接从Client来说主要分为以下三个阶段:
初始化阶段
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { // 把传入的watcher注册到default的watcher中,(留心就可以发现getdata,exists,getchildren提供了参数为 // boolean类型,参数名为watch的接口,调用这些接口触发的就是default的watcher) // 设置默认watcher,之前讲watcher的时候说过 watchManager.defaultWatcher = watcher; // 负责解析配置的server地址串 // 主要有两个功能:1.加chroot(默认prefix,之前有介绍过);2.读字符串并把多个server地址分开 ConnectStringParser connectStringParser = new ConnectStringParser(connectString); // 根据之前的字符串解析hostname,ip等,并不一定会按照原来的顺序,在构造器中会将顺序打散 HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 实例化clientCnxn对象 // ClientCnxn的构造器中有一个非常重要的参数是ClientCnxnSocket,这也是client和server建立连接的功能类 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); // 启动sendThread和eventThread cnxn.start(); }
会话创建阶段
6. 启动SendThread和EventThread线程
7. 获取一个服务器地址:获取地址后,委托给ClientCnxnSocket去创建与Zookeeper服务器之间的TCP连接。
8. 创建TCP连接
9. 构造ConnectRequest请求:8只是从网络TCP层面完成了客户端与服务端之间的socket连接,但远未完成Zookeeper客户端的会话创建。SendThread根据设置,构造出一个ConnectRequest请求,包装成网络I/O层的Packet对象,放入请求发送队列outgoingQueue中去。
10. 发送请求
响应处理阶段
11. 接收服务端响应
12. 处理Response:ClientCnxnSocket将响应反序列化得到ConnectResponse对象,并获取会话sessionId。
13. 连接成功
14. 生成事件:SyncConnected-None:为了让上层应用感知到会话的成功创建,SendThread会生成一个事件,代表客户端与服务端会话创建成功,并将该事件传递给EventThread线程。
15. 查询Watcher
16. 处理事件
Zookeeper客户端内部在接收到服务器地址列表后,会将其首先放入一个ConnectStringParser对象封装起来
ConnectStringParser解析器将会对传入的connectString做两个主要处理:解析chrootPath,保存服务器地址列表。
在ConnectStringParser解析器中会対服各器地址做一个筒単的处理,并将服务器地址和相应的端口封装成一个InetSocketAddress对象,以ArrayList形式保存在ConnectStringParser.serverAddresses属性中。然后,经过处理的地址列表会被进一步封装到StaticHostProvider类中。
HostProvider类定义了一个客户端的服务器地址管理器。
StaticHostProvider
Packet是ClientCnxn内部定义的一个对协议层的封装,作为Zookeeper中请求与相应的载体。
Packet
Packet的createBB方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会讲里requestHeader,request,和readOnly三个属性进行序列化,其余属性都保存在客户端的上下文中,包括watcher在内的很多变量都没有序列化,这也是watcher轻量特性的保证。
outgoingqueue和pendingQueue
0utgoing队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。
Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。
ClientCnxnSocket:底层Socket通信层
请求发送
响应阶段
客户端获取到来自服务端的完整响应数据后,根据不同的客户端请求类型,会进行不同的处理。
SendThread
EventThread
Zookeeper的连接与会话就是客户端通过实例化Zookeeper对象来实现客户端与服务器创建并保持TCP连接的过程。
Session是Zookeeper中的会话实体,代表了一个客户端会话。
SessionID
Zookeeper必须保证sessionId的全局唯一性。
在SessionTracker初识化的时候,会调用initializeNextSession方法生成一个初始化sessionID。
SessionTracker
会话创建
服务端对于客户端的“会话创建”请求的处理,大致可以分为四大步骤,分别是处理ConnectRequest请求,会话创建,处理器链路处理和会话响应。
会话激活
客户端会在会话超时时间过期范围内向服务端发送PING请求来保持会话的有效性,我们俗称“心跳检测”。
在创建Session后,每次Client发起请求(PING或者读写请求),Server端都会重新激活Session,而这个过程就是Session的激活,也就是所谓的TouchSession。会话激活的过程使Server可以检测到Client的存活并让Client保持连接状态。
会话超时时间检查
当客户端与服务端之间的网络连接断开时,Zookeeper客户端会自动进行反复的重连,直到最终成功连接上Zookeeper集群中的一台机器。此时,再次连接上服务端的客户端有可能处于以下两种状态之一。
CONNECTION_LOSS
若客户端在setData时出现了CONNECTION_LOSS现象,此时客户端会收到None-Disconnected通知,同时会抛出异常。应用程序需要捕捉异常并且等待Zookeeper客户端自动完成重连,一旦重连成功,那么客户端会收到None-SyncConnected通知,之后就可以重试setData操作。
SESSION_EXPIRED
客户端与服务端断开连接后,重连时间耗时太长,超过了会话超时时间限制后没有成功连上服务器,服务器会进行会话清理,此时,客户端不知道会话已经失效,状态还是DISCONNECTED,如果客户端重新连上了服务器,此时状态为SESSION_EXPIRED,用于需要重新实例化Zookeeper对象,并且看应用的复杂情况,重新恢复临时数据。
SESSION_MOVED
客户端会话从一台服务器转移到另一台服务器,即客户端与服务端S1断开连接后,重连上了服务端S2,此时会话就从S1转移到了S2。当多个客户端使用相同的sessionId/sessionPasswd创建会话时,会收到SessionMovedException异常。因为一旦有第二个客户端连接上了服务端,就被认为是会话转移了。
Zookeeper服务端的整体架构
Zookeeper服务器的启动,大体可以分为以下五个主要步骤:配置文件解析,初始化数据管理器,初始化网络I/O管理器,数据恢复和对外服务。
预启动
初始化
public void runFromConfig(ServerConfig config) throws IOException { FileTxnSnapLog txnLog = null; try { // 1. 创建服务器统计器ServerStats final ZooKeeperServer zkServer = new ZooKeeperServer(); final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch)); // 2. 创建数据管理器FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir)); zkServer.setTxnLogFactory(txnLog); // 3. 设置服务器tickTime和会话超时时间限制 zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout); // 4. 创建ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); // 初始化ServerCnxnFactory cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // 启动ServerCnxnFactory cnxnFactory.startup(zkServer); shutdownLatch.await(); shutdown(); cnxnFactory.join(); if (zkServer.canShutdown()) { zkServer.shutdown(true); } } }
public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
Thread thread;
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
// 启动主线程
start();
// 设置server端对象
setZooKeeperServer(zks);
// 恢复数据等
zks.startdata();
zks.startup();
}
public synchronized void startup() { if (sessionTracker == null) { // 创建session管理器 createSessionTracker(); } // 启动session管理器 startSessionTracker(); // 初始化zookeeper请求处理链 setupRequestProcessors(); // 注册JMX服务 registerJMX(); setState(State.RUNNING); notifyAll(); }
预启动
初始化
public void runFromConfig(QuorumPeerConfig config) throws IOException { try { // 创建ServerCnxnFactory ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); // 初始化ServerCnxnFactory cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // 创建并初始化QuorumPeer quorumPeer = getQuorumPeer(); // 初始化的数据基本都是从zoo.cfg中读到的值,方法名也叫runFromConfig quorumPeer.setQuorumPeers(config.getServers()); // 创建数据管理器FileTxnSnapLog quorumPeer.setTxnFactory(new FileTxnSnapLog( new File(config.getDataLogDir()), new File(config.getDataDir()))); quorumPeer.initialize(); // 恢复本地数据 // 启动ServerCnxnFactory线程 quorumPeer.start(); quorumPeer.join(); } }
@Override
public synchronized void start() {
// 恢复本地数据
loadDataBase();
// 启动ServerCnxnFactory线程
cnxnFactory.start();
// 启动leader选举
startLeaderElection();
// 启动QuorumPeer线程
super.start();
}
Leader选举
Leader和Follower启动期交互过程
Leader和Follower启动
大家可以关注我的微信公众号一起学习进步。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。