赞
踩
zkServer.sh start 底层的实际执行内容
- nohup "$JAVA"
- + 一堆提交参数
- + $ZOOMAIN(org.apache.zookeeper.server.quorum.QuorumPeerMain)
- + "$ZOOCFG" (zkEnv.sh 文件中 ZOOCFG="zoo.cfg")
程序的入口是 QuorumPeerMain.java 类
- public static void main(String[] args) {
- // 创建了一个 zk 节点
- QuorumPeerMain main = new QuorumPeerMain();
- try {
- // 初始化节点并运行, ,args 相当于提交参数中的 zoo.cfg
- main.initializeAndRun(args);
- } catch (IllegalArgumentException e) {
- ... ...
- }
- LOG.info("Exiting normally");
- System.exit(0);
- }
- protected void initializeAndRun(String[] args)
- throws ConfigException, IOException, AdminServerException
- {
- // 管理 zk 的配置信息
- QuorumPeerConfig config = new QuorumPeerConfig();
- if (args.length == 1) {
- // 1 解析参数, ,zoo.cfg 和 myid
- config.parse(args[0]);
- }
- // 2 启动 定时 任务, 对过期的快照,执行删除 (默认该功能关闭)
- // Start and schedule the the purge task
- DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
- .getDataDir(), config.getDataLogDir(), config
- .getSnapRetainCount(), config.getPurgeInterval());
- purgeMgr.start();
- if (args.length == 1 && config.isDistributed()) {
- // 3 启动集群
- runFromConfig(config);
- } else {
- LOG.warn("Either no config or no quorum defined in config, running "
- + " in standalone mode");
- // there is only server in the quorum -- run as standalone
- ZooKeeperServerMain.main(args);
- }
- }

(1)QuorumPeerConfig
- public void parse(String path) throws ConfigException {
- LOG.info("Reading configuration from: " + path);
- try {
- // 校验文件路径及是否存在
- File configFile = (new VerifyingFileFactory.Builder(LOG)
- .warnForRelativePath()
- .failForNonExistingPath()
- .build()).create(path);
- Properties cfg = new Properties();
- FileInputStream in = new FileInputStream(configFile);
- try {
- // 加载配置文件
- cfg.load(in);
- configFileStr = path;
- } finally {
- in.close();
- }
- // 解析配置文件
- parseProperties(cfg);
- } catch (IOException e) {
- throw new ConfigException("Error processing " + path, e);
- } catch (IllegalArgumentException e) {
- throw new ConfigException("Error processing " + path, e);
- }
- ... ...
- }

(2)QuorumPeerConfig
- public void parseProperties(Properties zkProp)
- throws IOException, ConfigException {
- int clientPort = 0;
- int secureClientPort = 0;
- String clientPortAddress = null;
- String secureClientPortAddress = null;
- VerifyingFileFactory vff = new
- VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
- // 读取 zoo.cfg 文件中的属性值,并赋值给 QuorumPeerConfig 的类对象
- for (Entry<Object, Object> entry : zkProp.entrySet()) {
- String key = entry.getKey().toString().trim();
- String value = entry.getValue().toString().trim();
- if (key.equals("dataDir")) {
- dataDir = vff.create(value);
- } else if (key.equals("dataLogDir")) {
- dataLogDir = vff.create(value);
- } else if (key.equals("clientPort")) {
- clientPort = Integer.parseInt(value);
- } else if (key.equals("localSessionsEnabled")) {
- localSessionsEnabled = Boolean.parseBoolean(value);
- } else if (key.equals("localSessionsUpgradingEnabled")) {
- localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
- } else if (key.equals("clientPortAddress")) {
- clientPortAddress = value.trim();
- } else if (key.equals("secureClientPort")) {
- secureClientPort = Integer.parseInt(value);
- } else if (key.equals("secureClientPortAddress")){
- secureClientPortAddress = value.trim();
- } else if (key.equals("tickTime")) {
- tickTime = Integer.parseInt(value);
- } else if (key.equals("maxClientCnxns")) {
- maxClientCnxns = Integer.parseInt(value);
- } else if (key.equals("minSessionTimeout")) {
- minSessionTimeout = Integer.parseInt(value);
- }
- ... ...
- }
-
- ... ...
- if (dynamicConfigFileStr == null) {
- setupQuorumPeerConfig(zkProp, true);
- if (isDistributed() && isReconfigEnabled()) {
- // we don't backup static config for standalone mode.
- // we also don't backup if reconfig feature is disabled.
- backupOldConfig();
- }
- }
- }

