当前位置:   article > 正文

zookeeper leader选举

zookeeper leader选举

1. 前言

本文讲述的是zk集群管理中核心算法,Leader选举。

zookeeper有很多用途,比如可以做分布式锁,分布式监听事件,集群管理,负载均衡,发布订阅等等。我认为这些功能用法和redis差不多。这里不讨论用法,我只想纪录下Leader选举的思路。

2. 服务器角色介绍

在zookeeper集群中,分别有Leader、Follower、Observer、Learner四种类型的服务器角色。

  • Leader

Leader服务器是整个zookeeper集群的核心,是集群的领导者,主要有以下两个功能:

a. 事务请求的唯一调度和处理者,保证集群处理事务的顺序性

b. 集群内部各服务器调度者

  • Follower

Follower服务器在zookeeper集群只能算备胎,主要有以下三个功能:

a. 处理客户端非事务请求,转发事务请求给Leader服务器

b. 参与事务请求proposal投票,这个地方没有理解,不过我觉得应该在zookeeper集群中管理其他中间件集群用的吧

c. 参与Leader选举投票

d. 数据备份

  • Observer

Observer服务器充当了一个观察者的角色,其工作原理与Follower类似,只处理非事务请求,会将事务请求转发给Leader服务器。Observer服务器不参与任何形式的投票。言下之意是连当备胎的权力都没有。

a. 处理客户端非事务请求,转发事务请求给Leader服务器

b. 接收事务请求proposal投票,但不参与过程

c. 数据备份

  • Learner

Learner服务器负责管理Follower和Observer与Leader服务器之间的网络通信。这地方还没有去研究

3.Leader选举

在清朝还没入关时期,那时还叫大金国,当时老汗王努尔哈赤去世了,但是没有留下传位遗诏,后金是一个部落不能一日无主啊,当时皇太极在军中的威望很高,而且非常有远见,因此在老汗王的诸多王子中脱颖而出,所以就选举了皇太极为他们的leader。

再来看看zookeeper,当时的大金国就相当于现在的zookeeper集群,当时的老汗王就相当于上一个Leader服务器,并且已经挂掉了,皇太极就相当于现任被选举出来的Leader服务器,而其他的王子就相当于Follower就相当于备胎了,当然还有Observer,连选举的权力都没有。

那么zookeeper是如何来选举的呢?

3.1 概述

既然是选举,那就必然有投票。

每个服务器都有一个id,是服务器的唯一标识。除了这个还有一个事务id,一般来讲哪个服务器的数据越新,就代表哪个服务器的事务id越大,为什么要说这个呢,因为后面有用O(∩_∩)O

假设有三台服务器server1、server2、server3分别对应自身的id为1、2、3,事务id为4、4、5,并且server2是Leader,其余两台是Follower。

如果server2挂了,那么当前集群需要重新选择Leader。server1首先投自己投票为(1,2),server3也投自己(3,5)。第一轮投票完成后server1接收到了server3的投票,那么这个时候就要进行竞争了。首先比较第二个数,第二个数代表事务id,2比5小,那好吧,server1承认没server3厉害,自动把选票改为(3,5)。到server3进行竞争了,在server3看来server显然没达到自己的标准,因此第二轮投票不变。在第二轮选票中大家都投server3,因此server3成了新的Leader。

3.2 具体实现

上面举了一个例子简单介绍了选举操作,接下来看看具体的实现吧。

下面截取了zookeeper3.6版本的一部分源码

先来看看有哪些状态吧:

public enum ServerState {
    //代表没有当前集群中没有Leader,此时是投票选举状态
    LOOKING, 
    //代表已经是伴随者状态
    FOLLOWING, 
    //代表已经是领导者了
    LEADING, 
    //代表已经是观察者了
    OBSERVING;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

再来看看投票实体类:

public class Vote {
    /**
     * 投票的版本号,作为一个标识
     */
    final private int version;

    /**
     * 当前服务器唯一标识
     */
    final private long id;
    
    /**
     * 当前服务器事务id
     */
    final private long zxid;
    
    /**
     * 当前服务器选举的轮次
     */
    final private long electionEpoch;
    
    /**
     * 被推举服务器的选举轮次
     */
    final private long peerEpoch;
    
