当前位置:   article > 正文

zookeeper源码(09)follower处理客户端请求

zookeeper源码(09)follower处理客户端请求

在zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:

  • 读请求处理
  • 写请求转发与响应

follower接收转发客户端请求

网络层接收客户端数据包

leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。

在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:

  • processConnectRequest方法:创建session
  • processPacket方法:处理业务请求

processConnectRequest创建session

  • 会使用sessionTracker生成sessionId、创建session对象
  • 生成一个密码
  • 提交一个createSession类型Request并提交给业务处理器
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    // 生成sessionId、创建session对象
    long sessionId = sessionTracker.createSession(timeout);
    // 生成密码
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    // 提交createSession类型Request
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

processPacket处理业务请求

  • 封装Request
  • 验证largeRequest
  • 提交业务层处理器
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
if (isLargeRequest(length)) {
    // checkRequestSize will throw IOException if request is rejected
    checkRequestSizeWhenMessageReceived(length);
    si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

FollowerRequestProcessor处理器

在follower端,客户端请求会由FollowerRequestProcessor处理:

  1. 把请求提交下游CommitProcessor处理器
  2. 写请求转发给leader处理
  3. 读请求经过CommitProcessor直接转发给FinalRequestProcessor处理器,直接查询数据返回给客户端
public void run() {
    try {
        while (!finished) {

            Request request = queuedRequests.take();

            // Screen quorum requests against ACLs first 略

            // 转发给CommitProcessor处理器
            // 提交到queuedRequests队列
            // 写请求还会提交到queuedWriteRequests队列
            maybeSendRequestToNextProcessor(request);

            // ...

            // 写请求需要转发给leader处理
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request); // 待同步命令
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

转发leader

zks.getFollower().request(request);
  • 1

Learner转发请求:

void request(Request request) throws IOException {
    // 略

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId); // sessionId
    oa.writeInt(request.cxid); // 客户端xid
    oa.writeInt(request.type); // 业务类型
    byte[] payload = request.readRequestBytes(); // 请求体
    if (payload != null) {
        oa.write(payload);
    }
    oa.close();
    // 封装REQUEST数据包
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
    writePacket(qp, true); // 通过网络发给leader服务器
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

leader处理follower请求

LearnerHandler接收REQUEST请求

case Leader.REQUEST:
    bb = ByteBuffer.wrap(qp.getData());
    sessionId = bb.getLong(); // 解析请求信息
    cxid = bb.getInt();
    type = bb.getInt();
    bb = bb.slice();
    Request si;
    if (type == OpCode.sync) {
        si = new LearnerSyncRequest(
            this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
    } else {
        si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
    }
    si.setOwner(this); // 用来判断请求来自follower
    learnerMaster.submitLearnerRequest(si); // 提交给业务处理器
    requestsReceived.incrementAndGet();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

submitLearnerRequest提交业务处理器:

public void submitLearnerRequest(Request si) {
    zk.submitLearnerRequest(si);
}
  • 1
  • 2
  • 3

LeaderZooKeeperServer提交业务处理器:

public void submitLearnerRequest(Request request) {
    // 提交给PrepRequestProcessor处理器
    prepRequestProcessor.processRequest(request);
}
  • 1
  • 2
  • 3
  • 4

从此处开始走leader处理写请求流程。

leader处理写请求流程回顾

  • PrepRequestProcessor - 做事务设置
  • ProposalRequestProcessor - 发起proposal,将Request转发给SyncRequestProcessor写事务log、本地ack
  • CommitProcessor - 读请求直接调用下游处理器,写请求需要等待足够的ack之后commit再调用下游RequestProcessor处理器
  • ToBeAppliedRequestProcessor - 维护toBeApplied列表
  • FinalRequestProcessor - 把事务应用到ZKDatabase,提供查询功能,返回响应

follower处理leader数据

在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。

PROPOSAL数据包

fzk.logRequest(hdr, txn, digest);
  • 1

logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。

leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。

COMMIT数据包

fzk.commit(qp.getZxid());
  • 1

CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。

FinalRequestProcessor处理器

  • 把事务应用到ZKDatabase中
  • 提供查询功能
  • 给客户端返回响应
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/152166
推荐阅读
相关标签
  

闽ICP备14008679号