赞
踩
Zookeeper作为一个服务器,需要与客户端进行网络通信,Zookeeper使用ServerCnxFactory管理与客户端的连接,其中有两个实现,一个是NIOServerCnxnFactory,使用java原生Nio实现,一个是NettyServerCnxnFactory,使用netty实现。
使用的是java的NIO思路,1个accept Thread,该线程主要接收客户端的连接,并将其分配给selector thread; selector thread:该线程执行select(),由于在处理大量连接时,select()会成为性能瓶颈,因此启动多个selector thread,使用zookeeper.nio.numSelectorThreads来进行配置该类线程数,默认个数为核心数/2;worker thread:该线程执行基本的操作,可以理解为真正的处理线程,使用zookeeper.nio.numWorkerThreads来进行配置该线程的线程数,默认的线程数为:核心线程数*2;connection expiration thread:连接上的session过期,则关闭该连接。
上面是对这个NIOServerCnxnFactory类类上的注释说明
- /**
- * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
- * NIO non-blocking socket calls. Communication between threads is handled via
- * queues.
- *
- * - 1 accept thread, which accepts new connections and assigns to a
- * selector thread
- * - 1-N selector threads, each of which selects on 1/N of the connections.
- * The reason the factory supports more than one selector thread is that
- * with large numbers of connections, select() itself can become a
- * performance bottleneck.
- * - 0-M socket I/O worker threads, which perform basic socket reads and
- * writes. If configured with 0 worker threads, the selector threads
- * do the socket I/O directly.
- * - 1 connection expiration thread, which closes idle connections; this is
- * necessary to expire connections on which no session is established.
- *
- * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
- * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
- */
- public class NIOServerCnxnFactory extends ServerCnxnFactory
下面是AcceptThread线程的源码,我先把里面的实现方法给删除了。该线程的执行流程:run方法执行selector.select(),并进行调用doAccept()接收客户端连接,因此我们需要重点关注doAccept()方法
- private class AcceptThread extends AbstractSelectThread {
-
- private final ServerSocketChannel acceptSocket;
- private final SelectionKey acceptKey;
- private final RateLogger acceptErrorLogger = new RateLogger(LOG);
- private final Collection<SelectorThread> selectorThreads;
- private Iterator<SelectorThread> selectorIterator;
- private volatile boolean reconfiguring = false;
-
- public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
-
- }
-
- public void run() {
-
- }
-
- public void setReconfiguring() {
- reconfiguring = true;
- }
-
- private void select() {
-
-
- }
-
- /**
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。