当前位置:   article > 正文

zookeeper服务端初始化_zk初始化

zk初始化

一、ZK  服务端 启动脚本

1、Zookeeper 服务的启动命令是 zkServer.sh start

zkServer.sh start 底层的实际执行内容

  1. nohup "$JAVA"
  2. + 一堆提交参数
  3. + $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
  4. + "$ZOOCFG" (zkEnv.sh 文件中 ZOOCFG="zoo.cfg"

程序的入口是 QuorumPeerMain.java 类

二、ZK  服务端启动入口

1、QuorumPeerMain

  1. public static void main(String[] args) {
  2. // 创建了一个 zk 节点
  3. QuorumPeerMain main = new QuorumPeerMain();
  4. try {
  5. // 初始化节点并运行, ,args 相当于提交参数中的 zoo.cfg
  6. main.initializeAndRun(args);
  7. } catch (IllegalArgumentException e) {
  8. ... ...
  9. }
  10. LOG.info("Exiting normally");
  11. System.exit(0);
  12. }
  1. protected void initializeAndRun(String[] args)
  2. throws ConfigException, IOException, AdminServerException
  3. {
  4. // 管理 zk 的配置信息
  5. QuorumPeerConfig config = new QuorumPeerConfig();
  6. if (args.length == 1) {
  7. // 1 解析参数, ,zoo.cfg 和 myid
  8. config.parse(args[0]);
  9. }
  10. // 2 启动 定时 任务, 对过期的快照,执行删除 (默认该功能关闭)
  11. // Start and schedule the the purge task
  12. DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
  13. .getDataDir(), config.getDataLogDir(), config
  14. .getSnapRetainCount(), config.getPurgeInterval());
  15. purgeMgr.start();
  16. if (args.length == 1 && config.isDistributed()) {
  17. // 3 启动集群
  18. runFromConfig(config);
  19. } else {
  20. LOG.warn("Either no config or no quorum defined in config, running "
  21. + " in standalone mode");
  22. // there is only server in the quorum -- run as standalone
  23. ZooKeeperServerMain.main(args);
  24. }
  25. }

2、解析参数 zoo.cfg 和 和 myid

(1)QuorumPeerConfig

  1. public void parse(String path) throws ConfigException {
  2. LOG.info("Reading configuration from: " + path);
  3. try {
  4. // 校验文件路径及是否存在
  5. File configFile = (new VerifyingFileFactory.Builder(LOG)
  6. .warnForRelativePath()
  7. .failForNonExistingPath()
  8. .build()).create(path);
  9. Properties cfg = new Properties();
  10. FileInputStream in = new FileInputStream(configFile);
  11. try {
  12. // 加载配置文件
  13. cfg.load(in);
  14. configFileStr = path;
  15. } finally {
  16. in.close();
  17. }
  18. // 解析配置文件
  19. parseProperties(cfg);
  20. } catch (IOException e) {
  21. throw new ConfigException("Error processing " + path, e);
  22. } catch (IllegalArgumentException e) {
  23. throw new ConfigException("Error processing " + path, e);
  24. }
  25. ... ...
  26. }

(2)QuorumPeerConfig

  1. public void parseProperties(Properties zkProp)
  2. throws IOException, ConfigException {
  3. int clientPort = 0;
  4. int secureClientPort = 0;
  5. String clientPortAddress = null;
  6. String secureClientPortAddress = null;
  7. VerifyingFileFactory vff = new
  8. VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
  9. // 读取 zoo.cfg 文件中的属性值,并赋值给 QuorumPeerConfig 的类对象
  10. for (Entry<Object, Object> entry : zkProp.entrySet()) {
  11. String key = entry.getKey().toString().trim();
  12. String value = entry.getValue().toString().trim();
  13. if (key.equals("dataDir")) {
  14. dataDir = vff.create(value);
  15. } else if (key.equals("dataLogDir")) {
  16. dataLogDir = vff.create(value);
  17. } else if (key.equals("clientPort")) {
  18. clientPort = Integer.parseInt(value);
  19. } else if (key.equals("localSessionsEnabled")) {
  20. localSessionsEnabled = Boolean.parseBoolean(value);
  21. } else if (key.equals("localSessionsUpgradingEnabled")) {
  22. localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
  23. } else if (key.equals("clientPortAddress")) {
  24. clientPortAddress = value.trim();
  25. } else if (key.equals("secureClientPort")) {
  26. secureClientPort = Integer.parseInt(value);
  27. } else if (key.equals("secureClientPortAddress")){
  28. secureClientPortAddress = value.trim();
  29. } else if (key.equals("tickTime")) {
  30. tickTime = Integer.parseInt(value);
  31. } else if (key.equals("maxClientCnxns")) {
  32. maxClientCnxns = Integer.parseInt(value);
  33. } else if (key.equals("minSessionTimeout")) {
  34. minSessionTimeout = Integer.parseInt(value);
  35. }
  36. ... ...
  37. }
  38. ... ...
  39. if (dynamicConfigFileStr == null) {
  40. setupQuorumPeerConfig(zkProp, true);
  41. if (isDistributed() && isReconfigEnabled()) {
  42. // we don't backup static config for standalone mode.
  43. // we also don't backup if reconfig feature is disabled.
  44. backupOldConfig();
  45. }
  46. }
  47. }

