赞
踩
搞搞netty时发现默认的id很长,无法直接自定义。
于是我网上搜索了search一下,发现没有相关文章,那就自己看看源码手撸一个实现。这难不倒拥有7年代码经验的我
,通过本文章你能大概学到如何根据源码定制功能。
通过netty官网说明唯一id:https://netty.io/wiki/new-and-noteworthy-in-4.1.html
全局唯一通道ID
每个频道现在都有一个全局唯一的ID,该ID由以下内容生成:
优选全局唯一的MAC地址(EUI-48或EUI-64),
当前进程ID,
系统#currentTimeMillis()
系统#nanoTime()
随机32位整数
顺序递增的32位整数
可以使用信道获得信道的ID.id()方法。
默认的id是这样的:通道id:a85e45fffec07f9b-00002454-00000000-084db96c90765225-d150e0b4
虽然很唯一,但不符合我们的系统,应该将它自定义。
我是基于netty 4.1.79.Final
2022年8月12日最新版本。
首先看id的实现接口:IDEA中按Ctrl + Alt + O
搜索 channel ID
应该就是这个了ChannelId
接口了,接着看他的实现类
应该就是默认的实现了
观察到id为空时他就会new一个
接着看一下哪里用他生成:
此时我们发现是一个构造类用到了他AbstractChannel
,AbstractChannel
应该就是所有管道类的父类。
他的构造方法中也newId()
了
他就是在这里吧ID给生成出来:
框架开发一般规则/套路:不直接使用构造类,那么我们看看他的继承使用情况:
太多了,我们不知道哪个会加载。
回到我们的客户端初始化类,看看能不能在初始化配置时找到自定义他的地方:
此时发现打印的是class io.netty.channel.socket.nio.NioSocketChannel
这个类,恰好对应上面的NioSocketChannel
处理
NioSocketChannel
这个是客户端的管道处理类,默认是使用socket协议。恰好发现它是继承了AbstractChannel
,
那应该就是通过自定义NioSocketChannel
这个类进行自定义id生成。
通过上面的结果,我们通过继承NioSocketChannel
来实现自定义id生成:使用UUID
import io.netty.channel.ChannelId; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.UUID; /** * @author lingkang * Created by 2022/8/12 */ public class MyNioSocketChannel extends NioSocketChannel { protected ChannelId newId() { ChannelId channelId = new ChannelId() { @Override public String asShortText() { return UUID.randomUUID().toString(); } @Override public String asLongText() { return UUID.randomUUID().toString(); } @Override public int compareTo(ChannelId o) { return 0; } }; return channelId; } }
客户端初始化那就是用我们集成自定义的类
运行结果:
这个自定义是正确的,所以服务端也能按照上面的思路进行自定义。
import io.netty.channel.ChannelId; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.UUID; /** * @author lingkang * Created by 2022/8/12 */ public class MyNioSocketChannel extends NioSocketChannel { protected ChannelId newId() { ChannelId channelId = new ChannelId() { @Override public String asShortText() { return UUID.randomUUID().toString(); } @Override public String asLongText() { return UUID.randomUUID().toString(); } @Override public int compareTo(ChannelId o) { return 0; } }; return channelId; } }
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import top.lingkang.flychat.common.MsgBody; import top.lingkang.flychat.common.code.RpcDecoder; import top.lingkang.flychat.common.code.RpcEncoder; import top.lingkang.flychat.server.ServerHandler; import top.lingkang.flychat.server.ServerInit; import java.util.Date; /** * @author lingkang * Created by 2022/8/12 */ public class Test01Server { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) // 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。 .option(ChannelOption.SO_BACKLOG, 50) // .childOption(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addFirst("encode", new RpcEncoder(MsgBody.class))//编码器 .addFirst("decode", new RpcDecoder(MsgBody.class))//解码器 .addLast(new ChannelInboundHandlerAdapter(){ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MsgBody body = (MsgBody) msg; System.out.println("接收到Client端信息:" + body.toString()); //返回的数据结构 MsgBody response = new MsgBody(); response.setCode(200); response.setData(new Date()); response.setMsg("server响应结果"); System.out.println("server thread id=" + Thread.currentThread().getId()); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); closeOnFlush(ctx.channel()); } private void closeOnFlush(Channel ch) { if (ch.isActive()) { ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved(ctx); System.out.println("有链接断开"); } }); } }); //启动同步监听 serverBootstrap.bind("127.0.0.1", 8081).sync().channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import top.lingkang.flychat.common.MsgBody; import top.lingkang.flychat.common.code.RpcDecoder; import top.lingkang.flychat.common.code.RpcEncoder; import java.util.Date; /** * @author lingkang * Created by 2022/8/12 */ public class Test01Client { public static void main(String[] args) throws Exception { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(1); try { bootstrap .group(group) .channel(MyNioSocketChannel.class)// 使用NioSocketChannel来作为连接用的channel类 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println(ch.getClass()); ch.pipeline() .addFirst("encode", new RpcEncoder(MsgBody.class))//编码器 .addFirst("decode", new RpcDecoder(MsgBody.class))//解码器 .addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MsgBody body = (MsgBody) msg; System.out.println("接收到Server端响应消息:" + body.toString()); // throw new RuntimeException("手动抛出"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { closeOnFlush(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); closeOnFlush(ctx.channel()); } private void closeOnFlush(Channel ch) { if (ch.isActive()) { ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } } }); } }); // Start the connection attempt. Channel channel = bootstrap.connect("127.0.0.1", 8081).sync().channel(); System.out.println("连接服务器成功"); int i = 0; while (true) { try { //每2秒给服务器发一次数据 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } i++; channel.writeAndFlush(new MsgBody(200, "客户端给服务端发消息:" + i, new Date())); if (i == 5) { System.out.println("通道id:" + channel.id().asLongText()); channel.close(); break; } } } finally { //关闭线程组 group.shutdownGracefully(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。