当前位置:   article > 正文

Netty详解:Springboot整合Netty_springboot netty

springboot netty

小知识:21天效应
行为心理学中,人们把一个人的新习惯或新理念的形成并得以巩固至少需要21天的现象,称之为21天效应。也就是说,一个人的动作或想法,如果重复21天就会变成一个习惯性的动作或想法。

步骤

1 先写好基本的Netty客户端和Netty服务的代码。参考文章【netty初识】
2.搭建好基本的Springboot项目。
3.将Netty服务端代码的启动代码和关闭代码分离,服务端加上@Component注解,交由Spring管理实例。
4.Springboot启动时,将Netty服务给启动;同时Springboot停止时,将Netty服务销毁。

实现

Netty服务端

主要工作:
将Netty服务端代码的启动代码和关闭代码分离,服务端加上@Component注解,交由Spring管理实例

 

  1. /**
  2. * 服务端
  3. * 1.创建一个ServerBootstrap的实例引导和绑定服务器。
  4. * 2.创建并分配一个NioEventLoopGroup实例以进行事件的处理,比如接受连接以及读写数据。
  5. * 3.指定服务器绑定的本地的InetSocketAddress。
  6. * 4.使用一个EchoServerHandler的实例初始化每一个新的Channel。
  7. * 5.调用ServerBootstrap.bind()方法以绑定服务器。
  8. */
  9. @Slf4j
  10. @Component
  11. public class EchoServer {
  12. /**
  13. * NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O的读写之外
  14. * 创建了两个NioEventLoopGroup,
  15. * 它们实际是两个独立的Reactor线程池。
  16. * 一个用于接收客户端的TCP连接,
  17. * 另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。
  18. */
  19. private final EventLoopGroup bossGroup = new NioEventLoopGroup();
  20. private final EventLoopGroup workerGroup = new NioEventLoopGroup();
  21. private Channel channel;
  22. /**
  23. * 启动服务
  24. * @param hostname
  25. * @param port
  26. * @return
  27. * @throws Exception
  28. */
  29. public ChannelFuture start(String hostname,int port) throws Exception {
  30. final EchoServerHandler serverHandler = new EchoServerHandler();
  31. ChannelFuture f = null;
  32. try {
  33. //ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求
  34. ServerBootstrap b = new ServerBootstrap();
  35. b.group(bossGroup, workerGroup)
  36. .channel(NioServerSocketChannel.class)
  37. .localAddress(new InetSocketAddress(hostname,port))
  38. .childHandler(new ChannelInitializer<SocketChannel>() {
  39. @Override
  40. protected void initChannel(SocketChannel socketChannel) throws Exception {
  41. // 为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler
  42. socketChannel.pipeline().addLast(serverHandler);
  43. }
  44. });
  45. f = b.bind().sync();
  46. channel = f.channel();
  47. log.info("======EchoServer启动成功!!!=========");
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. } finally {
  51. if (f != null && f.isSuccess()) {
  52. log.info("Netty server listening " + hostname + " on port " + port + " and ready for connections...");
  53. } else {
  54. log.error("Netty server start up Error!");
  55. }
  56. }
  57. return f;
  58. }
  59. /**
  60. * 停止服务
  61. */
  62. public void destroy() {
  63. log.info("Shutdown Netty Server...");
  64. if(channel != null) { channel.close();}
  65. workerGroup.shutdownGracefully();
  66. bossGroup.shutdownGracefully();
  67. log.info("Shutdown Netty Server Success!");
  68. }
  69. }

服务端业务处理handler

服务端的生命周期以及接收客户端的信息收发和处理。这里不建议使用阻塞的操作,容易影响netty的性能。

 

  1. /***
  2. * 服务端自定义业务处理handler
  3. */
  4. public class EchoServerHandler extends ChannelInboundHandlerAdapter {
  5. /**
  6. * 对每一个传入的消息都要调用;
  7. * @param ctx
  8. * @param msg
  9. * @throws Exception
  10. */
  11. @Override
  12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  13. ByteBuf in = (ByteBuf) msg;
  14. System.out.println("server received: "+in.toString(CharsetUtil.UTF_8));
  15. ctx.write(in);
  16. }
  17. /**
  18. * 通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的的最后一条消息。
  19. * @param ctx
  20. * @throws Exception
  21. */
  22. @Override
  23. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  24. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
  25. }
  26. /**
  27. * 在读取操作期间,有异常抛出时会调用。
  28. * @param ctx
  29. * @param cause
  30. * @throws Exception
  31. */
  32. @Override
  33. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  34. cause.printStackTrace();
  35. ctx.close();
  36. }
  37. }

