赞
踩
使用自定义的编码器和解码器来说明Netty的Handler调用机制。客户端发送long类型数据到服务端;服务端发送long到客户端。
- public class NettyServer {
- public static void main(String[] args) {
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup,workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new MyServerInitializer());//自定义一个初始化类
- ChannelFuture sync = serverBootstrap.bind(8000).sync();
- sync.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
- public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- //对入站的Hadler进行解码
- pipeline.addLast(new MyByteToLongDecoder2());
- //对出站的Handler进行解码
- pipeline.addLast(new MyLongToByteEncoder());
- //自定义Handler
- pipeline.addLast(new MyServerHandler());
- }
- }
- public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- System.out.println("MyByteToLongDecoder2 被调用");
- out.add(in.readLong());
- }
- }
- public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
- @Override
- protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
- System.out.println("MyLongToByteEncoder 被调用");
- System.out.println("msg = " + msg);
- out.writeLong(msg);
- }
- }
- public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
- System.out.println("从客户端:"+ctx.channel().remoteAddress() + "读取到long类型数据:"+msg);
- //向客户端发送一个long类型数据
- ctx.writeAndFlush(123456L);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
- public class NettyClient {
- public static void main(String[] args) {
- NioEventLoopGroup g = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(g)
- .channel(NioSocketChannel.class)
- .handler(new MyClientInitializer());
- ChannelFuture sync = bootstrap.connect("127.0.0.1", 8000).sync();
- sync.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- g.shutdownGracefully();
- }
- }
- }
- public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- //加入一个出站的handler 对数据进行一个编码
- pipeline.addLast(new MyLongToByteEncoder());
-
- //这时一个入站的解码器(入站handler )
- pipeline.addLast(new MyByteToLongDecoder2());
- //加入一个自定义的handler , 处理业务
- pipeline.addLast(new MyClientHandler());
- }
- }
- public class MyClientHandler extends SimpleChannelInboundHandler<SocketChannel> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, SocketChannel msg) throws Exception {
- System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
- System.out.println("收到服务器消息=" + msg);
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("MyClientHandler 发送数据");
- //ctx.writeAndFlush(Unpooled.copiedBuffer(""))
- ctx.writeAndFlush(654321L); //发送的是一个long
-
- }
- }
服务端
- public class NettyServer {
- public static void main(String[] args) {
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup,workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new MyServerInitializer ());
- ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
- channelFuture.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
- public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new NettyServerHandler());
- }
- }
- @Slf4j
- public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
- private int count;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- byte[] bytes = new byte[msg.readableBytes()];
- msg.readBytes(bytes);
- //将bytes转换成字符串
- String result = new String(bytes, Charset.forName("utf-8"));
- log.info("服务器接收到的数据:{}",result);
- log.info("服务器接收的消息量是:{}",(++this.count));
- //服务器返回数据给客户端
- ByteBuf byteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString(),Charset.forName("utf-8"));
- ctx.writeAndFlush(byteBuf);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
- public class NettyClient {
- public static void main(String[] args) {
- NioEventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .handler(new MyClientInitializer ());
- ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000).sync();
- channelFuture.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- group.shutdownGracefully();
- }
- }
- }
- public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new MyClientHandler ());
- }
- }
- @Slf4j
- public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
- private int count;
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ByteBuf byteBuf = Unpooled.copiedBuffer("hello,server" + i, Charset.forName("UTF-8"));
- ctx.writeAndFlush(byteBuf);
- }
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- byte[] buffer = new byte[msg.readableBytes()];
- msg.readBytes(buffer);
- String result = new String(buffer, Charset.forName("utf-8"));
- log.info("客户端接收的消息是:{}",result);
- log.info("客户端接收的消息数量是:{}",(++this.count));
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
- }
多次运行客户端。服务端运行结果如下:
使用自定义协议+编解码器来解决
重点:解决粘包和拆包的关键,是让服务器端明确每次应读取的数据长度。长度确定之后,服务器就不会多读或者少读数据,从而避免TCP粘包和拆包带来的问题。
1、要求客户端发送5个Message对象,客户端每次发送一个Message对象。
2、服务端每次接收一个Message对象,分5次进行解码。每次读取一个Message,就回复一个Message对象给客户端。
/**
 * Message of the custom protocol: a declared length plus the raw content bytes.
 *
 * <p>This is a plain mutable holder. The two fields are independent: nothing here
 * checks that {@code length} equals {@code content.length}, and the content array is
 * stored and returned by reference without copying.
 */