(3)QuorumPeerConfig

  1. void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
  2. throws IOException, ConfigException {
  3. quorumVerifier = parseDynamicConfig(prop, electionAlg, true,
  4. configBackwardCompatibilityMode);
  5. setupMyId();
  6. setupClientPort();
  7. setupPeerType();
  8. checkValidity();
  9. }

(4)QuorumPeerConfig

  1. private void setupMyId() throws IOException {
  2. File myIdFile = new File(dataDir, "myid");
  3. // standalone server doesn't need myid file.
  4. if (!myIdFile.isFile()) {
  5. return;
  6. }
  7. BufferedReader br = new BufferedReader(new FileReader(myIdFile));
  8. String myIdString;
  9. try {
  10. myIdString = br.readLine();
  11. } finally {
  12. br.close();
  13. }
  14. try {
  15. // 将解析 myid 文件中的 id 赋值给 serverId
  16. serverId = Long.parseLong(myIdString);
  17. MDC.put("myid", myIdString);
  18. } catch (NumberFormatException e) {
  19. throw new IllegalArgumentException("serverid " + myIdString
  20. + " is not a number");
  21. }
  22. }

3、过期快照删除

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的

  1. protected void initializeAndRun(String[] args)
  2. throws ConfigException, IOException, AdminServerException
  3. {
  4. // 管理 zk 的配置信息
  5. QuorumPeerConfig config = new QuorumPeerConfig();
  6. if (args.length == 1) {
  7. // 1 解析参数,zoo.cfg 和 myid
  8. config.parse(args[0]);
  9. }
  10. // 2 启动 定时 任务, 对过期的快照,执行删除 (默认是关闭)
  11. // config.getSnapRetainCount() = 3 最少保留的快照个数
  12. // config.getPurgeInterval() = 0 默认 0 表示关闭
  13. // Start and schedule the the purge task
  14. DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
  15. .getDataDir(), config.getDataLogDir(), config
  16. .getSnapRetainCount(), config.getPurgeInterval());
  17. purgeMgr.start();
  18. if (args.length == 1 && config.isDistributed()) {
  19. // 3 启动集群
  20. runFromConfig(config);
  21. } else {
  22. LOG.warn("Either no config or no quorum defined in config, running "
  23. + " in standalone mode");
  24. // there is only server in the quorum -- run as standalone
  25. ZooKeeperServerMain.main(args);
  26. }
  27. }
  28. protected int snapRetainCount = 3;
  29. protected int purgeInterval = 0;
  30. public void start() {
  31. if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
  32. LOG.warn("Purge task is already running.");
  33. return;
  34. }
  35. // 默认情况 purgeInterval=0 ,该任务关闭,直接返回
  36. // Don't schedule the purge task with zero or negative purge interval.
  37. if (purgeInterval <= 0) {
  38. LOG.info("Purge task is not scheduled.");
  39. return;
  40. }
  41. // 创建一个定时器
  42. timer = new Timer("PurgeTask", true);
  43. // 创建一个清理快照任务
  44. TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
  45. // 如果 purgeInterval 设置的值是 1 ,表示 1 小时检查一次,判断是否有过期快照,
  46. 有则删除
  47. timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
  48. purgeTaskStatus = PurgeTaskStatus.STARTED;
  49. }
  50. static class PurgeTask extends TimerTask {
  51. private File logsDir;
  52. private File snapsDir;
  53. private int snapRetainCount;
  54. public PurgeTask(File dataDir, File snapDir, int count) {
  55. logsDir = dataDir;
  56. snapsDir = snapDir;
  57. snapRetainCount = count;
  58. }
  59. @Override
  60. public void run() {
  61. LOG.info("Purge task started.");
  62. try {
  63. // 清理过期的数据
  64. PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
  65. } catch (Exception e) {
  66. LOG.error("Error occurred while purging.", e);
  67. }
  68. LOG.info("Purge task completed.");
  69. }
  70. }
  71. public static void purge(File dataDir, File snapDir, int num) throws IOException {
  72. if (num < 3) {
  73. throw new IllegalArgumentException(COUNT_ERR_MSG);
  74. }
  75. FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
  76. List<File> snaps = txnLog.findNRecentSnapshots(num);
  77. int numSnaps = snaps.size();
  78. if (numSnaps > 0) {
  79. purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
  80. }
  81. }

