当前位置:   article > 正文

springboot集成netty实战

springboot集成netty

今天我们分享springboot集成netty的过程:

1、jar包依赖,Netty服务端和客户端依赖的jar包一样:

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.28.Final</version>
  5. </dependency>
  6. <!-- 序列化jar -->
  7. <dependency>
  8. <groupId>de.javakaffee</groupId>
  9. <artifactId>kryo-serializers</artifactId>
  10. <version>0.42</version>
  11. </dependency>

2、Netty服务端核心代码(boot启动类省略):

2.1、netty功能启动入口

  1. import io.netty.channel.ChannelInitializer;
  2. import io.netty.channel.socket.SocketChannel;
  3. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  4. import io.netty.handler.codec.LengthFieldPrepender;
  5. import io.netty.handler.timeout.ReadTimeoutHandler;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * 类说明:服务端Handler的初始化
  10. * 交给Spring 托管,ServerBusiHandler用注入方式实例化后加入Netty的pipeline
  11. */
  12. @Service//启动时扫描,即交给spring管理
  13. public class ServerInit extends ChannelInitializer<SocketChannel> {
  14. @Autowired
  15. private ServerBusiHandler serverBusiHandler;
  16. @Override
  17. protected void initChannel(SocketChannel ch) throws Exception {
  18. /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
  19. //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
  20. /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
  21. ch.pipeline().addLast("frameDecoder",
  22. new LengthFieldBasedFrameDecoder(65535,
  23. 0,2,0,
  24. 2));
  25. /*给发送出去的消息增加长度字段*/
  26. ch.pipeline().addLast("frameEncoder",
  27. new LengthFieldPrepender(2));
  28. /*反序列化,将字节数组转换为消息实体*/
  29. ch.pipeline().addLast(new KryoDecoder());
  30. /*序列化,将消息实体转换为字节数组准备进行网络传输*/
  31. ch.pipeline().addLast("MessageEncoder",
  32. new KryoEncoder());
  33. /*超时检测*/
  34. ch.pipeline().addLast("readTimeoutHandler",
  35. new ReadTimeoutHandler(50));
  36. /*登录应答*/
  37. ch.pipeline().addLast(new LoginAuthRespHandler());
  38. /*心跳应答*/
  39. ch.pipeline().addLast("HeartBeatHandler",
  40. new HeartBeatRespHandler());
  41. /*服务端业务处理*/
  42. ch.pipeline().addLast("ServerBusiHandler",
  43. serverBusiHandler);
  44. }
  45. }

2.2、netty自定义配置入口 ServerInit

  1. import io.netty.channel.ChannelInitializer;
  2. import io.netty.channel.socket.SocketChannel;
  3. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  4. import io.netty.handler.codec.LengthFieldPrepender;
  5. import io.netty.handler.timeout.ReadTimeoutHandler;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * 类说明:服务端Handler的初始化
  10. * 交给Spring 托管,ServerBusiHandler用注入方式实例化后加入Netty的pipeline
  11. */
  12. @Service//启动时扫描,即交给spring管理
  13. public class ServerInit extends ChannelInitializer<SocketChannel> {
  14. @Autowired
  15. private ServerBusiHandler serverBusiHandler;
  16. @Override
  17. protected void initChannel(SocketChannel ch) throws Exception {
  18. /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
  19. //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
  20. /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
  21. ch.pipeline().addLast("frameDecoder",
  22. new LengthFieldBasedFrameDecoder(65535,
  23. 0,2,0,
  24. 2));
  25. /*给发送出去的消息增加长度字段*/
  26. ch.pipeline().addLast("frameEncoder",
  27. new LengthFieldPrepender(2));
  28. /*反序列化,将字节数组转换为消息实体*/
  29. ch.pipeline().addLast(new KryoDecoder());
  30. /*序列化,将消息实体转换为字节数组准备进行网络传输*/
  31. ch.pipeline().addLast("MessageEncoder",
  32. new KryoEncoder());
  33. /*超时检测*/
  34. ch.pipeline().addLast("readTimeoutHandler",
  35. new ReadTimeoutHandler(50));
  36. /*登录应答*/
  37. ch.pipeline().addLast(new LoginAuthRespHandler());
  38. /*心跳应答*/
  39. ch.pipeline().addLast("HeartBeatHandler",
  40. new HeartBeatRespHandler());
  41. /*服务端业务处理*/
  42. ch.pipeline().addLast("ServerBusiHandler",
  43. serverBusiHandler);
  44. }
  45. }

