当前位置:   article > 正文

SpringBoot集成Netty_springboot集成netty服务端

springboot集成netty服务端

目录

一、pom依赖

二、配置yml文件

三、服务端

四、客户端

五、粘包和拆包问题


一、pom依赖

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.77.Final</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>cn.hutool</groupId>
  8. <artifactId>hutool-all</artifactId>
  9. <version>5.5.8</version>
  10. </dependency>

二、配置yml文件

  1. server:
  2. port: 8001
  3. servlet:
  4. context-path: /netty
  5. netty:
  6. url: 0.0.0.0 #0.0.0.0表示绑定任意ip
  7. port: 20004

三、服务端

  1. package com.tlxy.lhn.controller.netty;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. public class NettyServer {
  8. public static void main(String[] args) throws InterruptedException {
  9. //创建两个线程组bossGroup和workerGroup,含有的子线程NioEventLoop的个数默认是CPU的两倍
  10. //bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成
  11. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  12. EventLoopGroup workerGroup = new NioEventLoopGroup(1);
  13. try {
  14. //创建服务器端的启动对象
  15. ServerBootstrap bootstrap = new ServerBootstrap();
  16. //使用链式编程来配置参数
  17. bootstrap.group(bossGroup, workerGroup)//设置两个线程组
  18. .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现
  19. //初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
  20. //多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
  21. .option(ChannelOption.SO_BACKLOG, 1024)
  22. .childHandler(new ChannelInitializer<SocketChannel>() {
  23. @Override
  24. protected void initChannel(SocketChannel channel) throws Exception {
  25. //对workerGroup的SocketChannel设置处理器
  26. channel.pipeline().addLast(new NettyServerHandler());
  27. }
  28. });
  29. System.out.println("netty server start..");
  30. //绑定一个端口并且同步生成一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
  31. //启动服务器(并绑定的端口),bind是异步操作,sync方法是等待异步操作执行完毕
  32. ChannelFuture cf = bootstrap.bind(9000).sync();
  33. //给cf注册监听器,监听我们关心的事件
  34. cf.addListener(new ChannelFutureListener() {
  35. @Override
  36. public void operationComplete(ChannelFuture channelFuture) throws Exception {
  37. if (cf.isSuccess()) {
  38. System.out.println("监听端口9000成功");
  39. } else {
  40. System.out.println("监听端口9000失败");
  41. }
  42. }
  43. });
  44. //等待服务端监听端口关闭,closeFuture是异步操作
  45. //通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
  46. cf.channel().closeFuture().sync();
  47. } finally {
  48. bossGroup.shutdownGracefully();
  49. workerGroup.shutdownGracefully();
  50. }
  51. }
  52. }
NettyServer类中的channel.pipeline().addLast(new NettyServerHandler());对应以下的处理器。
  1. package com.tlxy.lhn.controller.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import lombok.extern.slf4j.Slf4j;
  8. @Slf4j
  9. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  10. @Override
  11. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  12. ByteBuf buf = (ByteBuf) msg;
  13. System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
  14. // 读取byteBuf
  15. // 业务处理
  16. // 回消息给客户端
  17. }
  18. @Override
  19. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  20. ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
  21. ctx.writeAndFlush(buf);
  22. }
  23. //只要Netty抛出错误就会执行,Netty断会开连接会抛出连接超时的错误
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  26. log.info("关闭通道");
  27. cause.printStackTrace();
  28. ctx.close();
  29. }
  30. }

四、客户端

  1. package com.tlxy.lhn.controller.netty;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. public class NettyClient {
  9. public static void main(String[] args) throws InterruptedException {
  10. //客户端需要一个事件循环组
  11. NioEventLoopGroup group = new NioEventLoopGroup();
  12. try {
  13. //创建客户端启动对象
  14. //注意客户端使用的不是SocketBootstrap而是Bootstrap
  15. Bootstrap bootstrap = new Bootstrap();
  16. // 设置相关参数
  17. bootstrap.group(group) //设置线程组
  18. .channel(NioSocketChannel.class)// 使用NioSocketChannel作为客户端的通道实现
  19. .handler(new ChannelInitializer<SocketChannel>() {
  20. @Override
  21. protected void initChannel(SocketChannel ch) throws Exception {
  22. ch.pipeline().addLast(new NettyClientHandler());
  23. }
  24. });
  25. System.out.println("netty client start..");
  26. ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
  27. cf.channel().closeFuture().sync();
  28. }finally {
  29. group.shutdownGracefully();
  30. }
  31. }
  32. }
