当前位置:   article > 正文

Zookeeper源码解析-Zookeeper服务端启动流程初探(单机版)_zkserver.cmd start

zkserver.cmd start

前言:

在介绍完client端的基本操作之后,后续我们就要对zookeeper server端的相关操作进行解析了。

实际重点还是在server端,数据存储同步恢复、事务请求执行、master选举等都是在server端进行的。

server有单机版和集群版,大致流程都差不多,为了便于理解,我们先从单机版进行分析。

1.zkServer.cmd的分析

windows环境下,启动server端,就是通过bin/zkServer.cmd start命令执行的(Linux环境下,是bin/zkServer.sh start命令)。我们来看下这个脚本主要执行了哪些启动类

  1. setlocal
  2. call "%~dp0zkEnv.cmd"
  3. # 启动类为QuorumPeerMain
  4. set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
  5. echo on
  6. call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*
  7. endlocal

通过启动脚本,可以看到QuorumPeerMain为其启动类,而启动参数ZOOCFG是从zkEnv.cmd中获取的,如下所示,即conf/zoo.cfg文件

  1. set ZOOCFGDIR=%~dp0%..\conf
  2. set ZOO_LOG_DIR=%~dp0%..
  3. set ZOO_LOG4J_PROP=INFO,CONSOLE
  4. REM for sanity sake assume Java 1.6
  5. REM see: http://java.sun.com/javase/6/docs/technotes/tools/windows/java.html
  6. REM add the zoocfg dir to classpath
  7. set CLASSPATH=%ZOOCFGDIR%
  8. REM make it work in the release
  9. SET CLASSPATH=%~dp0..\*;%~dp0..\lib\*;%CLASSPATH%
  10. REM make it work for developers
  11. SET CLASSPATH=%~dp0..\build\classes;%~dp0..\build\lib\*;%CLASSPATH%
  12. set ZOOCFG=%ZOOCFGDIR%\zoo.cfg

总结下来,zkServer启动时主要是启动QuorumPeerMain类,参数为conf/zoo.cfg

2.源码debug启动测试

通过脚本我们都知道该如何启动了,那么我们能不能直接通过源码来启动呢,方便我们进行debug。本质上是可以的。

2.1 修改zoo.cfg

在conf/目录下,只有zoo_sample.cfg文件,我们需要复制一下该文件,并重命名为zoo.cfg。完成之后,如下所示

 

2.2 添加log4j.properties文件

将conf/log4j.properties文件复制到src/java/main目录下。完成之后,如下所示

 

2.3 设置启动参数

需要在QuorumPeerMain类的启动参数中添加conf/zoo.cfg。完成之后,如下所示:

 

到这里,就可以对QuorumPeerMain进行debug启动。

3.QuorumPeerMain启动流程

3.1 QuorumPeerMain.main()

此时args[] 为conf/zoo.cfg

  1. public class QuorumPeerMain {
  2. public static void main(String[] args) {
  3. QuorumPeerMain main = new QuorumPeerMain();
  4. try {
  5. // 直接调用initializeAndRun
  6. main.initializeAndRun(args);
  7. } catch (IllegalArgumentException e) {
  8. ...
  9. System.exit(0);
  10. }
  11. protected void initializeAndRun(String[] args)
  12. throws ConfigException, IOException
  13. {
  14. QuorumPeerConfig config = new QuorumPeerConfig();
  15. if (args.length == 1) {
  16. // 直接将zoo.cfg配置文件交由QuorumPeerConfig解析,具体见3.2
  17. config.parse(args[0]);
  18. }
  19. // 提供一个定时任务,来清理data数据和log数据,这个后续我们专门来分析
  20. DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
  21. .getDataDir(), config.getDataLogDir(), config
  22. .getSnapRetainCount(), config.getPurgeInterval());
  23. purgeMgr.start();
  24. if (args.length == 1 && config.servers.size() > 0) {
  25. runFromConfig(config);
  26. } else {
  27. LOG.warn("Either no config or no quorum defined in config, running "
  28. + " in standalone mode");
  29. // 单机模式的启动,是我们本次分析的重点,具体见3.3,上面的是集群模式,后续我们专门分析
  30. ZooKeeperServerMain.main(args);
  31. }
  32. }
  33. }