2.3、netty处理业务的Handler入口 ServerBusiHandler 类:

  1. import io.netty.channel.ChannelHandler;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.SimpleChannelInboundHandler;
  4. import org.apache.commons.logging.Log;
  5. import org.apache.commons.logging.LogFactory;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import java.lang.reflect.Method;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * @author Mark老师 享学课堂 https://enjoy.ke.qq.com
  13. * 类说明:业务处理类
  14. * channelRead0方法中有了实际的业务处理,负责具体的业务方法的调用
  15. *
  16. */
  17. @Service
  18. @ChannelHandler.Sharable//共享的单例
  19. public class ServerBusiHandler
  20. extends SimpleChannelInboundHandler<MyMessage> {
  21. private static final Log LOG
  22. = LogFactory.getLog(ServerBusiHandler.class);
  23. @Autowired
  24. private RegisterService registerService;
  25. @Override
  26. protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg)
  27. throws Exception {
  28. LOG.info(msg);
  29. MyMessage message = new MyMessage();
  30. MyHeader myHeader = new MyHeader();
  31. myHeader.setSessionID(msg.getMyHeader().getSessionID());
  32. myHeader.setType(MessageType.SERVICE_RESP.value());
  33. message.setMyHeader(myHeader);
  34. Map<String,Object> content = (HashMap<String,Object>)msg.getBody();
  35. /*方法所在类名接口名*/
  36. String serviceName = (String) content.get("siName");
  37. /*方法的名字*/
  38. String methodName = (String) content.get("methodName");
  39. /*方法的入参类型*/
  40. Class<?>[] paramTypes = (Class<?>[]) content.get("paraTypes");
  41. /*方法的入参的值*/
  42. Object[] args = (Object[]) content.get("args");
  43. /*从容器中拿到服务的Class对象*/
  44. Class serviceClass = registerService.getLocalService(serviceName);
  45. if(serviceClass == null){
  46. throw new ClassNotFoundException(serviceName+ " not found");
  47. }
  48. /*通过反射,执行实际的服务*/
  49. Method method = serviceClass.getMethod(methodName, paramTypes);
  50. boolean result = (boolean)method.invoke(serviceClass.newInstance(),args);
  51. message.setBody(result);
  52. ctx.writeAndFlush(message);
  53. }
  54. @Override
  55. public void channelInactive(ChannelHandlerContext ctx)
  56. throws Exception {
  57. LOG.info(ctx.channel().remoteAddress()+" 主动断开了连接!");
  58. }
  59. }

3、客户端核心代码实现(boot启动类省略):

