当前位置:   article > 正文

Netty学习——实战篇8 Handler链调用、TCP粘包和拆包 备份

Netty学习——实战篇8 Handler链调用、TCP粘包和拆包 备份

 1 Handler链调用-需求

        使用自定义的编码器和解码器来说明Netty的Handler调用机制:客户端发送一个long类型数据到服务端;服务端收到后,再回发一个long类型数据给客户端。

2 Handler链调用-实战

2.1  NettyServer.java

  1. public class NettyServer {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  4. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap serverBootstrap = new ServerBootstrap();
  7. serverBootstrap.group(bossGroup,workerGroup)
  8. .channel(NioServerSocketChannel.class)
  9. .childHandler(new MyServerInitializer());//自定义一个初始化类
  10. ChannelFuture sync = serverBootstrap.bind(8000).sync();
  11. sync.channel().closeFuture().sync();
  12. }catch (Exception e){
  13. e.printStackTrace();
  14. }finally {
  15. bossGroup.shutdownGracefully();
  16. workerGroup.shutdownGracefully();
  17. }
  18. }
  19. }

2.2 MyServerInitializer.java

  1. public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. //对入站的Hadler进行解码
  6. pipeline.addLast(new MyByteToLongDecoder2());
  7. //对出站的Handler进行解码
  8. pipeline.addLast(new MyLongToByteEncoder());
  9. //自定义Handler
  10. pipeline.addLast(new MyServerHandler());
  11. }
  12. }

2.3 MyByteToLongDecoder2.java

  1. public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
  2. @Override
  3. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  4. System.out.println("MyByteToLongDecoder2 被调用");
  5. out.add(in.readLong());
  6. }
  7. }

 2.4 MyLongToByteEncoder.java

  1. public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
  2. @Override
  3. protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
  4. System.out.println("MyLongToByteEncoder 被调用");
  5. System.out.println("msg = " + msg);
  6. out.writeLong(msg);
  7. }
  8. }

2.5 MyServerHandler.java

  1. public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
  4. System.out.println("从客户端:"+ctx.channel().remoteAddress() + "读取到long类型数据:"+msg);
  5. //向客户端发送一个long类型数据
  6. ctx.writeAndFlush(123456L);
  7. }
  8. @Override
  9. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  10. cause.printStackTrace();
  11. ctx.close();
  12. }
  13. }

2.6 NettyClient.java

  1. public class NettyClient {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup g = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(g)
  7. .channel(NioSocketChannel.class)
  8. .handler(new MyClientInitializer());
  9. ChannelFuture sync = bootstrap.connect("127.0.0.1", 8000).sync();
  10. sync.channel().closeFuture().sync();
  11. }catch (Exception e){
  12. e.printStackTrace();
  13. }finally {
  14. g.shutdownGracefully();
  15. }
  16. }
  17. }

2.7 MyClientInitializer.java

  1. public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. //加入一个出站的handler 对数据进行一个编码
  6. pipeline.addLast(new MyLongToByteEncoder());
  7. //这时一个入站的解码器(入站handler )
  8. pipeline.addLast(new MyByteToLongDecoder2());
  9. //加入一个自定义的handler , 处理业务
  10. pipeline.addLast(new MyClientHandler());
  11. }
  12. }

2.8 MyClientHandler.java

  1. public class MyClientHandler extends SimpleChannelInboundHandler<SocketChannel> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, SocketChannel msg) throws Exception {
  4. System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
  5. System.out.println("收到服务器消息=" + msg);
  6. }
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  9. System.out.println("MyClientHandler 发送数据");
  10. //ctx.writeAndFlush(Unpooled.copiedBuffer(""))
  11. ctx.writeAndFlush(654321L); //发送的是一个long
  12. }
  13. }

3 运行结果

服务端

客户端

 4 复现TCP粘包和拆包

4.1 NettyServer.java

  1. public class NettyServer {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  4. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap serverBootstrap = new ServerBootstrap();
  7. serverBootstrap.group(bossGroup,workerGroup)
  8. .channel(NioServerSocketChannel.class)
  9. .childHandler(new MyServerInitializer ());
  10. ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
  11. channelFuture.channel().closeFuture().sync();
  12. }catch (Exception e){
  13. e.printStackTrace();
  14. }finally {
  15. bossGroup.shutdownGracefully();
  16. workerGroup.shutdownGracefully();
  17. }
  18. }
  19. }

