赞
踩
系列上一篇文章:《【Netty专题】Netty实战与核心组件详解》
长连接:
长连接,也叫持久连接,是指在TCP层握手成功后,不立即断开连接,并在此连接的基础上进行多次消息(包括心跳)交互,直至连接的任意一方(客户端OR服务端)主动断开连接,此过程称为一次完整的长连接。
(SOCKET 连接后不管是否使用都保持连接的一种连接)
短连接:
短连接,顾名思义,与长连接的区别就是,客户端收到服务端的响应后,立刻发送FIN消息,主动释放连接。也有服务端主动断连的情况,凡是在一次消息交互(发请求-收响应)之后立刻断开连接的情况都称为短连接。短连接是建立在TCP协议上的,有完整的握手挥手流程,区别于UDP协议。(SOCKET 连接发送数据接收完数据后马上断开连接的一种连接)
这里已经假设,我们确定使用Netty作为我们的网络编程框架了。但是我想,跟楼主一样的初学者,还是会有疑惑:那我想使用Netty开发一个简单的通信交互程序,该如何做到呢?
讲真的, 这个问题确实难倒了我。因为不在那个层次,我甚至连【需要关心什么问题】都搞不懂。思来想去,我只能以【没见过猪跑,但吃过猪肉】的视角出发, 大概说一下自己的想法了。
(序列化反序列化,跟编解码意思很像,需要大家自己多多去理解)
Q1:什么是序列化、反序列化?
答:序列化,就是将数据转换为适合存储,或者传输的过程。它侧重于聚焦于程序内部数据结构与表示格式之间的转换,尤其是面向对象编程中,将对象的状态和属性转换为适合存储或传输的格式(如JSON、XML、二进制序列化等),这个过程就是【序列化】;反之则是【反序列化】。
Q2:序列化有什么作用?
答:主要目的有:【网络传输】及【对象持久化保存】。持久化保存知道啥意思吧,就是存到各种数据库中。
Q3:什么是编解码问题?
答:编解码则主要针对的是数据的表现形式或数据格式的转换,如文本编码(ASCII、Unicode、GBK等)、图像编码(JPEG、PNG等)、音频编码(MP3、AAC等),以及网络传输层(如TCP/IP)中的比特流编码等,其关注的是数据的二进制表示和网络传输的需求。
说到编解码,不得不提字符集:
我们常见的字符集有UTF-8,GBK,ASCII等。将我们的数据,转换成某种字符集表示的二进制数的过程就是编码,反过来就是解码。
摘抄自【百度:文心一言】。
心跳保活机制是一种维持网络连接长连接的机制,它通过定时发送心跳包来检测双方是否存活。如果没有特意的设置某些选项或者实现应用层心跳包,TCP空闲的时候是不会发送任何数据包。也就是说,当一个TCP的socket,客户端与服务端谁也不发送数据,会一直保持着连接。这其中如果有一方异常掉线(例如死机、路由被破坏、防火墙切断连接等),另一端如果没有发送数据,永远也不可能知道。这对于一些服务型的程序来说,是灾难性的后果,将会导致服务端socket资源耗尽。
因此,需要心跳保活机制来维持连接的有效性,及时有效地检测到一方的非正常断开,保证连接的资源被有效的利用。心跳保活机制可以应用在TCP协议层实现(例如使用TCP Keepalive),也可以在应用层实现(例如使用心跳包)。在应用层实现心跳保活机制时,通常由客户端向服务端发送自定义消息命令,服务端收到消息后回复自定义的消息给客户端。如果服务端未收到消息,则表示连接失败,如果失败的次数达到指定上限后,则重新发起连接。
这个无需多说,哪怕是我们在做web开发期间也会定义一个统一口径
特别提醒
个人在学习的时候发现网上对于【序列化】和【编解码】这两个概念多少有点混淆,或者说边界模糊。确实,他们是有些关联的,但区别也有。以下是摘抄自【百度:文心一言】
编码器和解码器常用于处理数据在不同格式之间的转换;
而序列化是将数据结构或对象状态转换为可以存储或传输的格式的过程。
因此,编码器和解码器主要关注的是数据的表示和转换,而序列化和反序列化主要关注的是对象状态的转换。
我在后面的编码中,将围绕以下功能来实现一个简单的长连接通信框架。功能如下:
模型解读如下:
1)客户端发送应用握手请求消息,携带节点 ID 等有效身份认证信息
2)服务端对应用握手请求消息进行合法性校验,包括节点 ID 有效性校验、节点重复登录校验和 IP 地址合法性校验,校验通过后,返回登录成功的应用握手应答消息
3)链路建立成功之后,客户端发送业务消息
4)链路成功之后,服务端发送心跳消息
5)链路建立成功之后,客户端发送心跳消息
6)链路建立成功之后,服务端发送业务消息
7)服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接
PS:协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,所以通信方式有如下两种:
- TWO-WAY:即需要响应的请求。如请求登录
- ONE-WAY:即无需响应的请求。如日志记录
双方之间的心跳采用 Ping-Pong 机制,当链路处于空闲状态时,客户端主动发送Ping 消息给服务端,服务端接收到 Ping 消息后发送应答消息 Pong 给客户端,如果客户端连续发送 N 条 Ping 消息都没有接收到服务端返回的 Pong 消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期 T 后发起重连操作,直到重连成功。
在我的设计中,把消息定义分为了两个部分:消息头、消息体。代码如下:
@Getter @Setter @ToString public class CommonMessage { /** * 消息头 */ private CommonMessageHeader header; /** * 0-失败;1-成功 */ private Byte result; /** * 消息体 */ private Object body; } @Getter @Setter @ToString public final class CommonMessageHeader { /** * 消息体的MD5摘要,用来做简单校验 */ private String md5; /** * 服务标识 */ private int severId; /** * 消息id */ private long msgID; /** * 消息类型,枚举值。见MessageType */ private byte type; }
然后是消息类型:
public enum MessageType { /** * 业务请求消息 */ SERVICE_REQ((byte) 0), /** * 业务应答消息 */ SERVICE_RESP((byte) 1), /** * 无需应答的业务请求消息 */ SERVICE_REQ_ONE_WAY((byte) 2), /** * 心跳请求消息 */ HEARTBEAT_REQ((byte) 99), /** * 心跳应答消息 */ HEARTBEAT_RESP((byte) 100), ; private byte value; MessageType(byte value) { this.value = value; } public byte value() { return this.value; } }
心跳机制我估计大家多少能理解,这个名字就起的很形象。当读或者写心跳消息发生 I/O 异常的时候,说明已经中断,此时需要立即关闭连接,如果是客户端,需要重新发起连接;如果是服务端,需要清空缓存的半包信息,等到客户端重连。
是的,两边都需要心跳检测,毕竟是【全双工】
但是心跳机制的设计,也是有点说法的。比如,什么时候需要传心跳包过去;发什么包过去。
先说发什么包过去。这个就比较简单了,正常来说发一个空包就行了,除非你有什么特别的要求。比如我就在消息定义中新增了一个类型简单标记一下而已。
再说,什么时候发。
方案一:最粗暴
最粗暴的当然是,TCP握手完成之后开始启动一个心跳任务,然后以固定的频率发送,不管三次二十一,我就要在存续期间一直发。这当然可以实现目标,但是,这【合李】吗?
方案二:小改进
很简单的道理啊,如果我们互相之间本身就正在进行业务上的通信,咱俩都正在【说话】呢,你还发个心跳过来问我【你死没死】啊,你礼貌吗这?所以,我们可以使用Netty提供的一个【写空闲检测】机制来完成。直接上源码给你们看:
// IdleStateHandler。一个实现了InBound和OutBound的Handler
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
参数解读:
readerIdleTimeSeconds
:当在指定的时间段内没有执行读操作时,将触发 IdleState.READER_IDLE
。0表示禁用writerIdleTimeSeconds
:当在指定的时间段内没有执行写操作时,将触发IdleState.WRITER_IDLE
。0表示禁用allIdleTimeSeconds
:当在指定的时间段内没有进行读写操作时,将触发IdleState.ALL_IDLE
。0表示禁用PS:检测空闲连接以及超时对于及时释放资源来说是至关重要的。这就是心跳机制做的事情。因为很重要,所以Netty也给我们预提供了这些Handler,就是上面说的IdleStateHandler
。
如果链路中断,等到INTERVAL
时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL
后再次发起重连,直到重连成功。
为了保持服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL
时间之后再发起重连,而不是失败后立即重连。
为了保证句柄资源能够及时释放,无论什么场景下重连失败,客户端必须保证自身的资源被及时释放,包括但不限于SocketChannel、Socket 等。
重连失败后,可以打印异常堆栈信息,方便后续的问题定位。
大家还记得吗?Handler
出入境可以是无序的,但是,同是入境、出境的Handler
之间是局部有序的。这不难理解,就跟JDK8的Stream一样,前面对流的操作会影响后面的结果。所以,顺序很重要。这边大概的模型如下:
我想,大家应该能理解为什么我的顺序是这样组织的吧…
大家好好思考下,理论上5/6是可以随意调换顺序的,毕竟【心跳包】是一种特殊的业务
我估计有很多人不理解【读空闲】跟【心跳请求/应答】的关系,大家可以再看看【2.4 心跳机制】的【方案二】。我说他们都是心跳机制里面的,怎么理解?
读空闲
:其实就是一种事件监听机制。监听Channel
上的【读事件】。当事件发生的时候触发对应事件,并且往管道中传输(说到这里了,写空闲
也知道了吧)心跳请求/应答
:对上面说的事件的响应写了通信的客户端、服务端怎么调试呢?咱也没有可视化界面,所以我就搞了一个最原始的,通过Scanner
输入命令的交互式调试方案。像这样:
你懂我意思吧
头疼。代码貌似也挺多的,本来想打包压缩包上来,然后只贴核心代码。但是我的电脑有加密软件,打包上来的代码可能会有问题。代码包结构如下:
红框内的pojo
已经给过了,咱就不重复上代码了。如果大家要本地运行的话,可以跟我一样先创建好对应的包,然后一个一个复制上去吧。真不是我不想给源码压缩包,而是加密问题。给了你们打开也是乱码
这三个类。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.server.ServerInitializer; /** * RPC Server服务端 * * @author zhangshen * @date 2023/10/25 9:16 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class NettyRpcServer { public static void main(String[] args) { try { start(); } catch (InterruptedException e) { e.printStackTrace(); log.error("【Netty服务器】启动失败"); } } private static void start() throws InterruptedException { EventLoopGroup acceptLoopGroup = new NioEventLoopGroup(); EventLoopGroup reactorLoopGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(acceptLoopGroup, reactorLoopGroup) .channel(NioServerSocketChannel.class) .localAddress(NettyConstant.PORT) .childHandler(new ServerInitializer()); serverBootstrap.bind().sync(); log.info("【Netty服务器】启动成功"); } }
上面这个很简单啦,跟上一篇文章的使用示例如出一辙。不同的是,在Pipeline
中链化Handler
的逻辑我单独抽出来写成一个ServerInitializer
。后面会讲
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.client.ClientInitializer; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.helper.MessageGenerateHelper; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.MessageType; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * RPC Client客户端 * * @author zhangshen * @date 2023/10/25 9:16 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class NettyRpcClient { private Channel channel; private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); private volatile boolean userClose = false; /** * 定时线程池,用于断线重连 */ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); public void connect() throws InterruptedException { try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ClientInitializer()); ChannelFuture sync = bootstrap.connect(new InetSocketAddress(NettyConstant.SERVER_IP, NettyConstant.PORT)).sync(); log.info("【Netty客户端】启动成功"); channel = sync.channel(); channel.closeFuture().sync(); } finally { if (userClose) { channel = null; eventLoopGroup.shutdownGracefully().sync(); } else { // 断线重连 reconect(); } } } /** * 断线重连 */ private void reconect() { log.info("【Netty客户端】开始断线重连"); executor.execute(() -> { try { // 给操作系统足够的时间,去释放相关的资源 TimeUnit.SECONDS.sleep(1); connect(); } catch (InterruptedException e) { e.printStackTrace(); } }); } public void sendMessage(Object msg) { if (channel == null || !channel.isActive()) { throw new IllegalStateException("和服务器还未未建立起有效连接,请稍后再试!!"); } CommonMessage message = MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ.value(), msg); log.info("【Netty客户端】发送消息。CommonMessage={}", message); channel.writeAndFlush(message); } public void sendOneWay(Object msg) { if (channel == null || !channel.isActive()) { throw new IllegalStateException("和服务器还未未建立起有效连接,请稍后再试!!"); } CommonMessage message = MessageGenerateHelper.requestWithMsgId(NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_REQ_ONE_WAY.value(), msg); log.info("【Netty客户端】发送消息。CommonMessage={}", message); channel.writeAndFlush(message); } public void close() { userClose = true; channel.close(); } }
这个相对于服务端,以及前面的使用示例的客户端确实稍微复杂一点。主要的变化是【断线重连】机制引起的:
reconnect
方法reconnect()
方法,处理断线重连import java.util.Scanner; /** * @author zhangshen * @date 2023/10/25 12:31 * @slogan 编码即学习,注释断语义 **/ public class ScannerCmdClient { public static void main(String[] args) throws InterruptedException { // 新建客户端 NettyRpcClient client = new NettyRpcClient(); // 显示菜单栏 showMenu(); Scanner scanner = new Scanner(System.in); while (true) { int cmd = scanner.nextInt(); switch (cmd) { case 1: client.connect(); Thread.sleep(3000); break; case 2: client.sendMessage("客户端发送双端信息"); break; case 3: client.sendOneWay("客户端发送ONE-WAY信息"); break; case 4: client.close(); case 5: showMenu(); default: client.close(); } } } /** * 展示菜单 */ private static void showMenu() { System.out.println("请选择以下功能:"); System.out.println("【1】与服务端建立连接"); System.out.println("【2】发送一个有响应的消息"); System.out.println("【3】发送一个无响应的消息"); System.out.println("【4】关闭连接"); System.out.println("【5】显示菜单栏"); } }
前面介绍过作用了,不多说了
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.timeout.ReadTimeoutHandler; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.codec.KryoDecodeHandler; import org.tuling.io.rpc.common.codec.KryoEncodeHandler; import org.tuling.io.rpc.server.handler.ServerHeartBeatHandler; import org.tuling.io.rpc.server.handler.ServerLoginHandler; import org.tuling.io.rpc.server.handler.ServerOrderHandler; /** * 服务端,通道初始化器 * * @author zhangshen * @date 2023/10/25 9:44 * @slogan 编码即学习,注释断语义 **/ public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 添加【粘包/分包】处理器。 由Netty预备提供的 pipeline.addLast(new LengthFieldBasedFrameDecoder( 65535, 0, 2, 0,2 )); pipeline.addLast(new LengthFieldPrepender(2)); // 添加【序列化/反序列化】处理器,开源序列化工具 pipeline.addLast(new KryoDecodeHandler()); pipeline.addLast(new KryoEncodeHandler()); // 添加【心跳】处理器,Netty预备提供的 pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY)); // 添加业务处理器 pipeline.addLast(new ServerLoginHandler()); pipeline.addLast(new ServerHeartBeatHandler()); pipeline.addLast(new ServerOrderHandler()); } }
我就不一一说代码了,只贴。没啥难度的,最重要的还是顺序,在【2.6 Handler的顺序组织】已经解释了一波了。
ServerHeartBeatHandler
:
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.common.helper.MessageGenerateHelper; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.CommonMessageHeader; import org.tuling.io.rpc.common.pojo.MessageType; import org.tuling.io.rpc.server.helper.SecurityCenterHelper; /** * 服务器心跳处理 * * @author zhangshen * @date 2023/10/25 10:33 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CommonMessage message = (CommonMessage) msg; CommonMessageHeader header = message.getHeader(); if (header == null) { log.error("【Netty服务器】非法消息"); ctx.writeAndFlush("非法消息"); ctx.close(); ReferenceCountUtil.release(msg); return; } if (header.getType() != MessageType.HEARTBEAT_REQ.value()) { ctx.fireChannelRead(msg); return; } // 处理心跳业务 log.info("【Netty服务器】心跳应答"); CommonMessage heartBeatResponse = MessageGenerateHelper.success(-1, MessageType.HEARTBEAT_RESP.value(), null); ctx.writeAndFlush(heartBeatResponse); ReferenceCountUtil.release(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if(cause instanceof ReadTimeoutException){ log.debug("【Netty服务器】客户端长时间未通信,可能已经宕机,关闭链路"); SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString()); ctx.close(); } super.exceptionCaught(ctx, cause); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.debug("【Netty服务器】客户端已关闭连接"); super.channelInactive(ctx); } }
ServerLoginHandler
:
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.helper.MessageGenerateHelper; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.CommonMessageHeader; import org.tuling.io.rpc.common.pojo.MessageType; import org.tuling.io.rpc.server.helper.SecurityCenterHelper; import java.net.InetSocketAddress; /** * 登录服务器处理器 * * @author zhangshen * @date 2023/10/25 10:05 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class ServerLoginHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CommonMessage message = (CommonMessage) msg; CommonMessageHeader header = message.getHeader(); if (header == null) { log.error("【Netty服务器】非法消息"); ctx.writeAndFlush("非法消息"); ctx.close(); ReferenceCountUtil.release(msg); return; } if (header.getSeverId() != NettyConstant.LOGIN_SERVER_ID) { ctx.fireChannelRead(msg); return; } // 处理登录业务 this.checkLogin(ctx, msg); ReferenceCountUtil.release(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 删除缓存 SecurityCenterHelper.removeLoginUser(ctx.channel().remoteAddress().toString()); ctx.close(); } private void checkLogin(ChannelHandlerContext ctx, Object msg) { log.info("【Netty服务器】登录消息CommonMessage={}", msg); InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String userLoginIP = socketAddress.getAddress().getHostAddress(); // 白名单校验 boolean whiteIP = SecurityCenterHelper.isWhiteIP(userLoginIP); if (!whiteIP) { String errorMessage = "不在白名单内"; log.error("【Netty服务器】{}", errorMessage); CommonMessage message = MessageGenerateHelper.fail( NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage); ctx.writeAndFlush(message); ctx.close(); ReferenceCountUtil.release(msg); return; } // 重复登录校验 String userInfo = ctx.channel().remoteAddress().toString(); boolean repeatLogin = SecurityCenterHelper.isRepeatLogin(userInfo); if (repeatLogin) { String errorMessage = "重复登录"; log.error("【Netty服务器】{}", errorMessage); CommonMessage message = MessageGenerateHelper.fail( NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), errorMessage); ctx.writeAndFlush(message); ctx.close(); ReferenceCountUtil.release(msg); return; } // 通过校验,记录 SecurityCenterHelper.addLoginUser(userInfo); String successMessage = "登录成功"; log.info("【Netty服务器】{}", successMessage); CommonMessage message = MessageGenerateHelper.success( NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_RESP.value(), successMessage ); ctx.writeAndFlush(message); } }
ServerOrderHandler
:
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.biz.OrderInfo; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.helper.EncryptHelper; import org.tuling.io.rpc.common.helper.MessageGenerateHelper; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.CommonMessageHeader; import org.tuling.io.rpc.common.pojo.MessageType; import org.tuling.io.rpc.server.async.AsyncBusiProcessor; import java.math.BigDecimal; /** * 订单业务处理类 * * @author zhangshen * @date 2023/10/25 10:39 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class ServerOrderHandler extends SimpleChannelInboundHandler<CommonMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception { // 检查MD5 final CommonMessageHeader header = msg.getHeader(); if (header == null) { log.error("【Netty服务器】非法消息"); ctx.writeAndFlush("非法消息"); ctx.close(); ReferenceCountUtil.release(msg); return; } if (header.getSeverId() != NettyConstant.ORDER_SERVER_ID) { ctx.fireChannelRead(msg); return; } log.info("【Netty服务器】CommonMessage={}", msg); String headMd5 = header.getMd5(); String calcMd5 = EncryptHelper.encryptObj(msg.getBody()); if (!headMd5.equals(calcMd5)) { log.error("【Netty服务器】报文md5检查不通过:" + headMd5 + " vs " + calcMd5 + ",关闭连接"); CommonMessage message = MessageGenerateHelper.fail( NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(),"报文md5检查不通过,关闭连接" ); ctx.writeAndFlush(message); ctx.close(); } log.info(msg.toString()); if (header.getType() == MessageType.SERVICE_REQ_ONE_WAY.value()) { log.debug("【Netty服务器】ONE_WAY类型消息,异步处理"); AsyncBusiProcessor.submitTask(() -> { log.info("【Netty服务器】模仿异步,ONE_WEY业务处理"); }); } else { log.debug("【Netty服务器】TWO_WAY类型消息,应答"); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId("123456"); orderInfo.setProductCount(2); orderInfo.setAmount(BigDecimal.valueOf(1499.99)); CommonMessage message = MessageGenerateHelper.success( NettyConstant.ORDER_SERVER_ID, MessageType.SERVICE_RESP.value(), orderInfo ); ctx.writeAndFlush(message); } } }
helper包就是utils工具包。我喜欢叫做helper而已。里面只有一个类SecurityCenterHelper
。用来实现【白名单】,还有【重复登录校验】机制。
import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; /** * 登录安全校验助手 * * @author zhangshen * @date 2023/10/25 10:14 * @slogan 编码即学习,注释断语义 **/ public class SecurityCenterHelper { /** * 用以检查用户是否重复登录的缓存 */ private static Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>(); /** * 用户登录的白名单 */ private static Set<String> whiteList = new CopyOnWriteArraySet<>(); static { whiteList.add("127.0.0.1"); } /** * 是否白名单内 */ public static boolean isWhiteIP(String ip) { return whiteList.contains(ip); } /** * 给定用户信息是否重复登录 */ public static boolean isRepeatLogin(String usrInfo) { return nodeCheck.containsKey(usrInfo); } /** * 添加登录用户信息 */ public static void addLoginUser(String usrInfo) { nodeCheck.put(usrInfo, true); } /** * 移除登录用户信息 */ public static void removeLoginUser(String usrInfo) { nodeCheck.remove(usrInfo, true); } }
里面只有一个类AsyncBusiProcessor
,用来处理需要【异步】的任务。
import io.netty.util.NettyRuntime; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * 异步业务处理器。某些消息可以异步处理,比如ONE_WAY类型消息 * * @author zhangshen * @date 2023/10/25 10:56 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class AsyncBusiProcessor { private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(3000); private static final ExecutorService executorService; static { int cores = NettyRuntime.availableProcessors(); executorService = new ThreadPoolExecutor( 1, cores, 60, TimeUnit.SECONDS, taskQueue ); } /** * 提交异步执行的任务 * * @param task 任务 */ public static void submitTask(Runnable task) { executorService.execute(task); } }
Client包下跟Server包下的东西其实差不多,大家自行理解
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.timeout.ReadTimeoutHandler; import org.tuling.io.rpc.client.handler.CheckWriteIdleHandler; import org.tuling.io.rpc.client.handler.ClientHeartBeatHandler; import org.tuling.io.rpc.client.handler.ClientLoginHandler; import org.tuling.io.rpc.client.handler.ClientOrderHandler; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.codec.KryoDecodeHandler; import org.tuling.io.rpc.common.codec.KryoEncodeHandler; /** * 客户端,通道初始化器 * * @author zhangshen * @date 2023/10/25 9:44 * @slogan 编码即学习,注释断语义 **/ public class ClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); // 写空闲自己检测 pipeline.addLast(new CheckWriteIdleHandler()); // 添加【粘包/分包】处理器。 由Netty预备提供的 pipeline.addLast(new LengthFieldBasedFrameDecoder( 65535, 0, 2, 0,2 )); pipeline.addLast(new LengthFieldPrepender(2)); // 添加【序列化/反序列化】处理器,开源序列化工具 pipeline.addLast(new KryoDecodeHandler()); pipeline.addLast(new KryoEncodeHandler()); // 添加登录处理器 // 登录处理器需放在心跳前面 pipeline.addLast(new ClientLoginHandler()); // 添加【心跳】处理器,Netty预备提供的 pipeline.addLast(new ReadTimeoutHandler(NettyConstant.HEARBEAT_FREQUENCY)); pipeline.addLast(new ClientHeartBeatHandler()); pipeline.addLast(new ClientOrderHandler()); } }
CheckWriteIdleHandler
:客户端写空闲检测
import io.netty.handler.timeout.IdleStateHandler; /** * 客户端检测自己的写空闲 * * @author zhangshen * @date 2023/10/25 11:08 * @slogan 编码即学习,注释断语义 **/ public class CheckWriteIdleHandler extends IdleStateHandler { public CheckWriteIdleHandler() { super(0, 8, 0); } }
ClientHeartBeatHandler
:客户端心跳处理
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.common.helper.MessageGenerateHelper; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.CommonMessageHeader; import org.tuling.io.rpc.common.pojo.MessageType; /** * 客户端在长久未向服务器业务请求时,发出心跳请求报文 * * @author zhangshen * @date 2023/10/25 11:33 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) { CommonMessage request = MessageGenerateHelper.request(-1, MessageType.HEARTBEAT_REQ.value(), null); log.debug("【Netty客户端】写空闲,发出心跳报文维持连接: " + request); ctx.writeAndFlush(request); } super.userEventTriggered(ctx, evt); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CommonMessage message = (CommonMessage) msg; CommonMessageHeader header = message.getHeader(); if (header != null && header.getType() == MessageType.HEARTBEAT_RESP.value() ) { log.debug("【Netty客户端】收到服务器心跳应答,服务器正常"); ReferenceCountUtil.release(msg); } else { ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { log.debug("【Netty客户端】服务器长时间未应答,关闭链路"); } super.exceptionCaught(ctx, cause); } }
ClientLoginHandler
:客户端登录请求。TCP三次握手成功之后就请求
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.helper.MessageGenerateHelper; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.CommonMessageHeader; import org.tuling.io.rpc.common.pojo.MessageType; /** * 客户端,发起登录请求 * * @author zhangshen * @date 2023/10/25 11:11 * @slogan 编码即学习,注释断语义 **/ @Slf4j public class ClientLoginHandler extends ChannelInboundHandlerAdapter { public void channelActive(ChannelHandlerContext ctx) throws Exception { // TCP三次握手完成,发出认证请求 CommonMessage message = MessageGenerateHelper.request(NettyConstant.LOGIN_SERVER_ID, MessageType.SERVICE_REQ.value(), null); log.info("【Netty客户端】请求服务器认证 : " + message); ctx.writeAndFlush(message); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CommonMessage message = (CommonMessage) msg; CommonMessageHeader header = message.getHeader(); if (header != null && header.getType() == MessageType.SERVICE_RESP.value() && header.getSeverId() == NettyConstant.LOGIN_SERVER_ID ) { log.info("【Netty客户端】收到认证应答报文,服务器是否验证通过?"); byte loginResult = message.getResult(); if (loginResult != 1) { // 握手失败,关闭连接 log.debug("【Netty客户端】未通过认证,关闭连接: " + message); ctx.close(); } else { log.info("【Netty客户端】通过认证,移除本处理器,进入业务通信 : " + message); ctx.pipeline().remove(this); ReferenceCountUtil.release(msg); } } else { ctx.fireChannelRead(msg); } } }
ClientOrderHandler
:瞎写的一个业务拓展类,目前只是打印。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.tuling.io.rpc.common.pojo.CommonMessage; /** * @author Mark老师 * 类说明:接收业务应答消息并处理 */ @Slf4j public class ClientOrderHandler extends SimpleChannelInboundHandler<CommonMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, CommonMessage msg) throws Exception { log.info("【Netty客户端】业务应答消息:" + msg.toString()); ReferenceCountUtil.release(msg); } }
pojo的我就不贴了,看【2.3 消息体定义】
/** * 常量定义 * * @author zhangshen * @date 2023/10/25 9:41 * @slogan 编码即学习,注释断语义 **/ public interface NettyConstant { /** * 程序绑定端口 */ int PORT = 8585; /** * 程序ip地址 */ String SERVER_IP = "127.0.0.1"; /** * 成功 */ byte SUCCESS = 1; /** * 失败 */ byte FAIL = 0; /** * 心跳检测频率 * 单位:秒 */ int HEARBEAT_FREQUENCY = 15; /** * 登录服务器标识 */ int LOGIN_SERVER_ID = 1; /** * 订单服务器标识 */ int ORDER_SERVER_ID = 2; }
EncryptHelper
:防篡改的加密摘要
import org.tuling.io.rpc.common.codec.KryoSerializer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; /** * 摘要的工具类 * * @author zhangshen * @date 2023/10/25 10:47 * @slogan 编码即学习,注释断语义 **/ public class EncryptHelper { /** * 加密信息 * * @param strSrc 需要被摘要的字符串 * @param encName 摘要方式,有 MD5、SHA-1和SHA-256 这三种,缺省为MD5 * @return 返回摘要字符串 */ private static String EncryptStr(String strSrc, String encName) { MessageDigest md = null; String strDes = null; byte[] bt = strSrc.getBytes(); try { if (encName == null || encName.equals("")) { encName = "MD5"; } md = MessageDigest.getInstance(encName); md.update(bt); strDes = bytes2Hex(md.digest()); // to HexString } catch (NoSuchAlgorithmException e) { System.out.println("Invalid algorithm."); return null; } return strDes; } /** * MD5摘要 * * @param str 需要被摘要的字符串 * @return 对字符串str进行MD5摘要后,将摘要字符串返回 */ public static String EncryptByMD5(String str) { return EncryptStr(str, "MD5"); } /** * SHA1摘要 * * @param str 需要被摘要的字符串 * @return 对字符串str进行SHA-1摘要后,将摘要字符串返回 */ public static String EncryptBySHA1(String str) { return EncryptStr(str, "SHA-1"); } /** * SHA256摘要 * * @param str 需要被摘要的字符串 * @return 对字符串str进行SHA-256摘要后,将摘要字符串返回 */ public static String EncryptBySHA256(String str) { return EncryptStr(str, "SHA-256"); } /** * 字节转十六进制,结果以字符串形式呈现 */ private static String bytes2Hex(byte[] bts) { String des = ""; String tmp = null; for (int i = 0; i < bts.length; i++) { tmp = (Integer.toHexString(bts[i] & 0xFF)); if (tmp.length() == 1) { des += "0"; } des += tmp; } return des; } /** * 对字符串进行MD5加盐摘要 * 先将str进行一次MD5摘要,摘要后再取摘要后的字符串的第1、3、5个字符追加到摘要串, * 再拿这个摘要串再次进行摘要 */ public static String encrypt(String str) { String encryptStr = EncryptByMD5(str); if (encryptStr != null) { encryptStr = encryptStr + encryptStr.charAt(0) + encryptStr.charAt(2) + encryptStr.charAt(4); encryptStr = EncryptByMD5(encryptStr); } return encryptStr; } /** * 对对象进行MD5摘要,先对对象进行序列化,转为byte数组, * 再将byte数组转为字符串,然后进行MD5加盐摘要 */ public static String encryptObj(Object o) { return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o))); } }
MessageGenerateHelper
:消息生成工具,消除代码重复用的
import org.tuling.io.rpc.common.NettyConstant; import org.tuling.io.rpc.common.pojo.CommonMessage; import org.tuling.io.rpc.common.pojo.CommonMessageHeader; import java.util.concurrent.atomic.AtomicLong; /** * 消息生成助手 * * @author zhangshen * @date 2023/10/25 11:21 * @slogan 编码即学习,注释断语义 **/ public class MessageGenerateHelper { private static AtomicLong msgId = new AtomicLong(1); public static long getID() { return msgId.getAndIncrement(); } /** * 构建成功的业务消息 */ public static CommonMessage success(int serverId, byte type, Object msg) { CommonMessage message = new CommonMessage(); CommonMessageHeader header = getHeader(serverId, type); message.setHeader(header); message.setResult(NettyConstant.SUCCESS); message.setBody(msg); return message; } /** * 构建失败的业务消息 */ public static CommonMessage fail(int serverId, byte type, Object msg) { CommonMessage message = new CommonMessage(); CommonMessageHeader header = getHeader(serverId, type); message.setHeader(header); message.setResult(NettyConstant.FAIL); message.setBody(msg); return message; } /** * 构建请求业务消息 */ public static CommonMessage request(int serverId, byte type, Object msg) { CommonMessage message = new CommonMessage(); CommonMessageHeader header = getHeader(serverId, type); message.setHeader(header); message.setBody(msg); return message; } /** * 构建请求业务消息 */ public static CommonMessage requestWithMsgId(int serverId, byte type, Object msg) { CommonMessage message = new CommonMessage(); CommonMessageHeader header = getHeader(serverId, type); header.setMsgID(getID()); header.setMd5(EncryptHelper.encryptObj(msg)); message.setHeader(header); message.setBody(msg); return message; } private static CommonMessageHeader getHeader(int serverId, byte type) { CommonMessageHeader header = new CommonMessageHeader(); header.setSeverId(serverId); header.setType(type); return header; } }
这里面的是一个基于Kryo
编解码序列化API实现的。pom.xml
如下:
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.42</version>
</dependency>
当然大家可以使用其他的API,我这里是抄的。
KryoFactory
:Kryo实例,API要求的
import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.serializers.DefaultSerializers; import de.javakaffee.kryoserializers.*; import java.lang.reflect.InvocationHandler; import java.math.BigDecimal; import java.math.BigInteger; import java.net.URI; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; /** * Kryo的工厂,拿到Kryo的实例 * * @author zhanghuitong * @date 2023/10/25 20:12 * @slogan 编码即学习,注释断语义 **/ public class KryoFactory { public static Kryo createKryo() { Kryo kryo = new Kryo(); kryo.setRegistrationRequired(false); kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer()); kryo.register(InvocationHandler.class, new JdkProxySerializer()); kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer()); kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer()); kryo.register(Pattern.class, new RegexSerializer()); kryo.register(BitSet.class, new BitSetSerializer()); kryo.register(URI.class, new URISerializer()); kryo.register(UUID.class, new UUIDSerializer()); UnmodifiableCollectionsSerializer.registerSerializers(kryo); SynchronizedCollectionsSerializer.registerSerializers(kryo); kryo.register(HashMap.class); kryo.register(ArrayList.class); kryo.register(LinkedList.class); kryo.register(HashSet.class); kryo.register(TreeSet.class); kryo.register(Hashtable.class); kryo.register(Date.class); kryo.register(Calendar.class); kryo.register(ConcurrentHashMap.class); kryo.register(SimpleDateFormat.class); kryo.register(GregorianCalendar.class); kryo.register(Vector.class); kryo.register(BitSet.class); kryo.register(StringBuffer.class); kryo.register(StringBuilder.class); kryo.register(Object.class); kryo.register(Object[].class); kryo.register(String[].class); kryo.register(byte[].class); kryo.register(char[].class); kryo.register(int[].class); kryo.register(float[].class); kryo.register(double[].class); return kryo; } }
KryoSerializer
序列化工具:
/** * Kryo的序列化器,负责序列化和反序列化 * * @author zhanghuitong * @date 2023/10/25 20:12 * @slogan 编码即学习,注释断语义 **/ public class KryoSerializer { private static Kryo kryo = KryoFactory.createKryo(); /*序列化*/ public static void serialize(Object object, ByteBuf out) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Output output = new Output(baos); kryo.writeClassAndObject(output, object); output.flush(); output.close(); byte[] b = baos.toByteArray(); try { baos.flush(); baos.close(); } catch (IOException e) { e.printStackTrace(); } out.writeBytes(b); } /*序列化为一个字节数组,主要用在消息摘要上*/ public static byte[] obj2Bytes(Object object) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Output output = new Output(baos); kryo.writeClassAndObject(output, object); output.flush(); output.close(); byte[] b = baos.toByteArray(); try { baos.flush(); baos.close(); } catch (IOException e) { e.printStackTrace(); } return b; } /*反序列化*/ public static Object deserialize(ByteBuf out) { if (out == null) { return null; } Input input = new Input(new ByteBufInputStream(out)); return kryo.readClassAndObject(input); } }
KryoEncodeHandler
:编码处理器,实现了NettyMessageToByteEncoder
接口的Handler
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.tuling.io.rpc.common.pojo.CommonMessage; /** * 序列化的Handler * * @author zhangshen * @date 2023/10/25 9:54 * @slogan 编码即学习,注释断语义 **/ public class KryoEncodeHandler extends MessageToByteEncoder<CommonMessage> { @Override protected void encode(ChannelHandlerContext ctx, CommonMessage message, ByteBuf out) throws Exception { KryoSerializer.serialize(message, out); ctx.flush(); } }
KryoDecodeHandler
:解码处理器,实现了NettyMessageToByteEncoder
接口的Handler
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 反序列化的Handler * * @author zhangshen * @date 2023/10/25 9:54 * @slogan 编码即学习,注释断语义 **/ public class KryoDecodeHandler extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object obj = KryoSerializer.deserialize(in); out.add(obj); } }
下面只有一个类,OrderInfo
import lombok.Getter; import lombok.Setter; import lombok.ToString; import java.math.BigDecimal; /** * 订单信息 * * @author zhangshen * @date 2023/10/25 10:44 * @slogan 编码即学习,注释断语义 **/ @Getter @Setter @ToString public class OrderInfo { private String orderId; private Integer productCount; private BigDecimal amount; }
感谢【百度:文心一言】
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。