4、初始化 通信组件

  1. protected void initializeAndRun(String[] args)
  2. throws ConfigException, IOException, AdminServerException
  3. {
  4. // 管理 zk 的配置信息
  5. QuorumPeerConfig config = new QuorumPeerConfig();
  6. if (args.length == 1) {
  7. // 1 解析参数,zoo.cfg 和 myid
  8. config.parse(args[0]);
  9. }
  10. // 2 启动 定时 任务, 对过期的快照,执行删除 (默认是关闭)
  11. // config.getSnapRetainCount() = 3 最少保留的快照个数
  12. // config.getPurgeInterval() = 0 默认 0 表示关闭
  13. // Start and schedule the the purge task
  14. DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
  15. .getDataDir(), config.getDataLogDir(), config
  16. .getSnapRetainCount(), config.getPurgeInterval());
  17. purgeMgr.start();
  18. if (args.length == 1 && config.isDistributed()) {
  19. // 3 启动集群 (集群模式)
  20. runFromConfig(config);
  21. } else {
  22. LOG.warn("Either no config or no quorum defined in config, running "
  23. + " in standalone mode");
  24. // there is only server in the quorum -- run as standalone
  25. // 本地模式
  26. ZooKeeperServerMain.main(args);
  27. }
  28. }

(1)通信协议默认 NIO (可以支持 Netty)

  1. public void runFromConfig(QuorumPeerConfig config)
  2. throws IOException, AdminServerException
  3. {
  4. … …
  5. LOG.info("Starting quorum peer");
  6. try {
  7. ServerCnxnFactory cnxnFactory = null;
  8. ServerCnxnFactory secureCnxnFactory = null;
  9. // 通信组件初始化,默认是 NIO 通信
  10. if (config.getClientPortAddress() != null) {
  11. cnxnFactory = ServerCnxnFactory.createFactory();
  12. cnxnFactory.configure(config.getClientPortAddress(),
  13. config.getMaxClientCnxns(), false);
  14. }
  15. if (config.getSecureClientPortAddress() != null) {
  16. secureCnxnFactory = ServerCnxnFactory.createFactory();
  17. secureCnxnFactory.configure(config.getSecureClientPortAddress(),
  18. config.getMaxClientCnxns(), true);
  19. }
  20. // 把解析的参数赋值给该 zookeeper 节点
  21. quorumPeer = getQuorumPeer();
  22. quorumPeer.setTxnFactory(new FileTxnSnapLog(
  23. config.getDataLogDir(),
  24. config.getDataDir()));
  25. quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
  26. quorumPeer.enableLocalSessionsUpgrading(
  27. config.isLocalSessionsUpgradingEnabled());
  28. //quorumPeer.setQuorumPeers(config.getAllMembers());
  29. quorumPeer.setElectionType(config.getElectionAlg());
  30. quorumPeer.setMyid(config.getServerId());
  31. quorumPeer.setTickTime(config.getTickTime());
  32. quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
  33. quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
  34. quorumPeer.setInitLimit(config.getInitLimit());
  35. quorumPeer.setSyncLimit(config.getSyncLimit());
  36. quorumPeer.setConfigFileName(config.getConfigFilename());
  37. // 管理 zk 数据的存储
  38. quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
  39. quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
  40. if (config.getLastSeenQuorumVerifier()!=null) {
  41. quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(),
  42. false);
  43. }
  44. quorumPeer.initConfigInZKDatabase();
  45. // 管理 zk 的通信
  46. quorumPeer.setCnxnFactory(cnxnFactory);
  47. quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
  48. quorumPeer.setSslQuorum(config.isSslQuorum());
  49. quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
  50. quorumPeer.setLearnerType(config.getPeerType());
  51. quorumPeer.setSyncEnabled(config.getSyncEnabled());
  52. quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
  53. if (config.sslQuorumReloadCertFiles) {
  54. quorumPeer.getX509Util().enableCertFileReloading();
  55. }
  56. … …
  57. quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
  58. quorumPeer.initialize();
  59. // 启动 zk
  60. quorumPeer.start();
  61. quorumPeer.join();
  62. } catch (InterruptedException e) {
  63. // warn, but generally this is ok
  64. LOG.warn("Quorum Peer interrupted", e);
  65. }
  66. }
  67. static public ServerCnxnFactory createFactory() throws IOException {
  68. String serverCnxnFactoryName =
  69. System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
  70. if (serverCnxnFactoryName == null) {
  71. serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
  72. }
  73. try {
  74. ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)
  75. Class.forName(serverCnxnFactoryName)
  76. .getDeclaredConstructor().newInstance();
  77. LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
  78. return serverCnxnFactory;
  79. } catch (Exception e) {
  80. IOException ioe = new IOException("Couldn't instantiate "
  81. + serverCnxnFactoryName);
  82. ioe.initCause(e);
  83. throw ioe;
  84. }
  85. }
  86. public static final String ZOOKEEPER_SERVER_CNXN_FACTORY =
  87. "zookeeper.serverCnxnFactory";
  88. zookeeperAdmin.md 文件中
  89. * *serverCnxnFactory* :
  90. (Java system property: **zookeeper.serverCnxnFactory**)
  91. Specifies ServerCnxnFactory implementation.
  92. This should be set to `NettyServerCnxnFactory` in order to use TLS based server
  93. communication.
  94. Default is `NIOServerCnxnFactory`.

