当前位置:   article > 正文

Sofa_bolt下的ConnectionFactory设计解析_sofabolt connection

sofabolt connection

       ConnectionFactory主要屏蔽底层Netty通信API,为整个通讯提供一致的模型,也就是将Channel泛化为更加接近业务的Connection对象。 

目录

1 类图设计

2 初始化方法(init)

3 连接方法(Connect)


1 类图设计

本章重点分析ConnectionFactory实现。首先展示类图,涉及ConnectionFactory,AbstractConnectionFactory,和DefaultConnectionFactory,具体可以参考模板方法;

 

AbstractConnectionFactory为抽象工厂类,实现ConnectFactory封装了Netty的Bootstrap


2 初始化方法(init)

     初始化BootStrap,并且设置WorkerGroup,以及Channel的相关参数,设置高低水位 ByteBufAllocator,ChannelInitializer
// 初始化方法,真正调用Netty的BootStrap初始化,然后对Netty参数进行调优;

  1. @Override
  2.     public void init(final ConnectionEventHandler connectionEventHandler) {
  3.         bootstrap = new Bootstrap();
  4.         //设置Channel的相关参数,从ConfigManager获取
  5.         bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
  6.             .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
  7.             .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
  8.             .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
  9.         //设置高低水位, init netty write buffer water mark
  10.         initWriteBufferWaterMark();
  11.         // 设置ByteBuf的分配器init byte buf allocator
  12.         if (ConfigManager.netty_buffer_pooled()) {
  13.             this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  14.         } else {
  15.             this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
  16.         }
  17.         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  18.             @Override
  19.             protected void initChannel(SocketChannel channel) {
  20.                 ChannelPipeline pipeline = channel.pipeline();
  21.                 //设置编解码器,这款要实现Netty提供的相关接口
  22.                 pipeline.addLast("decoder", codec.newDecoder());
  23.                 pipeline.addLast("encoder", codec.newEncoder());
  24.                 boolean idleSwitch = ConfigManager.tcp_idle_switch();
  25.                 if (idleSwitch) {
  26.                     pipeline.addLast("idleStateHandler",
  27.                         new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0,
  28.                             TimeUnit.MILLISECONDS));
  29.                     pipeline.addLast("heartbeatHandler", heartbeatHandler);
  30.                 }
  31.                 //设置连接时间处理器
  32.                 pipeline.addLast("connectionEventHandler", connectionEventHandler);
  33.                 pipeline.addLast("handler", handler);
  34.             }
  35.         });
  36.     }

3 连接方法(Connect)

  1. // do方法一般真正逻辑的方法,负责真正connect服务
  2. protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout)
  3. throws Exception {
  4. // prevent unreasonable value, at least 1000
  5. connectTimeout = Math.max(connectTimeout, 1000);
  6. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
  7. //调用Bootstrap发生真正链接,连接过程阻塞
  8. ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
  9. future.awaitUninterruptibly();
  10. //此处省略future结果的处理
  11. return future.channel();
  12. }
  13. // 封装doCreateConnection,提供多元化操作,方便调用方使用
  14. @Override
  15. public Connection createConnection(String targetIP, int targetPort, byte version,
  16. int connectTimeout) throws Exception {
  17. Channel channel = doCreateConnection(targetIP, targetPort, connectTimeout);
  18. //构造Connection对象,主要设置协议版本;也就是一个Connection只能发送指定的协议对策消息,
  19. Connection conn = new Connection(channel,
  20. ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE), version, new Url(targetIP,
  21. targetPort));
  22. //触发Connect事件,之前注册在时间处理器生效;
  23. channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
  24. return conn;
  25. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/742754
推荐阅读
相关标签
  

闽ICP备14008679号