3.2 QuorumPeerConfig.parse() 解析zoo.cfg配置文件

  1. public class QuorumPeerConfig {
  2. public void parse(String path) throws ConfigException {
  3. File configFile = new File(path);
  4. try {
  5. ...
  6. Properties cfg = new Properties();
  7. FileInputStream in = new FileInputStream(configFile);
  8. try {
  9. // 加载配置文件到Properties中
  10. cfg.load(in);
  11. } finally {
  12. in.close();
  13. }
  14. // 这个方法就是将Properties中对应属性回写到QuorumPeerConfig中,方法过长,笔者不再展示
  15. parseProperties(cfg);
  16. } catch (IOException e) {
  17. throw new ConfigException("Error processing " + path, e);
  18. } catch (IllegalArgumentException e) {
  19. throw new ConfigException("Error processing " + path, e);
  20. }
  21. }
  22. }

3.3 单机模式zookeeper启动 ZooKeeperServerMain.main()

  1. public class ZooKeeperServerMain {
  2. public static void main(String[] args) {
  3. ZooKeeperServerMain main = new ZooKeeperServerMain();
  4. try {
  5. // 一样的套路
  6. main.initializeAndRun(args);
  7. } catch (IllegalArgumentException e) {
  8. ...
  9. }
  10. LOG.info("Exiting normally");
  11. System.exit(0);
  12. }
  13. protected void initializeAndRun(String[] args)
  14. throws ConfigException, IOException
  15. {
  16. try {
  17. // JDK提供的对应用程序提供管理监控功能的框架,非本文重点,先忽略
  18. ManagedUtil.registerLog4jMBeans();
  19. } catch (JMException e) {
  20. LOG.warn("Unable to register log4j JMX control", e);
  21. }
  22. ServerConfig config = new ServerConfig();
  23. // 解析zoo.cfg,同样是将文件解析到QuorumPeerConfig中,参考上面3.2
  24. if (args.length == 1) {
  25. config.parse(args[0]);
  26. } else {
  27. config.parse(args);
  28. }
  29. // 启动server
  30. runFromConfig(config);
  31. }
  32. public void runFromConfig(ServerConfig config) throws IOException {
  33. LOG.info("Starting server");
  34. FileTxnSnapLog txnLog = null;
  35. try {
  36. final ZooKeeperServer zkServer = new ZooKeeperServer();
  37. // 通过ZooKeeperServerShutdownHandler进行shutdownLatch锁释放
  38. final CountDownLatch shutdownLatch = new CountDownLatch(1);
  39. zkServer.registerServerShutdownHandler(
  40. new ZooKeeperServerShutdownHandler(shutdownLatch));
  41. // 日志文件和数据文件的处理类,后续单独文章来讲解
  42. txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
  43. config.dataDir));
  44. txnLog.setServerStats(zkServer.serverStats());
  45. zkServer.setTxnLogFactory(txnLog);
  46. // 设置基本参数
  47. zkServer.setTickTime(config.tickTime);
  48. zkServer.setMinSessionTimeout(config.minSessionTimeout);
  49. zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
  50. // 在这里创建端口服务类,用于监听端口请求,具体方法见4
  51. cnxnFactory = ServerCnxnFactory.createFactory();
  52. cnxnFactory.configure(config.getClientPortAddress(),
  53. config.getMaxClientCnxns());
  54. cnxnFactory.startup(zkServer);
  55. shutdownLatch.await();
  56. shutdown();
  57. cnxnFactory.join();
  58. if (zkServer.canShutdown()) {
  59. zkServer.shutdown(true);
  60. }
  61. } catch (InterruptedException e) {
  62. // warn, but generally this is ok
  63. LOG.warn("Server interrupted", e);
  64. } finally {
  65. if (txnLog != null) {
  66. txnLog.close();
  67. }
  68. }
  69. }
  70. }