3.1、netty客户端启动入口

  1. import io.netty.bootstrap.Bootstrap;
  2. import io.netty.channel.Channel;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelOption;
  5. import io.netty.channel.EventLoopGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import org.apache.commons.logging.Log;
  9. import org.apache.commons.logging.LogFactory;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Service;
  12. import javax.annotation.PostConstruct;
  13. import javax.annotation.PreDestroy;
  14. import java.lang.reflect.InvocationHandler;
  15. import java.lang.reflect.Method;
  16. import java.lang.reflect.Proxy;
  17. import java.net.InetSocketAddress;
  18. import java.util.HashMap;
  19. import java.util.Map;
  20. import java.util.concurrent.Executors;
  21. import java.util.concurrent.ScheduledExecutorService;
  22. import java.util.concurrent.TimeUnit;
  23. /**
  24. *类说明:rpc框架的客户端代理部分,交给Spring 托管
  25. * 1、动态代理的实现中,不再连接服务器,而是直接发送请求
  26. * 2、客户端网络部分的主体,包括Netty组件的初始化,连接服务器等
  27. */
  28. @Service
  29. public class RpcClientFrame implements Runnable{
  30. private static final Log LOG = LogFactory.getLog(RpcClientFrame.class);
  31. private ScheduledExecutorService executor = Executors
  32. .newScheduledThreadPool(1);
  33. private Channel channel;
  34. private EventLoopGroup group = new NioEventLoopGroup();
  35. /*是否用户主动关闭连接的标志值*/
  36. private volatile boolean userClose = false;
  37. /*连接是否成功关闭的标志值*/
  38. private volatile boolean connected = false;
  39. @Autowired
  40. private ClientInit clientInit;
  41. @Autowired
  42. private ClientBusiHandler clientBusiHandler;
  43. /*远程服务的代理对象,参数为客户端要调用的的服务*/
  44. public <T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception {
  45. /*拿到一个代理对象,由这个代理对象通过网络进行实际的服务调用*/
  46. return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(),
  47. new Class<?>[]{serviceInterface},
  48. new DynProxy(serviceInterface,clientBusiHandler));
  49. }
  50. /*动态代理,实现对远程服务的访问*/
  51. private static class DynProxy implements InvocationHandler{
  52. private Class<?> serviceInterface;
  53. private ClientBusiHandler clientBusiHandler;
  54. public DynProxy(Class<?> serviceInterface, ClientBusiHandler clientBusiHandler) {
  55. this.serviceInterface = serviceInterface;
  56. this.clientBusiHandler = clientBusiHandler;
  57. }
  58. @Override
  59. public Object invoke(Object proxy, Method method, Object[] args)
  60. throws Throwable {
  61. Map<String,Object> content = new HashMap<>();
  62. content.put("siName",serviceInterface.getName());
  63. content.put("methodName",method.getName());
  64. content.put("paraTypes",method.getParameterTypes());
  65. content.put("args",args);
  66. return clientBusiHandler.send(content);
  67. }
  68. }
  69. public boolean isConnected() {
  70. return connected;
  71. }
  72. /*连接服务器*/
  73. public void connect(int port, String host) throws Exception {
  74. try {
  75. Bootstrap b = new Bootstrap();
  76. b.group(group).channel(NioSocketChannel.class)
  77. .option(ChannelOption.TCP_NODELAY, true)
  78. .handler(clientInit);
  79. // 发起异步连接操作
  80. ChannelFuture future = b.connect(
  81. new InetSocketAddress(host, port)).sync();
  82. channel = future.sync().channel();
  83. /*连接成功后通知等待线程,连接已经建立*/
  84. synchronized (this){
  85. this.connected = true;
  86. this.notifyAll();
  87. }
  88. future.channel().closeFuture().sync();
  89. } finally {
  90. if(!userClose){/*非用户主动关闭,说明发生了网络问题,需要进行重连操作*/
  91. System.out.println("发现异常,可能发生了服务器异常或网络问题," +
  92. "准备进行重连.....");
  93. //再次发起重连操作
  94. executor.execute(new Runnable() {
  95. @Override
  96. public void run() {
  97. try {
  98. TimeUnit.SECONDS.sleep(1);
  99. try {
  100. // 发起重连操作
  101. connect(NettyConstant.REMOTE_PORT,
  102. NettyConstant.REMOTE_IP);
  103. } catch (Exception e) {
  104. e.printStackTrace();
  105. }
  106. } catch (InterruptedException e) {
  107. e.printStackTrace();
  108. }
  109. }
  110. });
  111. }else{/*用户主动关闭,释放资源*/
  112. channel = null;
  113. group.shutdownGracefully().sync();
  114. connected = false;
  115. // synchronized (this){
  116. // this.connected = false;
  117. // this.notifyAll();
  118. // }
  119. }
  120. }
  121. }
  122. @Override
  123. public void run() {
  124. try {
  125. connect(NettyConstant.REMOTE_PORT, NettyConstant.REMOTE_IP);
  126. } catch (Exception e) {
  127. e.printStackTrace();
  128. }
  129. }
  130. public void close() {
  131. userClose = true;
  132. channel.close();
  133. }
  134. @PostConstruct
  135. public void startNet() throws InterruptedException {
  136. new Thread(this).start();
  137. while(!this.isConnected()){
  138. synchronized (this){
  139. this.wait();
  140. }
  141. }
  142. LOG.info("网络通信已准备好,可以进行业务操作了........");
  143. }
  144. @PreDestroy
  145. public void stopNet(){
  146. close();
  147. }
  148. }