Springboot启动服务端代码

CommandLineRunner #run()
这里主要是通过CommandLineRunner 接口的run方法,实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中。

addShutdownHook()
而 Runtime.getRuntime().addShutdownHook(shutdownHook); 这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。

详细代码如下:

 

  1. @SpringBootApplication
  2. public class SpringNettyApplication implements CommandLineRunner {
  3. @Value("${netty.port}")
  4. private int port;
  5. @Value("${netty.url}")
  6. private String url;
  7. @Autowired
  8. private EchoServer echoServer;
  9. public static void main(String[] args) {
  10. SpringApplication.run(SpringNettyApplication.class, args);
  11. }
  12. @Override
  13. public void run(String... args) throws Exception {
  14. ChannelFuture future = echoServer.start(url,port);
  15. Runtime.getRuntime().addShutdownHook(new Thread(){
  16. @Override
  17. public void run() {
  18. echoServer.destroy();
  19. }
  20. });
  21. //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
  22. future.channel().closeFuture().syncUninterruptibly();
  23. }
  24. }

Netty客户端

它在本文中的作用主要是为了测试服务端是否正常运行,通过客户端连接Netty的服务端,看到消息是否正常通信。

 

  1. /**
  2. * 客户端
  3. * 1.为初始化客户端,创建一个Bootstrap实例
  4. * 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;
  5. * 3.当连接被建立时,一个EchoClientHandler实例会被安装到(该Channel的一个ChannelPipeline中;
  6. * 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。
  7. */
  8. public class EchoClient {
  9. private final String host;
  10. private final int port;
  11. public EchoClient(String host, int port) {
  12. this.host = host;
  13. this.port = port;
  14. }
  15. /**
  16. * 运行流程:
  17. * @param args
  18. * @throws Exception
  19. */
  20. public static void main(String[] args) throws Exception {
  21. new EchoClient("127.0.0.1",10010).start();
  22. }
  23. private void start() throws Exception {
  24. /**
  25. * Netty用于接收客户端请求的线程池职责如下。
  26. *1)接收客户端TCP连接,初始化Channel参数;
  27. *2)将链路状态变更事件通知给ChannelPipeline
  28. */
  29. EventLoopGroup group = new NioEventLoopGroup();
  30. try {
  31. Bootstrap b = new Bootstrap();
  32. b.group(group)
  33. .channel(NioSocketChannel.class)
  34. .remoteAddress(new InetSocketAddress(host,port))
  35. .handler(new ChannelInitializer<SocketChannel>() {
  36. @Override
  37. protected void initChannel(SocketChannel socketChannel) throws Exception {
  38. socketChannel.pipeline().addLast(new EchoClientHandler());
  39. }
  40. });
  41. //绑定端口
  42. ChannelFuture f = b.connect().sync();
  43. f.channel().closeFuture().sync();
  44. } catch (Exception e) {
  45. group.shutdownGracefully().sync();
  46. }
  47. }
  48. }

Netty客户端业务处理类

主要是监控Netty客户端的生命周期以及接收服务端的消息,往服务端发送消息等。

 

  1. /**
  2. * 客户端处理类
  3. */
  4. public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  5. /**
  6. * 在到服务器的连接已经建立之后将被调用
  7. * @param ctx
  8. * @throws Exception
  9. */
  10. @Override
  11. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  12. ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks !", CharsetUtil.UTF_8));
  13. }
  14. /**
  15. * 当从服务器接收到一个消息时被调用
  16. * @param channelHandlerContext
  17. * @param byteBuf
  18. * @throws Exception
  19. */
  20. @Override
  21. protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
  22. System.out.println("Client received: "+ byteBuf.toString(CharsetUtil.UTF_8));
  23. }
  24. /**
  25. * 在处理过程中引发异常时被调用
  26. * @param ctx
  27. * @param cause
  28. * @throws Exception
  29. */
  30. @Override
  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  32. cause.printStackTrace();
  33. ctx.close();
  34. }
  35. }

总结

从整体来看,只不过是将Netty服务端从main函数启动方式改为交给Spring来管理启动和销毁的工作,也就说以后你有个什么代码要交给Spring管理的也是可以这样子处理。

最后

如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。
关注公众号【爱编码】,小编会一直更新文章的哦。



作者:xbmchina
链接:https://www.jianshu.com/p/2a2562f85241
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/206504
推荐阅读
相关标签
  

闽ICP备14008679号