4 zookeeper端口服务暴露监听

zookeeper中,默认对外开放2181端口,那么作为server端,肯定要对这个端口进行监听,接收client请求并处理,返回响应。

具体服务的创建都是通过ServerCnxnFactory来完成的。

4.1 ServerCnxnFactory.createFactory() 创建具体工厂类

  1. public abstract class ServerCnxnFactory {
  2. public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
  3. static public ServerCnxnFactory createFactory() throws IOException {
  4. String serverCnxnFactoryName =
  5. System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
  6. // 默认创建的是NIOServerCnxnFactory,其使用JDK原生的NIO来完成服务暴露监听
  7. if (serverCnxnFactoryName == null) {
  8. serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
  9. }
  10. try {
  11. ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
  12. .getDeclaredConstructor().newInstance();
  13. LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
  14. return serverCnxnFactory;
  15. } catch (Exception e) {
  16. IOException ioe = new IOException("Couldn't instantiate "
  17. + serverCnxnFactoryName);
  18. ioe.initCause(e);
  19. throw ioe;
  20. }
  21. }
  22. }

默认我们的服务是由NIOServerCnxnFactory来创建的。通过其源码可以看出,它是对JDK NIO的原生使用。

在性能方面,肯定是不如Netty的,所以在实际使用中,我们可以在系统中设置如下参数

-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory

主动将原生NIO服务切换为Netty服务以提高性能。

4.2 NIOServerCnxnFactory.configure() 加载配置、注册监听事件

  1. public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
  2. public void configure(InetSocketAddress addr, int maxcc) throws IOException {
  3. configureSaslLogin();
  4. thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
  5. thread.setDaemon(true);
  6. maxClientCnxns = maxcc;
  7. this.ss = ServerSocketChannel.open();
  8. ss.socket().setReuseAddress(true);
  9. LOG.info("binding to port " + addr);
  10. ss.socket().bind(addr);
  11. ss.configureBlocking(false);
  12. ss.register(selector, SelectionKey.OP_ACCEPT);
  13. }
  14. }

代码很简单,就是标准的NioServer启动端口,创建连接监听。

4.3 NIOServerCnxnFactory.startup() 启动zookeeper服务,加载相关数据

  1. public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
  2. public void startup(ZooKeeperServer zks) throws IOException,
  3. InterruptedException {
  4. // 这里本质上是启动NIOServerCnxnFactory.run()方法用于监听读写事件
  5. start();
  6. setZooKeeperServer(zks);
  7. // 加载数据到内存中,这里我们先知道有这个操作,后续分析事务日志时再一起说明下
  8. zks.startdata();
  9. // 最终启动,调用ZookeeperServer.startup(),具体见4.4
  10. zks.startup();
  11. }
  12. }

startup启动zookeeper服务时,需要启动监听客户端读写事件服务;加载数据到内存中

4.4 ZookeeperServer.startup() 启动服务

  1. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
  2. public synchronized void startup() {
  3. if (sessionTracker == null) {
  4. createSessionTracker();
  5. }
  6. // sessionTracker是会话管理器,负责会话的创建、管理清理等操作,这个后续我们专门一篇文章来说明
  7. startSessionTracker();
  8. // 业务处理器,后续介绍服务端处理请求时会详细说明
  9. setupRequestProcessors();
  10. registerJMX();
  11. setState(State.RUNNING);
  12. notifyAll();
  13. }
  14. }

总结:

一张图来了解下单机版Zookeeper服务启动全貌

到这,我们简单的分析了Zookeeper服务器单机版的启动过程。总体来说有以下内容:

1.启动端口服务监听,监听客户端的连接、读写等事件

2.启动会话管理器SessionTracker,用于管理、清理过期会话

3.加载数据到内存中

4.提供日志文件和数据文件的处理类

主要就是这些,本文对其骨架进行分析,后续上面这些点都会通过专门的文章来详细分析。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/948692
推荐阅读
相关标签
  

闽ICP备14008679号