    /**
     * 当前服务器所处的状态
     */
    final private ServerState state;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

首先进入选举环节的服务器。

zookeeper使用的是FastLeaderElection算法。

在org.apache.zookeeper.server.quorum.FastLeaderElection类里面有个属性叫logicalclock,用来计算投票轮次,它会进行自增

synchronized(this){
    //自增
    logicalclock.incrementAndGet();
    //投票投自己
    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

服务器初始化自己的信息后会把投票信息通过sendNotifications()方法放到队列中发送给其余的服务器

sendNotifications();
  • 1

之后会接收其他服务器的投票

Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
  • 1
  • 2

接受完后对选举轮次进行判断

  • 当接收到的服务器投票的轮次大于自己的轮次,那么更新到接收到的轮次,并且将自己的选票和接收到的选票进行竞争,如果竞争成功,竞争完后放入队列发送
  • 当接收到的服务器轮次小于自己的轮次,则不做处理
  • 当接受到的服务器轮次等于自己的轮次,同样进行选票竞争,竞争完后放入队列中发送

当操作判断完成后则会统计当前投票,如果有集群中的参与投票的服务器有一半的服务器都选的是当前这个选票,那么就终止投票。

h a l f = ( n 2 ) + 1 half=(\frac{n}{2})+1 half=(2n)+1

可以参考org.apache.zookeeper.server.quorum.flexible.QuorumMaj,这里不做源码的详细分析。

public boolean containsQuorum(Set<Long> ackSet) {
    return (ackSet.size() > half);
}
  • 1
  • 2
  • 3

在终止后,如果当前服务器被选为Leader,那么状态就修改为LEADING。其余的服务器状态则修改为FOLLOWING或者OBSERVING。

注意:zk3.6版本判断是否超过半数支持相同id的逻辑已经改了,直接在voteSet.hasAllQuorums()进行判断

case LOOKING:
    if (getInitLastLoggedZxid() == -1) {
        LOG.debug("Ignoring notification as our zxid is -1");
        break;
    }
    if (n.zxid == -1) {
        LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
        break;
    }

    //接收到的服务器轮次大于自己的轮次的情况
    if (n.electionEpoch > logicalclock.get()) {
        logicalclock.set(n.electionEpoch);
        //清空当前服务器自己的投票
        recvset.clear();
        //竞争
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
        } else {
            updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());
        }
        //放入队列发送
        sendNotifications();
    } 
    //接收到的服务器轮次小于自己的轮次,并不做任何处理
    else if (n.electionEpoch < logicalclock.get()) {
        if(LOG.isDebugEnabled()){
            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"+ Long.toHexString(n.electionEpoch)+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
        }
        break;
    }
    //接受到的服务器轮次等于自己的轮次
    else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {
        updateProposal(n.leader, n.zxid, n.peerEpoch);
        sendNotifications();
    }

    if(LOG.isDebugEnabled()){
        LOG.debug("Adding vote: from=" + n.sid +", proposed leader=" + n.leader +", proposed zxid=0x" + Long.toHexString(n.zxid) +", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
    }

    //将选票放入集合中
    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

    //进行统计选票
    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch));

    //判断是否已有超过半数的服务器投票为相同的id
    if (voteSet.hasAllQuorums()) {

        while((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS)) != null){
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)){
                recvqueue.put(n);
                break;
            }
        }

        //确定leader
        if (n == null) {
            setPeerState(proposedLeader, voteSet);
            Vote endVote = new Vote(proposedLeader,proposedZxid, proposedEpoch);
            leaveInstance(endVote);
            return endVote;
        }
    }
    break;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

以下代码是进行竞争

  • newId:接收服务器的id
  • newZxid:接收服务器的事务id
  • newEpoch:接收服务器的被选举的轮次
  • curId:当前服务器的id
  • curZxid:当前服务器的事务id
  • curEpoch:当前服务器的被选举的轮次
  • 当newEpoch>curEpoch 代表当前服务器竞争失败,需要把投票改成接收来的服务器
  • 当newEpoch==curEpoch,那就需要比较事务id,当newZxid>curZxid时也需要将投票改成接收服务器的投票
  • 当newZxid==curZxid,那么就比较服务器自身的id,当newId>curId,也需要将投票改成接收服务器的投票

投票的轮次一般不会有很大的变化,实际上事务id的变化可能会不必投票的轮次大,毕竟投票的轮次也就是Leader服务器挂了或者没有Leader的时候就一瞬间的变化,而zxid代表最后一次更新时当前节点的事务id,这是一段时间内的。

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }

    return ((newEpoch > curEpoch) ||((newEpoch == curEpoch) &&((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. 结语

以上就是zookeeper leader选举部分,也是zk的核心部分。看了《从Paxos到Zookeeper 分布式一致性原理与实践》这本书,并且主要看了leader选举这一节,因此结合源码深入学习了这一部分。写这本书的目的:首先是为了巩固知识点加深印象,其次就是想练练手O(∩_∩)O。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号