(3)QuorumPeerConfig
- void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
- throws IOException, ConfigException {
- quorumVerifier = parseDynamicConfig(prop, electionAlg, true,
- configBackwardCompatibilityMode);
- setupMyId();
- setupClientPort();
- setupPeerType();
- checkValidity();
- }
(4)QuorumPeerConfig
- private void setupMyId() throws IOException {
- File myIdFile = new File(dataDir, "myid");
- // standalone server doesn't need myid file.
- if (!myIdFile.isFile()) {
- return;
- }
- BufferedReader br = new BufferedReader(new FileReader(myIdFile));
- String myIdString;
- try {
- myIdString = br.readLine();
- } finally {
- br.close();
- }
- try {
- // 将解析 myid 文件中的 id 赋值给 serverId
- serverId = Long.parseLong(myIdString);
- MDC.put("myid", myIdString);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("serverid " + myIdString
- + " is not a number");
- }
- }

可以启动定时任务,对过期的快照,执行删除。默认该功能时关闭的
- protected void initializeAndRun(String[] args)
- throws ConfigException, IOException, AdminServerException
- {
- // 管理 zk 的配置信息
- QuorumPeerConfig config = new QuorumPeerConfig();
-
- if (args.length == 1) {
- // 1 解析参数,zoo.cfg 和 myid
- config.parse(args[0]);
- }
- // 2 启动 定时 任务, 对过期的快照,执行删除 (默认是关闭)
- // config.getSnapRetainCount() = 3 最少保留的快照个数
- // config.getPurgeInterval() = 0 默认 0 表示关闭
- // Start and schedule the the purge task
- DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
- .getDataDir(), config.getDataLogDir(), config
- .getSnapRetainCount(), config.getPurgeInterval());
- purgeMgr.start();
- if (args.length == 1 && config.isDistributed()) {
- // 3 启动集群
- runFromConfig(config);
- } else {
- LOG.warn("Either no config or no quorum defined in config, running "
- + " in standalone mode");
- // there is only server in the quorum -- run as standalone
- ZooKeeperServerMain.main(args);
- }
- }
- protected int snapRetainCount = 3;
- protected int purgeInterval = 0;
- public void start() {
- if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
- LOG.warn("Purge task is already running.");
- return;
- }
- // 默认情况 purgeInterval=0 ,该任务关闭,直接返回
- // Don't schedule the purge task with zero or negative purge interval.
- if (purgeInterval <= 0) {
- LOG.info("Purge task is not scheduled.");
- return;
- }
- // 创建一个定时器
- timer = new Timer("PurgeTask", true);
- // 创建一个清理快照任务
- TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
- // 如果 purgeInterval 设置的值是 1 ,表示 1 小时检查一次,判断是否有过期快照,
- 有则删除
- timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
- purgeTaskStatus = PurgeTaskStatus.STARTED;
- }
- static class PurgeTask extends TimerTask {
- private File logsDir;
- private File snapsDir;
- private int snapRetainCount;
-
- public PurgeTask(File dataDir, File snapDir, int count) {
- logsDir = dataDir;
- snapsDir = snapDir;
- snapRetainCount = count;
- }
- @Override
- public void run() {
- LOG.info("Purge task started.");
- try {
- // 清理过期的数据
- PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
- } catch (Exception e) {
- LOG.error("Error occurred while purging.", e);
- }
- LOG.info("Purge task completed.");
- }
- }
- public static void purge(File dataDir, File snapDir, int num) throws IOException {
- if (num < 3) {
- throw new IllegalArgumentException(COUNT_ERR_MSG);
- }
- FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
- List<File> snaps = txnLog.findNRecentSnapshots(num);
- int numSnaps = snaps.size();
- if (numSnaps > 0) {
- purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
- }
- }

- protected void initializeAndRun(String[] args)
- throws ConfigException, IOException, AdminServerException
- {
- // 管理 zk 的配置信息
- QuorumPeerConfig config = new QuorumPeerConfig();
- if (args.length == 1) {
- // 1 解析参数,zoo.cfg 和 myid
- config.parse(args[0]);
- }
- // 2 启动 定时 任务, 对过期的快照,执行删除 (默认是关闭)
- // config.getSnapRetainCount() = 3 最少保留的快照个数
- // config.getPurgeInterval() = 0 默认 0 表示关闭
- // Start and schedule the the purge task
- DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
- .getDataDir(), config.getDataLogDir(), config
- .getSnapRetainCount(), config.getPurgeInterval());
- purgeMgr.start();
- if (args.length == 1 && config.isDistributed()) {
- // 3 启动集群 (集群模式)
-
- runFromConfig(config);
- } else {
- LOG.warn("Either no config or no quorum defined in config, running "
- + " in standalone mode");
- // there is only server in the quorum -- run as standalone
- // 本地模式
- ZooKeeperServerMain.main(args);
- }
- }