(2)初始化 NIO  服务端 Socket (并未启动) 

NIOServerCnxnFactory

  1. public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException
  2. {
  3. if (secure) {
  4. throw new UnsupportedOperationException("SSL isn't supported in
  5. NIOServerCnxn");
  6. }
  7. configureSaslLogin();
  8. maxClientCnxns = maxcc;
  9. sessionlessCnxnTimeout = Integer.getInteger(
  10. ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
  11. // We also use the sessionlessCnxnTimeout as expiring interval for
  12. // cnxnExpiryQueue. These don't need to be the same, but the expiring
  13. // interval passed into the ExpiryQueue() constructor below should be
  14. // less than or equal to the timeout.
  15. cnxnExpiryQueue =
  16. new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
  17. expirerThread = new ConnectionExpirerThread();
  18. int numCores = Runtime.getRuntime().availableProcessors();
  19. // 32 cores sweet spot seems to be 4 selector threads
  20. numSelectorThreads = Integer.getInteger(
  21. ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
  22. Math.max((int) Math.sqrt((float) numCores/2), 1));
  23. if (numSelectorThreads < 1) {
  24. throw new IOException("numSelectorThreads must be at least 1");
  25. }
  26. numWorkerThreads = Integer.getInteger(
  27. ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
  28. workerShutdownTimeoutMS = Long.getLong(
  29. ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
  30. ... ...
  31. for(int i=0; i<numSelectorThreads; ++i) {
  32. selectorThreads.add(new SelectorThread(i));
  33. }
  34. // 初始化 NIO 服务端 socket ,绑定 2181 端口,可以接收客户端请求
  35. this.ss = ServerSocketChannel.open();
  36. ss.socket().setReuseAddress(true);
  37. LOG.info("binding to port " + addr);
  38. // 绑定 2181 端口
  39. ss.socket().bind(addr);
  40. ss.configureBlocking(false);
  41. acceptThread = new AcceptThread(ss, addr, selectorThreads);
  42. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/804586
推荐阅读
相关标签
  

闽ICP备14008679号