赞
踩
为了防止单点故障,zookeeper 提供了集群模式 ,集群模式下具有三个角色
leader:设置leader 的目的是为了将事务请求进行排序,所有事务请求必须由一个全局唯一的服务器来协调处理
follower:参与leader的选举投票 及 事务请求的投票 以及 将事务请求转发给 leader
observer:只提供非事务请求,在不影响集群事务处理能力的前提下提升非事务处理的能力,observer不需要参与投票的过程,但是必须要同步leader的数据,从而在处理请求的时候保证数据的一致性
【分布式】Zookeeper的Leader选举-选举过程介绍比较清晰
QuorumPeerMain.main()
分析入口
public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try { main.initializeAndRun(args); } catch (Exception e) { LOG.error("Unexpected exception, exiting abnormally", e); System.exit(ExitCode.UNEXPECTED_ERROR.getValue()); } } protected void initializeAndRun(String[] args) throws ConfigException, IOException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager( config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //如果是集群,则读取配置的信息 if (args.length == 1 && config.servers.size() > 0) { runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } } public void runFromConfig(QuorumPeerConfig config) throws IOException { LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // 从配置文件中加载配置信息 quorumPeer = new QuorumPeer(config.getServers(), new File(config.getDataDir()), new File(config.getDataLogDir()), //将配置文件中的选举类型给quorumPeer对象,默认类型是3 config.getElectionAlg(), config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit(), config.getQuorumListenOnAllIPs(), cnxnFactory, config.getQuorumVerifier()); quorumPeer.setClientPortAddress(config.getClientPortAddress()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if(quorumPeer.isQuorumSaslAuthEnabled()){ quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); // 开始启动 quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }
QuorumPeer
public synchronized void start() { //加载数据 loadDataBase(); cnxnFactory.start(); //开始选举 leader startLeaderElection(); // 此时调用 run() super.start(); } public void run() { try { while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); try { roZkMgr.start(); setBCVote(null); //todo leader选举核心内容 //设置当前的投票,通过策略模式来决定当前用哪个选举算法来进行领导选举, //默认是FastLeaderElection setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception",e); setPeerState(ServerState.LOOKING); } finally { // If the thread is in the the grace period, interrupt // to come out of waiting. roZkMgr.interrupt(); roZk.shutdown(); } } break; } } } finally { LOG.warn("QuorumPeer main thread exited"); try { MBeanRegistry.getInstance().unregisterAll(); } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } jmxQuorumBean = null; jmxLocalPeerBean = null; } }
FastLeaderElection
public Vote lookForLeader() throws InterruptedException { try { self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register( self.jmxLeaderElectionBean, self.jmxLocalPeerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { self.start_fle = System.currentTimeMillis(); } try { /** * 封装接受的投票信息 */ HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); /** * 退出选举是的封装结果:存储选举的结果信息 */ HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); int notTimeout = finalizeWait; synchronized(this){ //增加逻辑时钟 logicalclock++; //将自己的myid,zxid,epoch封装到自己的对象中 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); //发送通知,将包含自己信息的实体添加到发送队列中 sendNotifications(); /* * Loop in which we exchange notifications until we find a leader */ //如果是LOOKING状态,则开始选举 while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ //从接受队列中拿到投屏消息,自己的投票也在这里面处理 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ //未取到结果 if(n == null){ if(manager.haveDelivered()){ //如果空闲的情况下,消息发送完了,继续重新发送,直到选举中leader为止 sendNotifications(); } else { //消息还没投递出去,可能是其他server还没启动,尝试再连接 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } //如果取到结果,校验接受到消息是否属于这个集群中的 else if(self.getVotingView().containsKey(n.sid)) { /* * Only proceed if the vote comes from a replica in the * voting view. */ //选举leader switch (n.state) { case LOOKING: // If notification > current, replace and send messages out //如果通知的消息大于当前的消息,则表示当前时新一轮的选举, //将通知消息的epoch号(朝代号)给当前的对象 // logicalclock 逻辑时钟,一开始是1,表示是新的开始, //如果取出来的消息的epoch 比当前的逻辑时钟大的话,表示逻辑时钟已经是落后的朝代了,进行重新赋值 if (n.electionEpoch > logicalclock) { logicalclock = n.electionEpoch; // 启动接受的投票信息 recvset.clear(); //比较当前取出的消息是否可以胜出, /** 比较规则: 首先比较epoch,大者获胜 其次,epoch相同时比较zxid,大者获胜 最后比较myid,大者获胜 */ //TODO 比较myid,zxid,epoch,将当前对象的票据改成胜者的票据 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { //当前消息获胜的话,将投票的票据(leader、zxid、epoch)信息改成当前消息的票据 updateProposal(n.leader, n.zxid, n.peerEpoch); } else { //否则,将当前的票据进行保持 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } // 继续广播,即将当前对象包含票据的信息添加到发送队列中, sendNotifications(); } else if (n.electionEpoch < logicalclock) { // 如果取出来的消息的 epoch 比当前的逻辑时钟小,说明该消息已经是以前的了。不做任何处理 if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock." + "n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock)); } break; // 这里表示 取出来的消息的 epoch 和逻辑时钟相等,就继续比较zxid 和 myid , // 如果胜出,则将该消息的票据进行修改,并且继续广播 } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } //将当前胜出的最终投票结果添加到接受投票信息的集合中,用来做选举的最终结果的判断 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //统计recvset集合,查看投某个id的票数是否超过一半 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /* * This predicate is true once we don't read any new * relevant message from the reception queue */ //确定leader if (n == null) { //修改状态,LEADING or FOLLOWING or OBSERVEING self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); //返回最终投票结果 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch); leaveInstance(endVote); return endVote; } } break; //OBSERVING机器不参数选举 case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; //如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经正在运行的zk集群时 case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch * together. */ if(n.electionEpoch == logicalclock){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); if(ooePredicate(recvset, outofelection, n)) { self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock = n.electionEpoch; self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } } return null; } finally { try { if(self.jmxLeaderElectionBean != null){ MBeanRegistry.getInstance().unregister( self.jmxLeaderElectionBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } self.jmxLeaderElectionBean = null; LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount()); } }
初始化两个队列, 一个是发送队列 sendqueue,一个是接受队列 recvQueue
创建并启动两个线程,一个是发送线程 WorkerSender,一个是接受线程 WorkerReceiver
发送通知投票数据
- 在选举的时候,进行 FastLeaderElection#sendNotifications 时,
将遍历所有的服务,并将票据+sid 放到发送队列中(sendqueue),- 此时 WorkerSender 线程已经启动并 sendqueue.poll .
- 获取到 票据 + sid 并将通过sid 进行通信.这个时候分两种情况,
- 如果是A服务发送票据通知,也会给自己,当自己收到的时候, 会通过{#addToRecvQueue()} 添加到自己的接受队列中(recvQueue)。
- 如果不是自己, 则通过 sid 进行通信进行发送。 并启动俩个线程 {sendWorker:用来发送数据投票,recvWork: 用来接受投票数据}
接收投票数据
QuorumCnxManager.Listener listener = qcm.listener 开启了一个监听, 并用来接收其他服务传过来的投票数据
由于启动了一个RecvWorker 线程来接受投票数据 ,将数据通过{#addToRecvQueue()} 方法添加到接受队列(recvQueue)中 ,如果是 LOOKING 状态下, 选举方法中会正在获取 recvQueue 的值。
接收线程 WorkerReceiver 正在循环通过 pollRecvQueue() 方法获取 {接受队列 recvQueue} 中的数据。 这块还在捋
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。