3.2、netty客户端自定义配置入口  ClientInit

  1. import io.netty.channel.ChannelInitializer;
  2. import io.netty.channel.socket.SocketChannel;
  3. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  4. import io.netty.handler.codec.LengthFieldPrepender;
  5. import io.netty.handler.timeout.ReadTimeoutHandler;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * @author Mark老师 享学课堂 https://enjoy.ke.qq.com
  10. * 类说明:客户端Handler的初始化
  11. * 交给Spring 托管,clientBusiHandler用注入方式实例化后加入Netty的pipeline
  12. */
  13. @Service
  14. public class ClientInit extends ChannelInitializer<SocketChannel> {
  15. @Autowired
  16. private ClientBusiHandler clientBusiHandler;
  17. @Override
  18. protected void initChannel(SocketChannel ch) throws Exception {
  19. /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
  20. ch.pipeline().addLast("frameDecoder",
  21. new LengthFieldBasedFrameDecoder(65535,
  22. 0,2,0,
  23. 2));
  24. /*给发送出去的消息增加长度字段*/
  25. ch.pipeline().addLast("frameEncoder",
  26. new LengthFieldPrepender(2));
  27. /*反序列化,将字节数组转换为消息实体*/
  28. ch.pipeline().addLast(new KryoDecoder());
  29. /*序列化,将消息实体转换为字节数组准备进行网络传输*/
  30. ch.pipeline().addLast("MessageEncoder",
  31. new KryoEncoder());
  32. /*超时检测*/
  33. ch.pipeline().addLast("readTimeoutHandler",
  34. new ReadTimeoutHandler(10));
  35. /*发出登录请求*/
  36. ch.pipeline().addLast("LoginAuthHandler",
  37. new LoginAuthReqHandler());
  38. /*发出心跳请求*/
  39. ch.pipeline().addLast("HeartBeatHandler",
  40. new HeartBeatReqHandler());
  41. /*业务处理*/
  42. ch.pipeline().addLast("ClientBusiHandler",
  43. clientBusiHandler);
  44. }
  45. }

3.3、netty通信中序列化的实现

序列化的Handler

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.handler.codec.MessageToByteEncoder;
  4. /**
  5. * 类说明:序列化的Handler
  6. */
  7. public class KryoEncoder extends MessageToByteEncoder<MyMessage> {
  8. @Override
  9. protected void encode(ChannelHandlerContext ctx, MyMessage message,
  10. ByteBuf out) throws Exception {
  11. KryoSerializer.serialize(message, out);
  12. ctx.flush();
  13. }
  14. }

反序列化的Handler

  1. import io.netty.buffer.ByteBuf;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.handler.codec.ByteToMessageDecoder;
  4. import java.util.List;
  5. /**
  6. * 类说明:反序列化的Handler
  7. */
  8. public class KryoDecoder extends ByteToMessageDecoder {
  9. @Override
  10. protected void decode(ChannelHandlerContext ctx, ByteBuf in,
  11. List<Object> out) throws Exception {
  12. Object obj = KryoSerializer.deserialize(in);
  13. out.add(obj);
  14. }
  15. }

信息类的定义:

  1. /**
  2. * 类说明:消息实体类
  3. */
  4. public final class MyMessage {
  5. private MyHeader myHeader;
  6. private Object body;
  7. public final MyHeader getMyHeader() {
  8. return myHeader;
  9. }
  10. public final void setMyHeader(MyHeader myHeader) {
  11. this.myHeader = myHeader;
  12. }
  13. public final Object getBody() {
  14. return body;
  15. }
  16. public final void setBody(Object body) {
  17. this.body = body;
  18. }
  19. @Override
  20. public String toString() {
  21. return "MyMessage [myHeader=" + myHeader + "][body="+body+"]";
  22. }
  23. }

4、启动Netty服务端:

5、启动客户端

接下来就可以通信了,比如,客户端发送消息:
 

到此,springboot集成netty的过程基本结束,后期会详细分析其前后端交互过程,敬请期待。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/185420
推荐阅读
相关标签
  

闽ICP备14008679号