NettyClient类中ch.pipeline().addLast(new NettyClientHandler());为处理器。
  1. package com.tlxy.lhn.controller.netty;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.ChannelInboundHandlerAdapter;
  6. import io.netty.util.CharsetUtil;
  7. import lombok.extern.slf4j.Slf4j;
  8. @Slf4j
  9. public class NettyClientHandler extends ChannelInboundHandlerAdapter {
  10. /**
  11. * 客户端连接标识
  12. * @param ctx
  13. * @throws Exception
  14. */
  15. @Override
  16. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  17. ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
  18. ctx.writeAndFlush(buf);
  19. }
  20. //当通道建立后有事件时会触发,即服务端发送数据给客户端
  21. @Override
  22. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  23. ByteBuf buf = (ByteBuf) msg;
  24. System.out.println("收到服务端的消息是:" + buf.toString(CharsetUtil.UTF_8));
  25. System.out.println("服务端地址是:" + ctx.channel().remoteAddress());
  26. }
  27. @Override
  28. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  29. log.info("关闭通道");
  30. cause.printStackTrace();
  31. ctx.close();
  32. }
  33. }

五、粘包和拆包问题

客户端和服务端都是固定的框架,我们只需写处理器。

粘包和拆包问题,可以自己手写通过固定长度发送数据,或者使用Google的Protostuff。

  1. <dependency>
  2. <groupId>com.dyuproject.protostuff</groupId>
  3. <artifactId>protostuff-api</artifactId>
  4. <version>1.0.8</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.dyuproject.protostuff</groupId>
  8. <artifactId>protostuff-core</artifactId>
  9. <version>1.0.8</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.dyuproject.protostuff</groupId>
  13. <artifactId>protostuff-runtime</artifactId>
  14. <version>1.0.8</version>
  15. </dependency>
  1. package com.tlxy.lhn.controller.netty;
  2. import com.dyuproject.protostuff.LinkedBuffer;
  3. import com.dyuproject.protostuff.ProtostuffIOUtil;
  4. import com.dyuproject.protostuff.Schema;
  5. import com.dyuproject.protostuff.runtime.RuntimeSchema;
  6. import java.util.Map;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. public class ProtostuffUtil {
  9. private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
  10. private static <T> Schema<T> getSchema(Class<T> clazz) {
  11. @SuppressWarnings("unchecked")
  12. Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
  13. if (schema == null) {
  14. schema = RuntimeSchema.getSchema(clazz);
  15. if (schema != null) {
  16. cachedSchema.put(clazz, schema);
  17. }
  18. }
  19. return schema;
  20. }
  21. /**
  22. * 序列化
  23. *
  24. * @param obj
  25. * @return
  26. */
  27. public static <T> byte[] serializer(T obj) {
  28. @SuppressWarnings("unchecked")
  29. Class<T> clazz = (Class<T>) obj.getClass();
  30. LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
  31. try {
  32. Schema<T> schema = getSchema(clazz);
  33. return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
  34. } catch (Exception e) {
  35. throw new IllegalStateException(e.getMessage(), e);
  36. } finally {
  37. buffer.clear();
  38. }
  39. }
  40. /**
  41. * 反序列化
  42. *
  43. * @param data
  44. * @param clazz
  45. * @return
  46. */
  47. public static <T> T deserializer(byte[] data, Class<T> clazz) {
  48. try {
  49. T obj = clazz.newInstance();
  50. Schema<T> schema = getSchema(clazz);
  51. ProtostuffIOUtil.mergeFrom(data, obj, schema);
  52. return obj;
  53. } catch (Exception e) {
  54. throw new IllegalStateException(e.getMessage(), e);
  55. }
  56. }
  57. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/572447
推荐阅读
相关标签
  

闽ICP备14008679号