(1)通信协议默认 NIO (可以支持 Netty)
- public void runFromConfig(QuorumPeerConfig config)
- throws IOException, AdminServerException
- {
- … …
- LOG.info("Starting quorum peer");
- try {
- ServerCnxnFactory cnxnFactory = null;
- ServerCnxnFactory secureCnxnFactory = null;
- // 通信组件初始化,默认是 NIO 通信
- if (config.getClientPortAddress() != null) {
- cnxnFactory = ServerCnxnFactory.createFactory();
- cnxnFactory.configure(config.getClientPortAddress(),
- config.getMaxClientCnxns(), false);
- }
- if (config.getSecureClientPortAddress() != null) {
- secureCnxnFactory = ServerCnxnFactory.createFactory();
- secureCnxnFactory.configure(config.getSecureClientPortAddress(),
- config.getMaxClientCnxns(), true);
- }
- // 把解析的参数赋值给该 zookeeper 节点
- quorumPeer = getQuorumPeer();
- quorumPeer.setTxnFactory(new FileTxnSnapLog(
- config.getDataLogDir(),
- config.getDataDir()));
- quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
- quorumPeer.enableLocalSessionsUpgrading(
- config.isLocalSessionsUpgradingEnabled());
- //quorumPeer.setQuorumPeers(config.getAllMembers());
- quorumPeer.setElectionType(config.getElectionAlg());
- quorumPeer.setMyid(config.getServerId());
- quorumPeer.setTickTime(config.getTickTime());
- quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
- quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
- quorumPeer.setInitLimit(config.getInitLimit());
- quorumPeer.setSyncLimit(config.getSyncLimit());
- quorumPeer.setConfigFileName(config.getConfigFilename());
- // 管理 zk 数据的存储
- quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
- quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
- if (config.getLastSeenQuorumVerifier()!=null) {
- quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(),
- false);
- }
- quorumPeer.initConfigInZKDatabase();
- // 管理 zk 的通信
- quorumPeer.setCnxnFactory(cnxnFactory);
- quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
- quorumPeer.setSslQuorum(config.isSslQuorum());
- quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
- quorumPeer.setLearnerType(config.getPeerType());
- quorumPeer.setSyncEnabled(config.getSyncEnabled());
- quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
- if (config.sslQuorumReloadCertFiles) {
- quorumPeer.getX509Util().enableCertFileReloading();
- }
- … …
- quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
- quorumPeer.initialize();
- // 启动 zk
- quorumPeer.start();
- quorumPeer.join();
- } catch (InterruptedException e) {
- // warn, but generally this is ok
- LOG.warn("Quorum Peer interrupted", e);
- }
- }
- static public ServerCnxnFactory createFactory() throws IOException {
- String serverCnxnFactoryName =
- System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
- if (serverCnxnFactoryName == null) {
- serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
- }
- try {
- ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)
- Class.forName(serverCnxnFactoryName)
- .getDeclaredConstructor().newInstance();
- LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
- return serverCnxnFactory;
- } catch (Exception e) {
- IOException ioe = new IOException("Couldn't instantiate "
- + serverCnxnFactoryName);
- ioe.initCause(e);
- throw ioe;
- }
- }
- public static final String ZOOKEEPER_SERVER_CNXN_FACTORY =
- "zookeeper.serverCnxnFactory";
- zookeeperAdmin.md 文件中
- * *serverCnxnFactory* :
- (Java system property: **zookeeper.serverCnxnFactory**)
- Specifies ServerCnxnFactory implementation.
- This should be set to `NettyServerCnxnFactory` in order to use TLS based server
- communication.
- Default is `NIOServerCnxnFactory`.

(2)初始化 NIO 服务端 Socket (并未启动)
NIOServerCnxnFactory
- public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException
- {
- if (secure) {
- throw new UnsupportedOperationException("SSL isn't supported in
- NIOServerCnxn");
- }
- configureSaslLogin();
- maxClientCnxns = maxcc;
- sessionlessCnxnTimeout = Integer.getInteger(
- ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
- // We also use the sessionlessCnxnTimeout as expiring interval for
- // cnxnExpiryQueue. These don't need to be the same, but the expiring
- // interval passed into the ExpiryQueue() constructor below should be
- // less than or equal to the timeout.
- cnxnExpiryQueue =
- new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
- expirerThread = new ConnectionExpirerThread();
- int numCores = Runtime.getRuntime().availableProcessors();
- // 32 cores sweet spot seems to be 4 selector threads
- numSelectorThreads = Integer.getInteger(
- ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
- Math.max((int) Math.sqrt((float) numCores/2), 1));
- if (numSelectorThreads < 1) {
- throw new IOException("numSelectorThreads must be at least 1");
- }
- numWorkerThreads = Integer.getInteger(
- ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
- workerShutdownTimeoutMS = Long.getLong(
- ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
- ... ...
- for(int i=0; i<numSelectorThreads; ++i) {
- selectorThreads.add(new SelectorThread(i));
- }
- // 初始化 NIO 服务端 socket ,绑定 2181 端口,可以接收客户端请求
- this.ss = ServerSocketChannel.open();
- ss.socket().setReuseAddress(true);
- LOG.info("binding to port " + addr);
- // 绑定 2181 端口
- ss.socket().bind(addr);
- ss.configureBlocking(false);
- acceptThread = new AcceptThread(ss, addr, selectorThreads);
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。