赞
踩
ConnectionFactory主要屏蔽底层Netty通信API,为整个通讯提供一致的模型,也就是将Channel泛化为更加接近业务的Connection对象。
目录
本章重点分析ConnectionFactory实现。首先展示类图,涉及ConnectionFactory,AbstractConnectionFactory,和DefaultConnectionFactory,具体可以参考模板方法;
AbstractConnectionFactory为抽象工厂类,实现ConnectFactory封装了Netty的Bootstrap
初始化BootStrap,并且设置WorkerGroup,以及Channel的相关参数,设置高低水位 ByteBufAllocator,ChannelInitializer
// 初始化方法,真正调用Netty的BootStrap初始化,然后对Netty参数进行调优;
- @Override
- public void init(final ConnectionEventHandler connectionEventHandler) {
- bootstrap = new Bootstrap();
- //设置Channel的相关参数,从ConfigManager获取
- bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
- .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
- .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
- .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
-
- //设置高低水位, init netty write buffer water mark
- initWriteBufferWaterMark();
-
- // 设置ByteBuf的分配器init byte buf allocator
- if (ConfigManager.netty_buffer_pooled()) {
- this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- } else {
- this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
- }
-
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel channel) {
- ChannelPipeline pipeline = channel.pipeline();
- //设置编解码器,这款要实现Netty提供的相关接口
- pipeline.addLast("decoder", codec.newDecoder());
- pipeline.addLast("encoder", codec.newEncoder());
-
- boolean idleSwitch = ConfigManager.tcp_idle_switch();
- if (idleSwitch) {
- pipeline.addLast("idleStateHandler",
- new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0,
- TimeUnit.MILLISECONDS));
- pipeline.addLast("heartbeatHandler", heartbeatHandler);
- }
- //设置连接时间处理器
- pipeline.addLast("connectionEventHandler", connectionEventHandler);
- pipeline.addLast("handler", handler);
- }
- });
- }

-
- // do方法一般真正逻辑的方法,负责真正connect服务
- protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout)
- throws Exception {
- // prevent unreasonable value, at least 1000
- connectTimeout = Math.max(connectTimeout, 1000);
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
- //调用Bootstrap发生真正链接,连接过程阻塞
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
- future.awaitUninterruptibly();
- //此处省略future结果的处理
- return future.channel();
- }
-
-
- // 封装doCreateConnection,提供多元化操作,方便调用方使用
- @Override
- public Connection createConnection(String targetIP, int targetPort, byte version,
- int connectTimeout) throws Exception {
- Channel channel = doCreateConnection(targetIP, targetPort, connectTimeout);
- //构造Connection对象,主要设置协议版本;也就是一个Connection只能发送指定的协议对策消息,
- Connection conn = new Connection(channel,
- ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE), version, new Url(targetIP,
- targetPort));
- //触发Connect事件,之前注册在时间处理器生效;
- channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
- return conn;
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。