4.2 MyServerInitializer.java

  1. public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. pipeline.addLast(new NettyServerHandler());
  6. }
  7. }

4.3 NettyServerHandler.java

  1. @Slf4j
  2. public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
  3. private int count;
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  6. byte[] bytes = new byte[msg.readableBytes()];
  7. msg.readBytes(bytes);
  8. //将bytes转换成字符串
  9. String result = new String(bytes, Charset.forName("utf-8"));
  10. log.info("服务器接收到的数据:{}",result);
  11. log.info("服务器接收的消息量是:{}",(++this.count));
  12. //服务器返回数据给客户端
  13. ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(),Charset.forName("utf-8"));
  14. ctx.writeAndFlush(byteBuf);
  15. }
  16. @Override
  17. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  18. cause.printStackTrace();
  19. ctx.close();
  20. }
  21. }

4.4 NettyClient.java

  1. public class NettyClient {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(group)
  7. .channel(NioSocketChannel.class)
  8. .handler(new MyClientInitializer ());
  9. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000).sync();
  10. channelFuture.channel().closeFuture().sync();
  11. }catch (Exception e){
  12. e.printStackTrace();
  13. }finally {
  14. group.shutdownGracefully();
  15. }
  16. }
  17. }

4.5 MyClientInitializer.java

  1. public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. pipeline.addLast(new MyClientHandler ());
  6. }
  7. }

4.6 MyClientHandler.java

  1. @Slf4j
  2. public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
  3. private int count;
  4. @Override
  5. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  6. for (int i = 0; i < 10; i++) {
  7. ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, Charset.forName("UTF-8"));
  8. ctx.writeAndFlush(byteBuf);
  9. }
  10. }
  11. @Override
  12. protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
  13. byte[] buffer = new byte[msg.readableBytes()];
  14. msg.readBytes(buffer);
  15. String result = new String(buffer, Charset.forName("utf-8"));
  16. log.info("客户端接收的消息是:{}",result);
  17. log.info("客户端接收的消息数量是:{}",(++this.count));
  18. }
  19. @Override
  20. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  21. cause.printStackTrace();
  22. ctx.close();
  23. }
  24. }

多次运行客户端。服务端运行结果如下:

5 TCP 粘包和拆包解决方案

5.1 解决思路

        使用自定义协议+编解码器来解决

        重点:要解决服务器端每次读取数据长度的问题,这个问题解决后,就不会出现服务器多读或者少读数据的问题,从而避免TCP粘包和拆包。

5.2 需求

         1、要求客户端发送5个Message对象,客户端每次发送一个Message对象。

         2、服务端每次接收一个Message对象,分5次进行解码。每次读取一个Message,就回复一个Message对象给客户端。

5.3 自定义 消息协议类:MessageProtocol.java

  /**
   * Message of the custom protocol: a length field plus the raw content bytes.
   *
   * <p>The class stores both values as given; it does not check that
   * {@code length} matches {@code content.length}.
   */
  public class MessageProtocol {
      /** Length of the content, as set by the caller. */
      private int length;
      /** Content bytes; the array is stored and returned without copying. */
      private byte[] content;

      /** @param length the content length to record */
      public void setLength(int length) {
          this.length = length;
      }

      /** @return the recorded content length (0 until set) */
      public int getLength() {
          return this.length;
      }

      /** @param content the content bytes to store */
      public void setContent(byte[] content) {
          this.content = content;
      }

      /** @return the stored content bytes ({@code null} until set) */
      public byte[] getContent() {
          return this.content;
      }
  }

5.4 自定义编码器:    MyMessageEncoder.java

  1. @Slf4j
  2. public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
  3. @Override
  4. protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
  5. log.info("MyMessageEncoder 的 encode 方法被调用");
  6. out.writeInt(msg.getLength());
  7. out.writeBytes(msg.getContent());
  8. }
  9. }

5.5 自定义解码器: MyMessageDecoder.java

  1. /*
  2. 自定义解码器,把二进制字节码转换成 MessageProtocol对象
  3. */
  4. @Slf4j
  5. public class MyMessageDecoder extends ReplayingDecoder<Void> {
  6. @Override
  7. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  8. log.info("MyMessageDecoder 的 decode方法被调用");
  9. int length = in.readInt();
  10. byte[] bytes = new byte[length];
  11. in.readBytes(bytes);
  12. //封装成MessageProtocol对象
  13. MessageProtocol messageProtocol = new MessageProtocol();
  14. messageProtocol.setLength(length);
  15. messageProtocol.setContent(bytes);
  16. //把 MessageProtocol对象放入list中
  17. out.add(messageProtocol);
  18. }
  19. }

