赞
踩
ZooKeeper 的 Leader 选举步骤如下:
更新内部选举周期。
初始化选票,第一次选自己。
将选票放到发送队列中,然后通过网络发送给所有参与者。
从接收队列中获取其它参与者发送的选票。
判断此时的服务器的状态。
如果是 Looking 状态
统计选票,如果有选票获得半数以上支持,再从接收队列中循环获取新的选票,并且与上文得到半数以上的选票进行比较 epoch、zxid、sid,选出更优的选票作为最终的选票,最后确定服务器状态。
如果是 Observing 状态,仅日志记录。
如果是 Leading、Following 状态
在 QuorumPeerMain 的 main 方法中接连调用 initializeAndRun、runFromConfig 方法,而 runFromConfig 方法中定义了如下一行代码:
quorumPeer.start();
@Override public void run() { updateThreadName(); LOG.debug("Starting quorum peer"); try { jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(jmxQuorumBean, null); for (QuorumServer s : getView().values()) { ZKMBeanInfo p; if (getId() == s.id) { p = jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, jmxQuorumBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxLocalPeerBean = null; } } else { RemotePeerBean rBean = new RemotePeerBean(this, s); try { MBeanRegistry.getInstance().register(rBean, jmxQuorumBean); jmxRemotePeerBean.put(s.id, rBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } } } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxQuorumBean = null; } try { while (running) { if (unavailableStartTime == 0) { unavailableStartTime = Time.currentElapsedTime(); } switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); ServerMetrics.getMetrics().LOOKING_COUNT.add(1); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); Thread roZkMgr = new Thread() { public void run() { try { sleep(Math.max(2000, tickTime)); if (ServerState.LOOKING.equals(getPeerState())) { roZk.startup(); } } catch (InterruptedException e) { LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception e) { LOG.error("FAILED to start ReadOnlyZooKeeperServer", e); } } }; try { roZkMgr.start(); reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } finally { roZkMgr.interrupt(); roZk.shutdown(); } } else { try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; case OBSERVING: try { LOG.info("OBSERVING"); setObserver(makeObserver(logFactory)); observer.observeLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { observer.shutdown(); setObserver(null); updateServerState(); if (isRunning()) { Observer.waitForObserverElectionDelay(); } } break; case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { follower.shutdown(); setFollower(null); updateServerState(); } break; case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } updateServerState(); } break; } } } finally { LOG.warn("QuorumPeer main thread exited"); MBeanRegistry instance = MBeanRegistry.getInstance(); instance.unregister(jmxQuorumBean); instance.unregister(jmxLocalPeerBean); for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) { instance.unregister(remotePeerBean); } jmxQuorumBean = null; jmxLocalPeerBean = null; jmxRemotePeerBean = null; } }
在 star
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。