public class MessageProtocol {
    /** Declared length of the content. */
    private int length;

    /** Raw content bytes. */
    private byte[] content;

    /** @return the declared length */
    public int getLength() {
        return this.length;
    }

    /** @return the content array that was last set (the same reference, not a copy) */
    public byte[] getContent() {
        return this.content;
    }

    /** @param length the declared length to store */
    public void setLength(int length) {
        this.length = length;
    }

    /** @param content the content array to store (kept by reference) */
    public void setContent(byte[] content) {
        this.content = content;
    }
}
- @Slf4j
- public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
- @Override
- protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
- log.info("MyMessageEncoder 的 encode 方法被调用");
- out.writeInt(msg.getLength());
- out.writeBytes(msg.getContent());
- }
- }
- /*
- 自定义解码器,把二进制字节码转换成 MessageProtocol对象
- */
- @Slf4j
- public class MyMessageDecoder extends ReplayingDecoder<Void> {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- log.info("MyMessageDecoder 的 decode方法被调用");
- int length = in.readInt();
- byte[] bytes = new byte[length];
- in.readBytes(bytes);
- //封装成MessageProtocol对象
- MessageProtocol messageProtocol = new MessageProtocol();
- messageProtocol.setLength(length);
- messageProtocol.setContent(bytes);
- //把 MessageProtocol对象放入list中
- out.add(messageProtocol);
-
- }
- }
- @Slf4j
- public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
- //计数器
- private int count;
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
- //接收数据,并处理
- int length = msg.getLength();
- byte[] content = msg.getContent();
- log.info("服务器接收到信息如下,消息长度是:{},消息内容是{}",length,new String(content, Charset.forName("utf-8")));
- log.info("服务器接收到数据包的数量是:{}",(++this.count));
- //服务器回复消息
- String responseMessage = UUID.randomUUID().toString();
- int responseLength = responseMessage.getBytes("utf-8").length;
- byte[] responseMessageBytes = responseMessage.getBytes("utf-8");
- //构建协议包
- MessageProtocol messageProtocol = new MessageProtocol();
- messageProtocol.setLength(responseLength);
- messageProtocol.setContent(responseMessageBytes);
- ctx.writeAndFlush(messageProtocol);
- }
- }
- public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new MyMessageDecoder());
- pipeline.addLast(new MyMessageEncoder());
- pipeline.addLast(new MyServerHandler());
- }
- }
- public class MyServer {
- public static void main(String[] args) {
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup,workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new MyServerInitializer());
- ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
- channelFuture.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- }
- @Slf4j
- public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
- private int count;
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- //使用客户端发送10条数据
- for (int i = 0; i < 5; i++) {
- String msg = "孔乙己" + i;
- int length = msg.getBytes().length;
- byte[] bytes = msg.getBytes();
- MessageProtocol messageProtocol = new MessageProtocol();
- messageProtocol.setContent(bytes);
- messageProtocol.setLength(length);
- ctx.writeAndFlush(messageProtocol);
- }
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
- int length = msg.getLength();
- byte[] content = msg.getContent();
- log.info("客户端接收到消息如下,长度:{},内容:{}",length,new String(content, Charset.forName("utf-8")));
- log.info("客户端接收消息数量是:{}",(++this.count));
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- }
- }
- public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new MyMessageEncoder());
- pipeline.addLast(new MyMessageDecoder());
-
- pipeline.addLast(new MyClientHandler());
- }
- }
- public class MyClient {
- public static void main(String[] args) {
- NioEventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .handler(new MyClientInitializer());
- ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000).sync();
- channelFuture.channel().closeFuture().sync();
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- group.shutdownGracefully();
- }
- }
- }
服务端:
客户端:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。