
ZooKeeper #2: Distributed Storage

At its core, ZK is also a distributed storage system. This post looks at ZK's design and implementation from the distributed-storage angle.

Service Routing

ZK has no physically dedicated master (logically there still is one). When a ZK client connects to the servers, it passes in a connection string:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }
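That is the connectString above. Which machine the client ends up connecting to, and where it reconnects when the current machine goes down, is decided in StaticHostProvider#next: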
    public InetSocketAddress next(long spinDelay) {
        ++currentIndex;
        if (currentIndex == serverAddresses.size()) {
            currentIndex = 0;
        }
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }
        return serverAddresses.get(currentIndex);
    }

serverAddresses is not kept in the order of the string we passed in; it is shuffled when the StaticHostProvider is constructed:
    public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
            throws UnknownHostException {
        for (InetSocketAddress address : serverAddresses) {
            InetAddress ia = address.getAddress();
            InetAddress resolvedAddresses[] = InetAddress.getAllByName((ia!=null) ? ia.getHostAddress():
                address.getHostName());
            for (InetAddress resolvedAddress : resolvedAddresses) {
                // If hostName is null but the address is not, we can tell that
                // the hostName is an literal IP address. Then we can set the host string as the hostname
                // safely to avoid reverse DNS lookup.
                // As far as i know, the only way to check if the hostName is null is use toString().
                // Both the two implementations of InetAddress are final class, so we can trust the return value of
                // the toString() method.
                if (resolvedAddress.toString().startsWith("/")
                        && resolvedAddress.getAddress() != null) {
                    this.serverAddresses.add(
                            new InetSocketAddress(InetAddress.getByAddress(
                                    address.getHostName(),
                                    resolvedAddress.getAddress()),
                                    address.getPort()));
                } else {
                    this.serverAddresses.add(new InetSocketAddress(resolvedAddress.getHostAddress(), address.getPort()));
                }
            }
        }
        if (this.serverAddresses.isEmpty()) {
            throw new IllegalArgumentException(
                    "A HostProvider may not be empty!");
        }
        Collections.shuffle(this.serverAddresses);
    }

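So the routing rule is a random strategy: the list is shuffled once, and the client then walks it in order, wrapping around at the end.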
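The shuffle-then-rotate behaviour can be sketched in a few lines. This is a minimal illustration, not ZooKeeper code: the class name ShuffledHostList and the use of plain host strings are assumptions made for the example, and the spinDelay sleep from the real next() is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical illustration class; not part of ZooKeeper.
class ShuffledHostList {
    private final List<String> servers;
    private int currentIndex = -1;

    ShuffledHostList(List<String> hosts, Random random) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("host list may not be empty");
        }
        this.servers = new ArrayList<>(hosts);
        // Shuffle once, so different clients start from different servers.
        Collections.shuffle(this.servers, random);
    }

    // Advance to the next server, wrapping around at the end of the list.
    String next() {
        currentIndex = (currentIndex + 1) % servers.size();
        return servers.get(currentIndex);
    }

    public static void main(String[] args) {
        ShuffledHostList hosts = new ShuffledHostList(
                Arrays.asList("zk1:2181", "zk2:2181", "zk3:2181"), new Random());
        // Four calls: three distinct servers, then back to the first one.
        for (int i = 0; i < 4; i++) {
            System.out.println(hosts.next());
        }
    }
}
```

Because every client shuffles independently, connections spread roughly evenly across the servers without any coordination between clients.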

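However, because of how ZK handles consistency (see below), write operations must be processed by the Leader. When a non-Leader node (a Learner) receives a write request, it forwards the request to the Leader, and this is transparent to the client. Read requests are served directly by the local node.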

Load Balancing

At the heart of ZooKeeper is an atomic messaging system that keeps all of the servers in sync.

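So the data on all servers is kept in sync (though probably only eventually consistent rather than strongly consistent; see below), and the server side has no load-balancing problem of its own. Client-side load balancing is the service routing already covered above: the random strategy.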

Fault Tolerance

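At the core of ZK is a messaging system (ZAB, ZooKeeper Atomic Broadcast) that keeps the data on the ZK servers in sync. The rough flow, which resembles two-phase commit, is:

  1. After the cluster starts, a Leader is elected by vote;
  2. When a Client sends a write request, the Leader sends a Proposal message to all Followers;
  3. Each Follower records the request and returns an ack to the Leader;
  4. Once the acks satisfy the Quorum requirement, the Leader sends a COMMIT message to the Followers (Observers receive an INFORM message instead);
  5. Each Learner receives that message and performs the corresponding operation.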
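Steps 2 to 5 can be modelled in memory to make the message flow concrete. The sketch below is an assumption-laden toy: BroadcastSketch, Follower, onProposal and onCommit are made-up names, there is no networking or disk, and everything runs synchronously, whereas the real Leader counts acks as they arrive.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of steps 2 to 5; not ZooKeeper code.
class BroadcastSketch {
    static class Follower {
        final long sid;
        final boolean alive;
        final List<String> pending = new ArrayList<>(); // recorded, not yet applied
        final List<String> applied = new ArrayList<>(); // committed state

        Follower(long sid, boolean alive) {
            this.sid = sid;
            this.alive = alive;
        }

        // Step 3: record the proposal and acknowledge it.
        boolean onProposal(String txn) {
            if (!alive) {
                return false;
            }
            pending.add(txn);
            return true;
        }

        // Step 5: apply the transaction once COMMIT arrives.
        void onCommit(String txn) {
            if (alive && pending.remove(txn)) {
                applied.add(txn);
            }
        }
    }

    // Returns true if a majority of the ensemble acked and the write was committed.
    static boolean write(long leaderSid, List<Follower> followers, String txn) {
        int ensembleSize = followers.size() + 1; // followers plus the leader
        Set<Long> acks = new HashSet<>();
        acks.add(leaderSid); // the leader counts its own ack
        for (Follower f : followers) { // step 2: send PROPOSAL
            if (f.onProposal(txn)) {
                acks.add(f.sid);
            }
        }
        if (acks.size() * 2 <= ensembleSize) {
            return false; // no quorum, nothing is committed
        }
        for (Follower f : followers) { // step 4: send COMMIT
            f.onCommit(txn);
        }
        return true;
    }
}
```

With three servers and one Follower down, the Leader plus the remaining Follower still form a majority, so the write goes through; with both Followers down it does not.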

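Let's go through some details of each step; Leader Election is skipped for now and left for a later post. The logic a Follower uses to handle messages from the Leader is in Follower#processPacket: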

    protected void processPacket(QuorumPacket qp) throws IOException{
        switch (qp.getType()) {
        case Leader.PING:
            ping(qp);
            break;
        case Leader.PROPOSAL:
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        }
    }

For step 3 above, here is what a Follower does after receiving the Leader's Proposal, in FollowerZooKeeperServer#logRequest:
    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        if ((request.zxid & 0xffffffffL) != 0) {
            // Put it on the pending queue
            pendingTxns.add(request);
        }
        // The txn is written to disk as soon as the proposal is received
        syncProcessor.processRequest(request);
    }

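In other words, when a Follower receives the Leader's Proposal it does not apply it yet: it puts the request on the pendingTxns queue and hands it to the syncProcessor to be written to disk. Next, here is what a Follower does when it receives the COMMIT message, in FollowerZooKeeperServer#commit: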
    /**
     * When a COMMIT message is received, eventually this method is called,
     * which matches up the zxid from the COMMIT with (hopefully) the head of
     * the pendingTxns queue and hands it to the commitProcessor to commit.
     * @param zxid - must correspond to the head of pendingTxns if it exists
     */
    public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
        Request request = pendingTxns.remove();
        commitProcessor.commit(request);
    }

commitProcessor.commit likewise just puts the request on a queue:
    synchronized public void commit(Request request) {
        if (!finished) {
            if (request == null) {
                LOG.warn("Committed a null!",
                        new Exception("committing a null! "));
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing request:: " + request);
            }
            committedRequests.add(request);
            notifyAll();
        }
    }

CommitProcessor has a thread that hands the committed requests to FinalRequestProcessor, which performs the actual write.
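The `(request.zxid & 0xffffffffL) != 0` check in logRequest relies on the layout of a zxid: the high 32 bits carry the Leader epoch and the low 32 bits carry a counter within that epoch, so a zero counter marks the first zxid of a new epoch rather than an ordinary transaction. The helper below is a hypothetical sketch of that layout (the class and method names are made up for illustration):

```java
// Hypothetical helper for illustration; not a ZooKeeper class.
class ZxidSketch {
    // High 32 bits: the leader epoch.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Low 32 bits: the transaction counter within the epoch.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid)
                + " counter=" + counterOf(zxid));
    }
}
```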

With the Follower covered, here is what the Leader does in step 4 above. The Leader's ack handling is in Leader#processAck:

    /**
     * Keep a count of acks that are received by the leader for a particular
     * proposal
     *
     * @param zxid
     *                the zxid of the proposal sent out
     * @param followerAddr
     */
    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
            for (Proposal p : outstandingProposals.values()) {
                long packetZxid = p.packet.getZxid();
                LOG.trace("outstanding proposal: 0x{}",
                        Long.toHexString(packetZxid));
            }
            LOG.trace("outstanding proposals all");
        }
        if ((zxid & 0xffffffffL) == 0) {
            /*
             * We no longer process NEWLEADER ack by this method. However,
             * the learner sends ack back to the leader after it gets UPTODATE
             * so we just ignore the message.
             */
            return;
        }
        if (outstandingProposals.size() == 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("outstanding is 0");
            }
            return;
        }
        // Proposal that has already been committed
        if (lastCommitted >= zxid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                        Long.toHexString(lastCommitted), Long.toHexString(zxid));
            }
            // The proposal has already been committed
            return;
        }
        // Record the ack for this proposal
        Proposal p = outstandingProposals.get(zxid);
        if (p == null) {
            LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                    Long.toHexString(zxid), followerAddr);
            return;
        }
        p.ackSet.add(sid);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Count for zxid: 0x{} is {}",
                    Long.toHexString(zxid), p.ackSet.size());
        }
        // Has the Quorum requirement been met?
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
            if (zxid != lastCommitted+1) {
                LOG.warn("Commiting zxid 0x{} from {} not first!",
                        Long.toHexString(zxid), followerAddr);
                LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
            }
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                toBeApplied.add(p);
            }
            if (p.request == null) {
                LOG.warn("Going to commmit null request for proposal: {}", p);
            }
            // Send COMMIT to the followers
            commit(zxid);
            // Send INFORM to the observers
            inform(p);
            // The leader must commit locally as well
            zk.commitProcessor.commit(p.request);
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }

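There are two Quorum implementations. The default is QuorumMaj, which passes as soon as more than half of the members have acked. The other, QuorumHierarchical, is a weighted implementation.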
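The majority rule is easy to state in code. The sketch below shows the idea only; MajorityQuorumSketch is a made-up class, and the assumption is that the verifier is told the number of voting members up front:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical majority verifier for illustration; not the ZooKeeper class.
class MajorityQuorumSketch {
    private final int half;

    MajorityQuorumSketch(int votingMembers) {
        this.half = votingMembers / 2;
    }

    // A set of acks is a quorum when it holds strictly more than half the voters.
    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }

    public static void main(String[] args) {
        MajorityQuorumSketch verifier = new MajorityQuorumSketch(5);
        Set<Long> acks = new HashSet<>();
        acks.add(1L);
        acks.add(2L);
        System.out.println("2 of 5: " + verifier.containsQuorum(acks));
        acks.add(3L);
        System.out.println("3 of 5: " + verifier.containsQuorum(acks));
    }
}
```

Note that with an even ensemble size the rule gains nothing: 4 servers need 3 acks, so they tolerate one failure, the same as 3 servers.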

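Next, what triggers a new Leader election. The simplest case: if the current Leader node goes down, the Follower's main loop ends, control returns to QuorumPeer's main loop, the node state is set to ServerState.LOOKING, and the next round of Leader Election begins. In addition, the Leader's main loop keeps checking whether the number of synced Followers still satisfies the Quorum requirement. If it does not, the Leader closes its connections to the Learners and sets its own state to ServerState.LOOKING, which also triggers the next round of Leader Election.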

    while (true) {
        Thread.sleep(self.tickTime / 2);
        if (!tickSkip) {
            self.tick++;
        }
        HashSet<Long> syncedSet = new HashSet<Long>();
        // lock on the followers when we use it.
        syncedSet.add(self.getId());
        for (LearnerHandler f : getLearners()) {
            // Synced set is used to check we have a supporting quorum, so only
            // PARTICIPANT, not OBSERVER, learners should be used
            // f.synced() uses the syncLimit setting
            if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                syncedSet.add(f.getSid());
            }
            f.ping();
        }
        if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
            //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
            // Lost quorum, shutdown
            shutdown("Not sufficient followers synced, only synced with sids: [ "
                    + getSidSetString(syncedSet) + " ]");
            // make sure the order is the same!
            // the leader goes to looking
            return;
        }
        tickSkip = !tickSkip;
    }

The Leader maintains one LearnerHandler thread per Learner.

Smooth Scaling

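Before ZK 3.5, scaling out was painful and required a rolling restart. Version 3.5 finally supports dynamic configuration. The original static configuration file gains a new setting, dynamicConfigFile, for example: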

    ## zoo_replicated1.cfg
    tickTime=2000
    dataDir=/zookeeper/data/zookeeper1
    initLimit=5
    syncLimit=2
    dynamicConfigFile=/zookeeper/conf/zoo_replicated1.cfg.dynamic

    ## zoo_replicated1.cfg.dynamic
    server.1=125.23.63.23:2780:2783:participant;2791
    server.2=125.23.63.24:2781:2784:participant;2792
    server.3=125.23.63.25:2782:2785:participant;2793

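The configuration can then be changed at runtime with the reconfig command. The implementation is simple: the configuration is stored in a special znode, /zookeeper/config.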
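Each server line in the dynamic file packs several fields into one string: the server id, the host, the quorum port, the election port, the role, and after the semicolon the client port. The parser below is a rough sketch for exactly the shape used in the example above; ServerSpecSketch is a made-up class, and it does not handle the optional parts of the real syntax (a missing role, a client bind address, IPv6 literals).

```java
// Hypothetical parser for lines shaped like
// server.<id>=<host>:<quorumPort>:<electionPort>:<role>;<clientPort>
class ServerSpecSketch {
    final long id;
    final String host;
    final int quorumPort;
    final int electionPort;
    final String role;
    final int clientPort;

    ServerSpecSketch(long id, String host, int quorumPort, int electionPort,
            String role, int clientPort) {
        this.id = id;
        this.host = host;
        this.quorumPort = quorumPort;
        this.electionPort = electionPort;
        this.role = role;
        this.clientPort = clientPort;
    }

    static ServerSpecSketch parse(String line) {
        String[] keyValue = line.trim().split("=", 2);
        if (keyValue.length != 2 || !keyValue[0].startsWith("server.")) {
            throw new IllegalArgumentException("not a server line: " + line);
        }
        long id = Long.parseLong(keyValue[0].substring("server.".length()));
        // Left of ';' is the quorum-side address, right of it the client port.
        String[] halves = keyValue[1].split(";", 2);
        if (halves.length != 2) {
            throw new IllegalArgumentException("missing client port: " + line);
        }
        String[] parts = halves[0].split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected host:port:port:role in: " + line);
        }
        return new ServerSpecSketch(id, parts[0], Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]), parts[3], Integer.parseInt(halves[1]));
    }

    public static void main(String[] args) {
        ServerSpecSketch s = parse("server.1=125.23.63.23:2780:2783:participant;2791");
        System.out.println(s.id + " " + s.host + " " + s.role + " client port " + s.clientPort);
    }
}
```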

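After scaling out, client connections can be rebalanced through ZooKeeper#updateServerList. It uses a probabilistic algorithm whose goal is to keep connection migration low while still balancing load. Simply reshuffling the enlarged server list and reconnecting every client would make the migration cost rather high.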
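To see why a probabilistic rule can do both, consider only the grow case. If the ensemble grows from `old` to `new` servers and each client moves to one of the added servers with probability 1 - old/new, every server ends up with about 1/new of the clients while most clients stay put. The sketch below illustrates that idea under those assumptions; RebalanceSketch and pickAfterGrowth are made-up names, and this is a simplification, not the actual updateServerList code (which also handles shrinking and removed servers).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of probabilistic rebalancing when servers are added.
class RebalanceSketch {
    // Assumes newServers is a superset of oldServers and current is an old server.
    static String pickAfterGrowth(String current, List<String> oldServers,
            List<String> newServers, Random random) {
        List<String> added = new ArrayList<>(newServers);
        added.removeAll(oldServers);
        if (added.isEmpty()) {
            return current; // nothing was added, stay where we are
        }
        // Move with probability 1 - old/new, so each old server keeps
        // (old/new) * (1/old) = 1/new of all clients.
        double moveProbability = 1.0 - (double) oldServers.size() / newServers.size();
        if (random.nextDouble() < moveProbability) {
            return added.get(random.nextInt(added.size()));
        }
        return current;
    }
}
```

Going from 3 to 5 servers this way moves about 40% of the clients, whereas a full reshuffle would move about 80% of them (every client whose new random pick differs from its current server).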

Storage Mechanism

For ZK's data files, see the earlier post.

The transaction log file is implemented by FileTxnLog. Its format is documented as follows:

    /**
     * This class implements the TxnLog interface. It provides api's
     * to access the txnlogs and add entries to it.
     * <p>
     * The format of a Transactional log is as follows:
     * <blockquote><pre>
     * LogFile:
     *     FileHeader TxnList ZeroPad
     *
     * FileHeader: {
     *     magic 4bytes (ZKLG)
     *     version 4bytes
     *     dbid 8bytes
     *   }
     *
     * TxnList:
     *     Txn || Txn TxnList
     *
     * Txn:
     *     checksum Txnlen TxnHeader Record 0x42
     *
     * checksum: 8bytes Adler32 is currently used
     *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
     *
     * Txnlen:
     *     len 4bytes
     *
     * TxnHeader: {
     *     sessionid 8bytes
     *     cxid 4bytes
     *     zxid 8bytes
     *     time 8bytes
     *     type 4bytes
     *   }
     *
     * Record:
     *     See Jute definition file for details on the various record types
     *
     * ZeroPad:
     *     0 padded to EOF (filled during preallocation stage)
     * </pre></blockquote>
     */
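To make the FileHeader part concrete, here is a small sketch that encodes and decodes the 16-byte header described above and computes an Adler32 checksum with the JDK's java.util.zip.Adler32. TxnLogHeaderSketch and its methods are made-up names; big-endian byte order is assumed, and the sketch does not read or write real ZK log files.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;

// Hypothetical sketch of the FileHeader layout: magic(4) version(4) dbid(8).
class TxnLogHeaderSketch {
    static final int HEADER_SIZE = 16;
    // The four ASCII bytes "ZKLG" read as one big-endian int.
    static final int MAGIC =
            ByteBuffer.wrap("ZKLG".getBytes(StandardCharsets.US_ASCII)).getInt();

    static byte[] encodeHeader(int version, long dbId) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.putInt(MAGIC).putInt(version).putLong(dbId);
        return buf.array();
    }

    // Returns {version, dbid}; rejects input that does not start with the magic.
    static long[] decodeHeader(byte[] bytes) {
        if (bytes.length < HEADER_SIZE) {
            throw new IllegalArgumentException("header too short");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic, not a txn log header");
        }
        int version = buf.getInt();
        long dbId = buf.getLong();
        return new long[] { version, dbId };
    }

    // Adler32 over a payload, widened to a long as in the 8-byte checksum field.
    static long checksum(byte[] payload) {
        Adler32 adler = new Adler32();
        adler.update(payload, 0, payload.length);
        return adler.getValue();
    }

    public static void main(String[] args) {
        byte[] header = encodeHeader(2, 0L);
        long[] decoded = decodeHeader(header);
        System.out.println("version=" + decoded[0] + " dbid=" + decoded[1]);
    }
}
```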

As for snapshot files, an earlier post already showed how to inspect them by modifying the source to output XML.
