赞
踩
在zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:
leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。
在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
// 生成sessionId、创建session对象
long sessionId = sessionTracker.createSession(timeout);
// 生成密码
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
// 提交createSession类型Request
CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);
在follower端,客户端请求会由FollowerRequestProcessor处理:
public void run() { try { while (!finished) { Request request = queuedRequests.take(); // Screen quorum requests against ACLs first 略 // 转发给CommitProcessor处理器 // 提交到queuedRequests队列 // 写请求还会提交到queuedWriteRequests队列 maybeSendRequestToNextProcessor(request); // ... // 写请求需要转发给leader处理 switch (request.type) { case OpCode.sync: zks.pendingSyncs.add(request); // 待同步命令 zks.getFollower().request(request); break; case OpCode.create: case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: case OpCode.delete: case OpCode.deleteContainer: case OpCode.setData: case OpCode.reconfig: case OpCode.setACL: case OpCode.multi: case OpCode.check: zks.getFollower().request(request); break; case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { zks.getFollower().request(request); } break; } } } catch (Exception e) { handleException(this.getName(), e); } }
zks.getFollower().request(request);
Learner转发请求:
void request(Request request) throws IOException { // 略 ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream oa = new DataOutputStream(baos); oa.writeLong(request.sessionId); // sessionId oa.writeInt(request.cxid); // 客户端xid oa.writeInt(request.type); // 业务类型 byte[] payload = request.readRequestBytes(); // 请求体 if (payload != null) { oa.write(payload); } oa.close(); // 封装REQUEST数据包 QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); writePacket(qp, true); // 通过网络发给leader服务器 }
case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); sessionId = bb.getLong(); // 解析请求信息 cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); Request si; if (type == OpCode.sync) { si = new LearnerSyncRequest( this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); } else { si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); } si.setOwner(this); // 用来判断请求来自follower learnerMaster.submitLearnerRequest(si); // 提交给业务处理器 requestsReceived.incrementAndGet();
submitLearnerRequest提交业务处理器:
public void submitLearnerRequest(Request si) {
zk.submitLearnerRequest(si);
}
LeaderZooKeeperServer提交业务处理器:
public void submitLearnerRequest(Request request) {
// 提交给PrepRequestProcessor处理器
prepRequestProcessor.processRequest(request);
}
从此处开始走leader处理写请求流程。
在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。
fzk.logRequest(hdr, txn, digest);
logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。
leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。
fzk.commit(qp.getZxid());
CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。