赞
踩
在介绍完client端的基本操作之后,后续我们就要对zookeeper server端的相关操作进行解析了。
实际重点还是在server端,数据存储同步恢复、事务请求执行、master选举等都是在server端进行的。
server有单机版和集群版,大致流程都差不多,为了便于理解,我们先从单机版进行分析。
windows环境下,启动server端,就是通过bin/zkServer.cmd start命令执行的(Linux环境下,是bin/zkServer.sh start命令)。我们来看下这个脚本主要执行了哪些启动类
- setlocal
- call "%~dp0zkEnv.cmd"
-
- # 启动类为QuorumPeerMain
- set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
- echo on
- call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
-
- endlocal
通过启动脚本,可以看到QuorumPeerMain为其启动类,而启动参数ZOOCFG是从zkEnv.cmd中获取的,如下所示,即conf/zoo.cfg文件
- set ZOOCFGDIR=%~dp0%..\conf
- set ZOO_LOG_DIR=%~dp0%..
- set ZOO_LOG4J_PROP=INFO,CONSOLE
-
- REM for sanity sake assume Java 1.6
- REM see: http://java.sun.com/javase/6/docs/technotes/tools/windows/java.html
-
- REM add the zoocfg dir to classpath
- set CLASSPATH=%ZOOCFGDIR%
-
- REM make it work in the release
- SET CLASSPATH=%~dp0..\*;%~dp0..\lib\*;%CLASSPATH%
-
- REM make it work for developers
- SET CLASSPATH=%~dp0..\build\classes;%~dp0..\build\lib\*;%CLASSPATH%
-
- set ZOOCFG=%ZOOCFGDIR%\zoo.cfg
总结下来,zkServer启动时主要是启动QuorumPeerMain类,参数为conf/zoo.cfg
通过脚本我们都知道该如何启动了,那么我们能不能直接通过源码来启动呢,方便我们进行debug。本质上是可以的。
在conf/目录下,只有zoo_sample.cfg文件,我们需要复制一下该文件,并重命名为zoo.cfg。完成之后,如下所示
将conf/log4j.properties文件复制到src/java/main目录下。完成之后,如下所示
需要在QuorumPeerMain类的启动参数中添加conf/zoo.cfg。完成之后,如下所示:
到这里,就可以对QuorumPeerMain进行debug启动。
此时args[] 为conf/zoo.cfg
- public class QuorumPeerMain {
- public static void main(String[] args) {
- QuorumPeerMain main = new QuorumPeerMain();
- try {
- // 直接调用initializeAndRun
- main.initializeAndRun(args);
- } catch (IllegalArgumentException e) {
- ...
- System.exit(0);
- }
-
- protected void initializeAndRun(String[] args)
- throws ConfigException, IOException
- {
- QuorumPeerConfig config = new QuorumPeerConfig();
- if (args.length == 1) {
- // 直接将zoo.cfg配置文件交由QuorumPeerConfig解析,具体见3.2
- config.parse(args[0]);
- }
-
- // 提供一个定时任务,来清理data数据和log数据,这个后续我们专门来分析
- DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
- .getDataDir(), config.getDataLogDir(), config
- .getSnapRetainCount(), config.getPurgeInterval());
- purgeMgr.start();
-
- if (args.length == 1 && config.servers.size() > 0) {
- runFromConfig(config);
- } else {
- LOG.warn("Either no config or no quorum defined in config, running "
- + " in standalone mode");
- // 单机模式的启动,是我们本次分析的重点,具体见3.3,上面的是集群模式,后续我们专门分析
- ZooKeeperServerMain.main(args);
- }
- }
- }
- public class QuorumPeerConfig {
- public void parse(String path) throws ConfigException {
- File configFile = new File(path);
- try {
- ...
- Properties cfg = new Properties();
- FileInputStream in = new FileInputStream(configFile);
- try {
- // 加载配置文件到Properties中
- cfg.load(in);
- } finally {
- in.close();
- }
-
- // 这个方法就是将Properties中对应属性回写到QuorumPeerConfig中,方法过长,笔者不再展示
- parseProperties(cfg);
- } catch (IOException e) {
- throw new ConfigException("Error processing " + path, e);
- } catch (IllegalArgumentException e) {
- throw new ConfigException("Error processing " + path, e);
- }
- }
- }
- public class ZooKeeperServerMain {
- public static void main(String[] args) {
- ZooKeeperServerMain main = new ZooKeeperServerMain();
- try {
- // 一样的套路
- main.initializeAndRun(args);
- } catch (IllegalArgumentException e) {
- ...
- }
- LOG.info("Exiting normally");
- System.exit(0);
- }
-
- protected void initializeAndRun(String[] args)
- throws ConfigException, IOException
- {
- try {
- // JDK提供的对应用程序提供管理监控功能的框架,非本文重点,先忽略
- ManagedUtil.registerLog4jMBeans();
- } catch (JMException e) {
- LOG.warn("Unable to register log4j JMX control", e);
- }
-
- ServerConfig config = new ServerConfig();
- // 解析zoo.cfg,同样是将文件解析到QuorumPeerConfig中,参考上面3.2
- if (args.length == 1) {
- config.parse(args[0]);
- } else {
- config.parse(args);
- }
-
- // 启动server
- runFromConfig(config);
- }
-
- public void runFromConfig(ServerConfig config) throws IOException {
- LOG.info("Starting server");
- FileTxnSnapLog txnLog = null;
- try {
- final ZooKeeperServer zkServer = new ZooKeeperServer();
- // 通过ZooKeeperServerShutdownHandler进行shutdownLatch锁释放
- final CountDownLatch shutdownLatch = new CountDownLatch(1);
- zkServer.registerServerShutdownHandler(
- new ZooKeeperServerShutdownHandler(shutdownLatch));
-
- // 日志文件和数据文件的处理类,后续单独文章来讲解
- txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
- config.dataDir));
- txnLog.setServerStats(zkServer.serverStats());
- zkServer.setTxnLogFactory(txnLog);
-
- // 设置基本参数
- zkServer.setTickTime(config.tickTime);
- zkServer.setMinSessionTimeout(config.minSessionTimeout);
- zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
-
- // 在这里创建端口服务类,用于监听端口请求,具体方法见4
- cnxnFactory = ServerCnxnFactory.createFactory();
- cnxnFactory.configure(config.getClientPortAddress(),
- config.getMaxClientCnxns());
- cnxnFactory.startup(zkServer);
- shutdownLatch.await();
- shutdown();
-
- cnxnFactory.join();
- if (zkServer.canShutdown()) {
- zkServer.shutdown(true);
- }
- } catch (InterruptedException e) {
- // warn, but generally this is ok
- LOG.warn("Server interrupted", e);
- } finally {
- if (txnLog != null) {
- txnLog.close();
- }
- }
- }
- }
zookeeper中,默认对外开放2181端口,那么作为server端,肯定要对这个端口进行监听,接收client请求并处理,返回响应。
具体服务的创建都是通过ServerCnxnFactory来完成的。
- public abstract class ServerCnxnFactory {
- public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
- static public ServerCnxnFactory createFactory() throws IOException {
- String serverCnxnFactoryName =
- System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
- // 默认创建的是NIOServerCnxnFactory,其使用JDK原生的NIO来完成服务暴露监听
- if (serverCnxnFactoryName == null) {
- serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
- }
- try {
- ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
- .getDeclaredConstructor().newInstance();
- LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
- return serverCnxnFactory;
- } catch (Exception e) {
- IOException ioe = new IOException("Couldn't instantiate "
- + serverCnxnFactoryName);
- ioe.initCause(e);
- throw ioe;
- }
- }
- }
默认我们的服务是由NIOServerCnxnFactory来创建的。通过其源码可以看出,它是对JDK NIO的原生使用。
在性能方面,肯定是不如Netty的,所以在实际使用中,我们可以在系统中设置如下参数
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
主动将原生NIO服务切换为Netty服务以提高性能。
- public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
- public void configure(InetSocketAddress addr, int maxcc) throws IOException {
- configureSaslLogin();
-
- thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
- thread.setDaemon(true);
- maxClientCnxns = maxcc;
- this.ss = ServerSocketChannel.open();
- ss.socket().setReuseAddress(true);
- LOG.info("binding to port " + addr);
- ss.socket().bind(addr);
- ss.configureBlocking(false);
- ss.register(selector, SelectionKey.OP_ACCEPT);
- }
- }
代码很简单,就是标准的NioServer启动端口,创建连接监听。
- public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
- public void startup(ZooKeeperServer zks) throws IOException,
- InterruptedException {
- // 这里本质上是启动NIOServerCnxnFactory.run()方法用于监听读写事件
- start();
- setZooKeeperServer(zks);
- // 加载数据到内存中,这里我们先知道有这个操作,后续分析事务日志时再一起说明下
- zks.startdata();
- // 最终启动,调用ZookeeperServer.startup(),具体见4.4
- zks.startup();
- }
- }
startup启动zookeeper服务时,需要启动监听客户端读写事件服务;加载数据到内存中
- public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
- public synchronized void startup() {
- if (sessionTracker == null) {
- createSessionTracker();
- }
- // sessionTracker是会话管理器,负责会话的创建、管理清理等操作,这个后续我们专门一篇文章来说明
- startSessionTracker();
- // 业务处理器,后续介绍服务端处理请求时会详细说明
- setupRequestProcessors();
-
- registerJMX();
-
- setState(State.RUNNING);
- notifyAll();
- }
- }
一张图来了解下单机版Zookeeper服务启动全貌
到这,我们简单的分析了Zookeeper服务器单机版的启动过程。总体来说有以下内容:
1.启动端口服务监听,监听客户端的连接、读写等事件
2.启动会话管理器SessionTracker,用于管理、清理过期会话
3.加载数据到内存中
4.提供日志文件和数据文件的处理类
主要就是这些,本文对其骨架进行分析,后续上面这些点都会通过专门的文章来详细分析。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。