5.6 MyServerHandler.java

  1. @Slf4j
  2. public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  3. //计数器
  4. private int count;
  5. @Override
  6. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  7. cause.printStackTrace();
  8. ctx.close();
  9. }
  10. @Override
  11. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  12. //接收数据,并处理
  13. int length = msg.getLength();
  14. byte[] content = msg.getContent();
  15. log.info("服务器接收到信息如下,消息长度是:{},消息内容是{}",length,new String(content, Charset.forName("utf-8")));
  16. log.info("服务器接收到数据包的数量是:{}",(++this.count));
  17. //服务器回复消息
  18. String responseMessage = UUID.randomUUID().toString();
  19. int responseLength = responseMessage.getBytes("utf-8").length;
  20. byte[] responseMessageBytes = responseMessage.getBytes("utf-8");
  21. //构建协议包
  22. MessageProtocol messageProtocol = new MessageProtocol();
  23. messageProtocol.setLength(responseLength);
  24. messageProtocol.setContent(responseMessageBytes);
  25. ctx.writeAndFlush(messageProtocol);
  26. }
  27. }

5.7 MyServerInitializer.java

  1. public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. pipeline.addLast(new MyMessageDecoder());
  6. pipeline.addLast(new MyMessageEncoder());
  7. pipeline.addLast(new MyServerHandler());
  8. }
  9. }

5.8 MyServer.java

  1. public class MyServer {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
  4. NioEventLoopGroup workerGroup = new NioEventLoopGroup();
  5. try {
  6. ServerBootstrap serverBootstrap = new ServerBootstrap();
  7. serverBootstrap.group(bossGroup,workerGroup)
  8. .channel(NioServerSocketChannel.class)
  9. .childHandler(new MyServerInitializer());
  10. ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
  11. channelFuture.channel().closeFuture().sync();
  12. }catch (Exception e){
  13. e.printStackTrace();
  14. }finally {
  15. bossGroup.shutdownGracefully();
  16. workerGroup.shutdownGracefully();
  17. }
  18. }
  19. }

5.9 MyClientHandler.java

  1. @Slf4j
  2. public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
  3. private int count;
  4. @Override
  5. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  6. //使用客户端发送10条数据
  7. for (int i = 0; i < 5; i++) {
  8. String msg = "孔乙己" + i;
  9. int length = msg.getBytes().length;
  10. byte[] bytes = msg.getBytes();
  11. MessageProtocol messageProtocol = new MessageProtocol();
  12. messageProtocol.setContent(bytes);
  13. messageProtocol.setLength(length);
  14. ctx.writeAndFlush(messageProtocol);
  15. }
  16. }
  17. @Override
  18. protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
  19. int length = msg.getLength();
  20. byte[] content = msg.getContent();
  21. log.info("客户端接收到消息如下,长度:{},内容:{}",length,new String(content, Charset.forName("utf-8")));
  22. log.info("客户端接收消息数量是:{}",(++this.count));
  23. }
  24. @Override
  25. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  26. ctx.close();
  27. }
  28. }

5.10 MyClientInitializer.java

  1. public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
  2. @Override
  3. protected void initChannel(SocketChannel ch) throws Exception {
  4. ChannelPipeline pipeline = ch.pipeline();
  5. pipeline.addLast(new MyMessageEncoder());
  6. pipeline.addLast(new MyMessageDecoder());
  7. pipeline.addLast(new MyClientHandler());
  8. }
  9. }

5.11 MyClient.java

  1. public class MyClient {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup group = new NioEventLoopGroup();
  4. try {
  5. Bootstrap bootstrap = new Bootstrap();
  6. bootstrap.group(group)
  7. .channel(NioSocketChannel.class)
  8. .handler(new MyClientInitializer());
  9. ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000).sync();
  10. channelFuture.channel().closeFuture().sync();
  11. }catch (Exception e){
  12. e.printStackTrace();
  13. }finally {
  14. group.shutdownGracefully();
  15. }
  16. }
  17. }

5.12 运行结果

服务端:

客户端:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/978280
推荐阅读
相关标签
  

闽ICP备14008679号