当前位置:   article > 正文

zookeeper源码分析(五)——网络通信组件ServerCnxnFactory

zookeeper源码分析(五)——网络通信组件ServerCnxnFactory

1.概述

Zookeeper作为一个服务器,需要与客户端进行网络通信,Zookeeper使用ServerCnxFactory管理与客户端的连接,其中有两个实现,一个是NIOServerCnxnFactory,使用java原生Nio实现,一个是NettyServerCnxnFactory,使用netty实现。

2.NIOServerCnxnFactory

使用的是java的NIO思路,1个accept Thread,该线程主要接收客户端的连接,并将其分配给selector thread; selector thread:该线程执行select(),由于在处理大量连接时,select()会成为性能瓶颈,因此启动多个selector thread,使用zookeeper.nio.numSelectorThreads来进行配置该类线程数,默认个数为核心数/2;worker thread:该线程执行基本的操作,可以理解为真正的处理线程,使用zookeeper.nio.numWorkerThreads来进行配置该线程的线程数,默认的线程数为:核心线程数*2;connection expiration thread:连接上的session过期,则关闭该连接。

上面是对这个NIOServerCnxnFactory类类上的注释说明

  1. /**
  2. * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
  3. * NIO non-blocking socket calls. Communication between threads is handled via
  4. * queues.
  5. *
  6. * - 1 accept thread, which accepts new connections and assigns to a
  7. * selector thread
  8. * - 1-N selector threads, each of which selects on 1/N of the connections.
  9. * The reason the factory supports more than one selector thread is that
  10. * with large numbers of connections, select() itself can become a
  11. * performance bottleneck.
  12. * - 0-M socket I/O worker threads, which perform basic socket reads and
  13. * writes. If configured with 0 worker threads, the selector threads
  14. * do the socket I/O directly.
  15. * - 1 connection expiration thread, which closes idle connections; this is
  16. * necessary to expire connections on which no session is established.
  17. *
  18. * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
  19. * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
  20. */
  21. public class NIOServerCnxnFactory extends ServerCnxnFactory

2.1 AcceptThread

下面是AcceptThread线程的源码,我先把里面的实现方法给删除了。该线程的执行流程:run方法执行selector.select(),并进行调用doAccept()接收客户端连接,因此我们需要重点关注doAccept()方法

  1. private class AcceptThread extends AbstractSelectThread {
  2. private final ServerSocketChannel acceptSocket;
  3. private final SelectionKey acceptKey;
  4. private final RateLogger acceptErrorLogger = new RateLogger(LOG);
  5. private final Collection<SelectorThread> selectorThreads;
  6. private Iterator<SelectorThread> selectorIterator;
  7. private volatile boolean reconfiguring = false;
  8. public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
  9. }
  10. public void run() {
  11. }
  12. public void setReconfiguring() {
  13. reconfiguring = true;
  14. }
  15. private void select() {
  16. }
  17. /**
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/552440
推荐阅读
相关标签
  

闽ICP备14008679号