Essentially, ZK is also a kind of distributed storage system, so let's look at ZK's design and implementation from a distributed-storage perspective.
ZK uses a master-less design (physically, that is; logically there is still a master). When a ZK client connects to the server side, it must pass in a connection string, namely the connectString parameter of the constructor below:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
Which machine the client ultimately connects to, and the reconnect logic when the currently connected machine dies, both live in StaticHostProvider#next:
public InetSocketAddress next(long spinDelay) {
    ++currentIndex;
    if (currentIndex == serverAddresses.size()) {
        currentIndex = 0;
    }
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }
    return serverAddresses.get(currentIndex);
}
Note that serverAddresses is not in the order of the string we passed in; it was already shuffled when the StaticHostProvider was constructed:
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
        throws UnknownHostException {
    for (InetSocketAddress address : serverAddresses) {
        InetAddress ia = address.getAddress();
        InetAddress resolvedAddresses[] = InetAddress.getAllByName(
                (ia != null) ? ia.getHostAddress() : address.getHostName());
        for (InetAddress resolvedAddress : resolvedAddresses) {
            // If hostName is null but the address is not, we can tell that
            // the hostName is an literal IP address. Then we can set the host string as the hostname
            // safely to avoid reverse DNS lookup.
            // As far as i know, the only way to check if the hostName is null is use toString().
            // Both the two implementations of InetAddress are final class, so we can trust the return value of
            // the toString() method.
            if (resolvedAddress.toString().startsWith("/")
                    && resolvedAddress.getAddress() != null) {
                this.serverAddresses.add(
                        new InetSocketAddress(InetAddress.getByAddress(
                                address.getHostName(),
                                resolvedAddress.getAddress()),
                                address.getPort()));
            } else {
                this.serverAddresses.add(new InetSocketAddress(
                        resolvedAddress.getHostAddress(), address.getPort()));
            }
        }
    }
    if (this.serverAddresses.isEmpty()) {
        throw new IllegalArgumentException("A HostProvider may not be empty!");
    }
    Collections.shuffle(this.serverAddresses);
}
So the routing rule is simply a random strategy.
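As a usage sketch (the host names and chroot here are made up), a client just passes the whole ensemble in the connect string, and the shuffled StaticHostProvider decides which server it actually talks to:

import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ClientConnectDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        // three servers plus a "/myapp" chroot; which server the session
        // lands on is decided by the shuffled StaticHostProvider
        ZooKeeper zk = new ZooKeeper(
                "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/myapp",
                30000, // session timeout in milliseconds
                new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println("event: " + event);
                    }
                });
        // ... use zk, then close it
        zk.close();
    }
}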
However, because of ZK's consistency design (see below), write operations must be handled by the Leader node. When a non-Leader node (i.e., a Learner) receives a write request, it forwards it to the Leader; this is transparent to the client, of course. Read requests are handled directly by the local machine.
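A minimal sketch of that dispatch decision, with assumed names (the real logic lives in FollowerRequestProcessor, which is more involved and also runs writes through the local commit pipeline):

// Sketch only, not the actual FollowerRequestProcessor code: write opcodes
// are forwarded to the Leader, everything else is answered locally.
void dispatch(Request request) throws IOException {
    switch (request.type) {
    case OpCode.create:
    case OpCode.delete:
    case OpCode.setData:
    case OpCode.setACL:
        // a write: ship it to the Leader; the client never notices
        learner.request(request);
        break;
    default:
        // a read: serve it from the local in-memory database
        nextProcessor.processRequest(request);
    }
}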
As the ZooKeeper documentation puts it: "At the heart of ZooKeeper is an atomic messaging system that keeps all of the servers in sync."
So the data on all servers stays in sync (though presumably only eventually consistent rather than strongly consistent; see below), which means there is no load-balancing problem on the server side. Client-side load balancing, i.e., service routing, was covered above: a random strategy.
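Since a read is answered by whichever server the session happens to be on, a client that needs read-your-writes freshness can issue a sync() before reading. A usage fragment, reusing the zk handle from the sketch above (the path is made up):

// ask the connected server to catch up with the Leader before we read
zk.sync("/config", new AsyncCallback.VoidCallback() {
    @Override
    public void processResult(int rc, String path, Object ctx) {
        // once this fires, reads on this session see at least every
        // write committed before the sync was issued
    }
}, null);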
The core of ZK is really a messaging system, ZAB (ZooKeeper Atomic Broadcast), used to keep the ZK servers' data in sync. The broadcast works roughly as follows (similar to two-phase commit):
1. A client's write request is forwarded to the Leader.
2. The Leader assigns the transaction a zxid and sends a PROPOSAL to all Followers.
3. Each Follower logs the proposal to disk and replies to the Leader with an ACK.
4. Once the Leader has collected ACKs from a quorum, it sends COMMIT to the Followers (and INFORM to the Observers) and commits locally.
5. Each Follower applies the committed transaction to its in-memory database.
Now let's look at some specifics of each step; Leader Election is skipped for now and will be covered later. The Follower's logic for handling messages from the Leader is in Follower#processPacket:
protected void processPacket(QuorumPacket qp) throws IOException {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);
        break;
    case Leader.PROPOSAL:
        TxnHeader hdr = new TxnHeader();
        Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
        if (hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid())
                    + " expected 0x" + Long.toHexString(lastQueued + 1));
        }
        lastQueued = hdr.getZxid();
        fzk.logRequest(hdr, txn);
        break;
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());
        break;
    case Leader.UPTODATE:
        LOG.error("Received an UPTODATE message after Follower started");
        break;
    case Leader.REVALIDATE:
        revalidate(qp);
        break;
    case Leader.SYNC:
        fzk.sync();
        break;
    }
}
PROPOSAL messages are handed to FollowerZooKeeperServer#logRequest:

public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), null, null);
    request.hdr = hdr;
    request.txn = txn;
    request.zxid = hdr.getZxid();
    if ((request.zxid & 0xffffffffL) != 0) {
        // queue the request until the matching COMMIT arrives
        pendingTxns.add(request);
    }
    // the proposal is persisted to disk as soon as it is received
    syncProcessor.processRequest(request);
}
COMMIT messages end up in FollowerZooKeeperServer#commit:
/**
 * When a COMMIT message is received, eventually this method is called,
 * which matches up the zxid from the COMMIT with (hopefully) the head of
 * the pendingTxns queue and hands it to the commitProcessor to commit.
 * @param zxid - must correspond to the head of pendingTxns if it exists
 */
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);
}
commitProcessor.commit likewise just puts the request onto a queue:

synchronized public void commit(Request request) {
    if (!finished) {
        if (request == null) {
            LOG.warn("Committed a null!",
                    new Exception("committing a null! "));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing request:: " + request);
        }
        committedRequests.add(request);
        notifyAll();
    }
}
CommitProcessor then has a thread that hands the committed requests to FinalRequestProcessor, which performs the actual write.
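A simplified sketch of that consumer thread (the real run() also coordinates pending reads via other queues; the field names follow the commit() method shown above):

// Sketch only: drain committedRequests and pass each request down the
// chain, which for a Follower ends at FinalRequestProcessor, where the
// write is applied to the in-memory database.
public void run() {
    try {
        while (!finished) {
            Request r;
            synchronized (this) {
                while (!finished && committedRequests.isEmpty()) {
                    wait(); // woken up by commit()'s notifyAll()
                }
                if (finished) {
                    break;
                }
                r = committedRequests.remove();
            }
            nextProcessor.processRequest(r);
        }
    } catch (Exception e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
}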
With the Follower covered, let's look at what the Leader does in step 4 above. The Leader's ack-handling code is in Leader#processAck:
/**
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 *
 * @param zxid
 *            the zxid of the proposal sent out
 * @param followerAddr
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    if ((zxid & 0xffffffffL) == 0) {
        /*
         * We no longer process NEWLEADER ack by this method. However,
         * the learner sends ack back to the leader after it gets UPTODATE
         * so we just ignore the message.
         */
        return;
    }

    if (outstandingProposals.size() == 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    // an already-committed proposal
    if (lastCommitted >= zxid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    // record the ack for this proposal
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    p.ackSet.add(sid);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }
    // has the quorum requirement been met?
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }

        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        // send COMMIT to the followers
        commit(zxid);
        // send INFORM to the observers
        inform(p);
        // the leader also commits locally
        zk.commitProcessor.commit(p.request);
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}
Quorum has two implementations. The default is QuorumMaj, where more than half the votes counts as passing; the other is a weighted implementation, QuorumHierarchical.
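The majority check itself is tiny; essentially (a sketch along the lines of the 3.4-era QuorumMaj, where half is initialized to n / 2 for n voting members):

// an ack set "contains a quorum" once it holds strictly more than half
// of the n voting members
public boolean containsQuorum(HashSet<Long> ackSet) {
    return ackSet.size() > half;
}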
Next, let's look at what triggers a new Leader election. The simplest case: the current Leader node dies. The Follower's main loop then exits, control returns to QuorumPeer's main loop, the node's state is set to ServerState.LOOKING, and the next round of Leader Election kicks off. In addition, the Leader's main loop keeps checking whether the current number of Followers still satisfies the quorum requirement; if not, it closes its connections to the Learners and sets its own state to ServerState.LOOKING, which of course also triggers the next round of Leader Election:
while (true) {
    Thread.sleep(self.tickTime / 2);
    if (!tickSkip) {
        self.tick++;
    }
    HashSet<Long> syncedSet = new HashSet<Long>();

    // lock on the followers when we use it.
    syncedSet.add(self.getId());

    for (LearnerHandler f : getLearners()) {
        // Synced set is used to check we have a supporting quorum, so only
        // PARTICIPANT, not OBSERVER, learners should be used
        // f.synced() uses the syncLimit config setting
        if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
            syncedSet.add(f.getSid());
        }
        f.ping();
    }

    if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
        //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
        // Lost quorum, shutdown
        shutdown("Not sufficient followers synced, only synced with sids: [ "
                + getSidSetString(syncedSet) + " ]");
        // make sure the order is the same!
        // the leader goes to looking
        return;
    }
    tickSkip = !tickSkip;
}
The Leader maintains one LearnerHandler thread per Learner.
Before ZK 3.5, scaling the ensemble was a real pain: it required a rolling restart. As of 3.5, dynamic reconfiguration is finally supported. A new setting, dynamicConfigFile, is added to the original static config file. For example:
## zoo_replicated1.cfg
tickTime=2000
dataDir=/zookeeper/data/zookeeper1
initLimit=5
syncLimit=2
dynamicConfigFile=/zookeeper/conf/zoo_replicated1.cfg.dynamic

## zoo_replicated1.cfg.dynamic
server.1=125.23.63.23:2780:2783:participant;2791
server.2=125.23.63.24:2781:2784:participant;2792
server.3=125.23.63.25:2782:2785:participant;2793
At runtime, the configuration can be changed with the reconfig command. The implementation logic is also simple: the configuration is stored in a special znode, /zookeeper/config.
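For example, from the zkCli.sh shell (the added server line below is made up to match the example config above):

# add a fourth participant, then drop server 3, all at runtime
reconfig -add server.4=125.23.63.26:2786:2789:participant;2794
reconfig -remove 3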
Additionally, after scaling out, client connections can be rebalanced through ZooKeeper#updateServerList. Behind it is a probabilistic algorithm whose goal is to migrate as few connections as possible while still ending up load-balanced; if clients simply re-shuffled the expanded server list and reconnected, the migration cost would be rather high.
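A minimal sketch of the underlying idea (not the actual updateServerList implementation; all names here are made up): when the ensemble grows from n to m servers, each connected client migrates with probability 1 - n/m, which in expectation leaves every server with an equal share of sessions while most clients keep their existing connection.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch only: illustrates the migration probability, not ZK's real code.
class RebalanceSketch {
    private static final Random RND = new Random();

    static String nextServer(String current, List<String> oldServers,
                             List<String> newServers) {
        if (newServers.size() <= oldServers.size()) {
            return current; // shrink/unchanged cases omitted for brevity
        }
        // migrate with probability 1 - n/m so load evens out in expectation
        double pMove = 1.0 - (double) oldServers.size() / newServers.size();
        if (RND.nextDouble() >= pMove) {
            return current; // most clients keep their existing connection
        }
        List<String> added = new ArrayList<>(newServers);
        added.removeAll(oldServers); // movers go to the newly added servers
        return added.get(RND.nextInt(added.size()));
    }
}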
For ZK's data files, see the earlier article.
The transaction log is implemented by FileTxnLog; its format is documented as follows:
/**
 * This class implements the TxnLog interface. It provides api's
 * to access the txnlogs and add entries to it.
 * <p>
 * The format of a Transactional log is as follows:
 * <blockquote><pre>
 * LogFile:
 *     FileHeader TxnList ZeroPad
 *
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 * }
 *
 * TxnList:
 *     Txn || Txn TxnList
 *
 * Txn:
 *     checksum Txnlen TxnHeader Record 0x42
 *
 * checksum: 8bytes Adler32 is currently used
 *     calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 * }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
 * </pre></blockquote>
 */
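As a quick check of that layout, a small standalone sketch can read the FileHeader fields straight off a log file (the path is an assumption based on the dataDir in the example config above; Jute serializes these fields big-endian, which DataInputStream matches):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TxnLogHeaderDump {
    public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(
                "/zookeeper/data/zookeeper1/version-2/log.100000001"))) {
            byte[] magic = new byte[4];
            in.readFully(magic);        // 4 bytes, should be "ZKLG"
            int version = in.readInt(); // 4 bytes
            long dbid = in.readLong();  // 8 bytes
            System.out.printf("magic=%s version=%d dbid=%d%n",
                    new String(magic, StandardCharsets.US_ASCII), version, dbid);
        }
    }
}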
As for the snapshot file, we previously covered how to inspect it by patching the source to dump it in XML format.