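Version note: com.rabbitmq:amqp-client:4.0.3, RabbitMQ 3.6.15.
NioLoopContext manages the NIO event loop: it initializes the read and write buffers from the configured parameters and starts the event loop. For an analysis of how it is initialized, see "RabbitMQ Client Source Code Analysis (2): Frame and FrameHandler".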
Constructor: allocates the read and write buffers using the readByteBufferSize and writeByteBufferSize configured in NioParams. Both default to 32768 bytes.
public class NioLoopContext {

    private static final Logger LOGGER = LoggerFactory.getLogger(NioLoopContext.class);

    private final SocketChannelFrameHandlerFactory socketChannelFrameHandlerFactory;
    private final ExecutorService executorService;
    private final ThreadFactory threadFactory;

    final ByteBuffer readBuffer, writeBuffer;

    SelectorHolder readSelectorState;
    SelectorHolder writeSelectorState;

    public NioLoopContext(SocketChannelFrameHandlerFactory socketChannelFrameHandlerFactory,
                          NioParams nioParams) {
        this.socketChannelFrameHandlerFactory = socketChannelFrameHandlerFactory;
        this.executorService = nioParams.getNioExecutor();
        this.threadFactory = nioParams.getThreadFactory();
        this.readBuffer = ByteBuffer.allocate(nioParams.getReadByteBufferSize());
        this.writeBuffer = ByteBuffer.allocate(nioParams.getWriteByteBufferSize());
    }
}
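To make the buffer allocation concrete, here is a minimal standalone sketch that uses only java.nio. The class LoopBuffers, its constant, and the size check are hypothetical names and choices made for illustration; they are not part of amqp-client, and only the 32768 default and the use of ByteBuffer.allocate come from the text and code above.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the buffer-related part of NioLoopContext.
final class LoopBuffers {
    // Default size quoted in the text above.
    static final int DEFAULT_BUFFER_SIZE = 32768;

    final ByteBuffer readBuffer;
    final ByteBuffer writeBuffer;

    LoopBuffers(int readSize, int writeSize) {
        // Rejecting non-positive sizes is a choice made for this sketch.
        if (readSize <= 0 || writeSize <= 0) {
            throw new IllegalArgumentException("buffer sizes must be positive");
        }
        // ByteBuffer.allocate creates heap (non-direct) buffers.
        this.readBuffer = ByteBuffer.allocate(readSize);
        this.writeBuffer = ByteBuffer.allocate(writeSize);
    }

    static LoopBuffers withDefaults() {
        return new LoopBuffers(DEFAULT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public static void main(String[] args) {
        LoopBuffers buffers = LoopBuffers.withDefaults();
        System.out.println("read capacity:  " + buffers.readBuffer.capacity());
        System.out.println("write capacity: " + buffers.writeBuffer.capacity());
    }
}
```

Because the two buffers are fields of the context rather than of a connection, everything handled by the same loop would share them, which is presumably why they are allocated once in the constructor.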
Initialization method initStateIfNecessary(): creates the read and write Selectors and starts the NIO event loop.
void initStateIfNecessary() throws IOException {
    if (this.readSelectorState == null) {
        // Selector.open() creates a Selector
        this.readSelectorState = new SelectorHolder(Selector.open());
        this.writeSelectorState = new SelectorHolder(Selector.open());
        startIoLoops();
    }
}

private void startIoLoops() {
    if (executorService == null) {
        Thread nioThread = Environment.newThread(
            threadFactory,
            new NioLoop(socketChannelFrameHandlerFactory.nioParams, this),
            "rabbitmq-nio"
        );
        nioThread.start();
    } else {
        this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this));
    }
}
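The pattern in initStateIfNecessary() can be reduced to "open the selectors once, then start the loop either on a dedicated thread or on a supplied executor". The sketch below illustrates that pattern with JDK classes only. LazyLoopStarter, its start counter, and the synchronized methods are assumptions of this sketch, not library code; how the real client guards concurrent calls is not shown in the excerpt above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.ExecutorService;

// Hypothetical illustration of lazy, one-time loop start-up.
final class LazyLoopStarter {
    private final ExecutorService executor; // may be null, as in the excerpt above
    private final Runnable loop;            // stand-in for NioLoop
    private Selector readSelector;
    private Selector writeSelector;
    private int startCount;                 // exists only so the sketch can be checked

    LazyLoopStarter(ExecutorService executor, Runnable loop) {
        this.executor = executor;
        this.loop = loop;
    }

    // Opens both selectors and starts the loop on the first call only.
    synchronized void initIfNecessary() {
        if (readSelector != null) {
            return; // already initialized
        }
        try {
            Selector read = Selector.open();
            Selector write = Selector.open();
            readSelector = read;
            writeSelector = write;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        startCount++;
        if (executor == null) {
            // No executor configured: run the loop on a dedicated thread.
            new Thread(loop, "sketch-nio-loop").start();
        } else {
            executor.submit(loop);
        }
    }

    synchronized int startCount() {
        return startCount;
    }

    synchronized void close() {
        try {
            if (readSelector != null) readSelector.close();
            if (writeSelector != null) writeSelector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            readSelector = null;
            writeSelector = null;
        }
    }

    public static void main(String[] args) {
        LazyLoopStarter starter = new LazyLoopStarter(null, () -> System.out.println("loop running"));
        starter.initIfNecessary();
        starter.initIfNecessary(); // second call is a no-op
        System.out.println("loop starts: " + starter.startCount());
        starter.close();
    }
}
```

Calling initIfNecessary() any number of times starts the loop once, which matches the null check on readSelectorState in the original method.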
The NioLoop code is fairly hard to read, and its overall logic is not clearly structured. Its main job is to handle read events and write events.

public class NioLoop implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(NioLoop.class);

    private final NioLoopContext context;
    private final NioParams nioParams;

    public NioLoop(NioParams nioParams, NioLoopContext loopContext) {
        this.nioParams = nioParams;
        this.context = loopContext;
    }

    @Override
    public void run() {
        final SelectorHolder selectorState = context.readSelectorState;
        final Selector selector = selectorState.selector;
        final Set<SocketChannelRegistration> registrations = selectorState.registrations;
        final ByteBuffer buffer = context.readBuffer;
        final SelectorHolder writeSelectorState = context
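Since the NioLoop listing is cut off, the following is a generic sketch of how a selector-driven read pass works in Java NIO. It does not reproduce NioLoop. ReadLoopSketch, selectOnce, and demo are hypothetical names, and a java.nio.channels.Pipe stands in for a socket so that the example runs without a broker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical, simplified read pass; not the library's NioLoop.
final class ReadLoopSketch {

    // Performs one select pass and returns the text read from all ready channels.
    static String selectOnce(Selector selector, ByteBuffer buffer, long timeoutMillis) {
        StringBuilder out = new StringBuilder();
        try {
            if (selector.select(timeoutMillis) == 0) {
                return ""; // nothing became ready within the timeout
            }
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove(); // selected keys must be removed by hand
                if (!key.isValid() || !key.isReadable()) {
                    continue;
                }
                ReadableByteChannel channel = (ReadableByteChannel) key.channel();
                buffer.clear();              // reuse the shared buffer
                int read = channel.read(buffer);
                if (read < 0) {
                    key.cancel();            // end of stream: stop watching this channel
                    continue;
                }
                buffer.flip();               // switch the buffer from writing to reading
                out.append(StandardCharsets.UTF_8.decode(buffer));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    // Registers the read end of a pipe, writes "ping" to it, and runs one pass.
    static String demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                return selectOnce(selector, ByteBuffer.allocate(32768), 1000);
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A real loop would repeat the pass until the thread is interrupted and would drain a separate write selector as well, as the readSelectorState and writeSelectorState fields suggest.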