赞
踩
今天我们分享springboot集成netty的过程:
1、jar包依赖,Netty服务端和客户端依赖的jar包一样:
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.28.Final</version>
- </dependency>
- <!-- 序列化jar -->
- <dependency>
- <groupId>de.javakaffee</groupId>
- <artifactId>kryo-serializers</artifactId>
- <version>0.42</version>
- </dependency>
2、Netty服务端核心代码(boot启动类省略):
2.1、netty功能启动入口
-
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
- import io.netty.handler.codec.LengthFieldPrepender;
- import io.netty.handler.timeout.ReadTimeoutHandler;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- /**
- * 类说明:服务端Handler的初始化
- * 交给Spring 托管,ServerBusiHandler用注入方式实例化后加入Netty的pipeline
- */
- @Service//启动时扫描,即交给spring管理
- public class ServerInit extends ChannelInitializer<SocketChannel> {
-
- @Autowired
- private ServerBusiHandler serverBusiHandler;
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
- //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
- /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
- ch.pipeline().addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(65535,
- 0,2,0,
- 2));
- /*给发送出去的消息增加长度字段*/
- ch.pipeline().addLast("frameEncoder",
- new LengthFieldPrepender(2));
- /*反序列化,将字节数组转换为消息实体*/
- ch.pipeline().addLast(new KryoDecoder());
- /*序列化,将消息实体转换为字节数组准备进行网络传输*/
- ch.pipeline().addLast("MessageEncoder",
- new KryoEncoder());
- /*超时检测*/
- ch.pipeline().addLast("readTimeoutHandler",
- new ReadTimeoutHandler(50));
- /*登录应答*/
- ch.pipeline().addLast(new LoginAuthRespHandler());
-
- /*心跳应答*/
- ch.pipeline().addLast("HeartBeatHandler",
- new HeartBeatRespHandler());
-
- /*服务端业务处理*/
- ch.pipeline().addLast("ServerBusiHandler",
- serverBusiHandler);
- }
- }
2.2、netty自定义配置入口 ServerInit
-
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
- import io.netty.handler.codec.LengthFieldPrepender;
- import io.netty.handler.timeout.ReadTimeoutHandler;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- /**
- * 类说明:服务端Handler的初始化
- * 交给Spring 托管,ServerBusiHandler用注入方式实例化后加入Netty的pipeline
- */
- @Service//启动时扫描,即交给spring管理
- public class ServerInit extends ChannelInitializer<SocketChannel> {
-
- @Autowired
- private ServerBusiHandler serverBusiHandler;
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- /*Netty提供的日志打印Handler,可以展示发送接收出去的字节*/
- //ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
- /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
- ch.pipeline().addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(65535,
- 0,2,0,
- 2));
- /*给发送出去的消息增加长度字段*/
- ch.pipeline().addLast("frameEncoder",
- new LengthFieldPrepender(2));
- /*反序列化,将字节数组转换为消息实体*/
- ch.pipeline().addLast(new KryoDecoder());
- /*序列化,将消息实体转换为字节数组准备进行网络传输*/
- ch.pipeline().addLast("MessageEncoder",
- new KryoEncoder());
- /*超时检测*/
- ch.pipeline().addLast("readTimeoutHandler",
- new ReadTimeoutHandler(50));
- /*登录应答*/
- ch.pipeline().addLast(new LoginAuthRespHandler());
-
- /*心跳应答*/
- ch.pipeline().addLast("HeartBeatHandler",
- new HeartBeatRespHandler());
-
- /*服务端业务处理*/
- ch.pipeline().addLast("ServerBusiHandler",
- serverBusiHandler);
- }
- }
2.3、netty处理业务的Handler入口 ServerBusiHandler 类:
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.lang.reflect.Method;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author Mark老师 享学课堂 https://enjoy.ke.qq.com
- * 类说明:业务处理类
- * channelRead0方法中有了实际的业务处理,负责具体的业务方法的调用
- *
- */
- @Service
- @ChannelHandler.Sharable//共享的单例
- public class ServerBusiHandler
- extends SimpleChannelInboundHandler<MyMessage> {
- private static final Log LOG
- = LogFactory.getLog(ServerBusiHandler.class);
-
- @Autowired
- private RegisterService registerService;
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg)
- throws Exception {
- LOG.info(msg);
- MyMessage message = new MyMessage();
- MyHeader myHeader = new MyHeader();
- myHeader.setSessionID(msg.getMyHeader().getSessionID());
- myHeader.setType(MessageType.SERVICE_RESP.value());
- message.setMyHeader(myHeader);
- Map<String,Object> content = (HashMap<String,Object>)msg.getBody();
- /*方法所在类名接口名*/
- String serviceName = (String) content.get("siName");
- /*方法的名字*/
- String methodName = (String) content.get("methodName");
- /*方法的入参类型*/
- Class<?>[] paramTypes = (Class<?>[]) content.get("paraTypes");
- /*方法的入参的值*/
- Object[] args = (Object[]) content.get("args");
- /*从容器中拿到服务的Class对象*/
- Class serviceClass = registerService.getLocalService(serviceName);
- if(serviceClass == null){
- throw new ClassNotFoundException(serviceName+ " not found");
- }
-
- /*通过反射,执行实际的服务*/
- Method method = serviceClass.getMethod(methodName, paramTypes);
- boolean result = (boolean)method.invoke(serviceClass.newInstance(),args);
- message.setBody(result);
- ctx.writeAndFlush(message);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx)
- throws Exception {
- LOG.info(ctx.channel().remoteAddress()+" 主动断开了连接!");
- }
-
- }
3、客户端核心代码实现(boot启动类省略):
3.1、netty客户端启动入口
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import java.net.InetSocketAddress;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- /**
- *类说明:rpc框架的客户端代理部分,交给Spring 托管
- * 1、动态代理的实现中,不再连接服务器,而是直接发送请求
- * 2、客户端网络部分的主体,包括Netty组件的初始化,连接服务器等
- */
- @Service
- public class RpcClientFrame implements Runnable{
-
- private static final Log LOG = LogFactory.getLog(RpcClientFrame.class);
-
- private ScheduledExecutorService executor = Executors
- .newScheduledThreadPool(1);
- private Channel channel;
- private EventLoopGroup group = new NioEventLoopGroup();
-
- /*是否用户主动关闭连接的标志值*/
- private volatile boolean userClose = false;
- /*连接是否成功关闭的标志值*/
- private volatile boolean connected = false;
-
- @Autowired
- private ClientInit clientInit;
- @Autowired
- private ClientBusiHandler clientBusiHandler;
-
- /*远程服务的代理对象,参数为客户端要调用的的服务*/
- public <T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception {
-
- /*拿到一个代理对象,由这个代理对象通过网络进行实际的服务调用*/
- return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(),
- new Class<?>[]{serviceInterface},
- new DynProxy(serviceInterface,clientBusiHandler));
- }
-
- /*动态代理,实现对远程服务的访问*/
- private static class DynProxy implements InvocationHandler{
- private Class<?> serviceInterface;
- private ClientBusiHandler clientBusiHandler;
-
- public DynProxy(Class<?> serviceInterface, ClientBusiHandler clientBusiHandler) {
- this.serviceInterface = serviceInterface;
- this.clientBusiHandler = clientBusiHandler;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- Map<String,Object> content = new HashMap<>();
- content.put("siName",serviceInterface.getName());
- content.put("methodName",method.getName());
- content.put("paraTypes",method.getParameterTypes());
- content.put("args",args);
- return clientBusiHandler.send(content);
- }
- }
-
- public boolean isConnected() {
- return connected;
- }
-
- /*连接服务器*/
- public void connect(int port, String host) throws Exception {
-
- try {
- Bootstrap b = new Bootstrap();
- b.group(group).channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(clientInit);
- // 发起异步连接操作
- ChannelFuture future = b.connect(
- new InetSocketAddress(host, port)).sync();
- channel = future.sync().channel();
- /*连接成功后通知等待线程,连接已经建立*/
- synchronized (this){
- this.connected = true;
- this.notifyAll();
- }
- future.channel().closeFuture().sync();
- } finally {
- if(!userClose){/*非用户主动关闭,说明发生了网络问题,需要进行重连操作*/
- System.out.println("发现异常,可能发生了服务器异常或网络问题," +
- "准备进行重连.....");
- //再次发起重连操作
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- TimeUnit.SECONDS.sleep(1);
- try {
- // 发起重连操作
- connect(NettyConstant.REMOTE_PORT,
- NettyConstant.REMOTE_IP);
- } catch (Exception e) {
- e.printStackTrace();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }else{/*用户主动关闭,释放资源*/
- channel = null;
- group.shutdownGracefully().sync();
- connected = false;
- // synchronized (this){
- // this.connected = false;
- // this.notifyAll();
- // }
- }
- }
- }
-
- @Override
- public void run() {
- try {
- connect(NettyConstant.REMOTE_PORT, NettyConstant.REMOTE_IP);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void close() {
- userClose = true;
- channel.close();
- }
-
- @PostConstruct
- public void startNet() throws InterruptedException {
- new Thread(this).start();
- while(!this.isConnected()){
- synchronized (this){
- this.wait();
- }
- }
- LOG.info("网络通信已准备好,可以进行业务操作了........");
- }
-
- @PreDestroy
- public void stopNet(){
- close();
- }
-
- }
3.2、netty客户端自定义配置入口 ClientInit
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
- import io.netty.handler.codec.LengthFieldPrepender;
- import io.netty.handler.timeout.ReadTimeoutHandler;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- /**
- * @author Mark老师 享学课堂 https://enjoy.ke.qq.com
- * 类说明:客户端Handler的初始化
- * 交给Spring 托管,clientBusiHandler用注入方式实例化后加入Netty的pipeline
- */
- @Service
- public class ClientInit extends ChannelInitializer<SocketChannel> {
-
- @Autowired
- private ClientBusiHandler clientBusiHandler;
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- /*剥离接收到的消息的长度字段,拿到实际的消息报文的字节数组*/
- ch.pipeline().addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(65535,
- 0,2,0,
- 2));
-
- /*给发送出去的消息增加长度字段*/
- ch.pipeline().addLast("frameEncoder",
- new LengthFieldPrepender(2));
-
- /*反序列化,将字节数组转换为消息实体*/
- ch.pipeline().addLast(new KryoDecoder());
- /*序列化,将消息实体转换为字节数组准备进行网络传输*/
- ch.pipeline().addLast("MessageEncoder",
- new KryoEncoder());
-
- /*超时检测*/
- ch.pipeline().addLast("readTimeoutHandler",
- new ReadTimeoutHandler(10));
-
- /*发出登录请求*/
- ch.pipeline().addLast("LoginAuthHandler",
- new LoginAuthReqHandler());
-
- /*发出心跳请求*/
- ch.pipeline().addLast("HeartBeatHandler",
- new HeartBeatReqHandler());
-
- /*业务处理*/
- ch.pipeline().addLast("ClientBusiHandler",
- clientBusiHandler);
- }
- }
3.3、netty通信中序列化的实现
序列化的Handler
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
-
- /**
- * 类说明:序列化的Handler
- */
- public class KryoEncoder extends MessageToByteEncoder<MyMessage> {
-
- @Override
- protected void encode(ChannelHandlerContext ctx, MyMessage message,
- ByteBuf out) throws Exception {
- KryoSerializer.serialize(message, out);
- ctx.flush();
- }
- }
反序列化的Handler
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
-
- import java.util.List;
-
- /**
- * 类说明:反序列化的Handler
- */
- public class KryoDecoder extends ByteToMessageDecoder {
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in,
- List<Object> out) throws Exception {
- Object obj = KryoSerializer.deserialize(in);
- out.add(obj);
- }
- }
信息类的定义:
- /**
- * 类说明:消息实体类
- */
- public final class MyMessage {
-
- private MyHeader myHeader;
-
- private Object body;
-
- public final MyHeader getMyHeader() {
- return myHeader;
- }
-
- public final void setMyHeader(MyHeader myHeader) {
- this.myHeader = myHeader;
- }
-
- public final Object getBody() {
- return body;
- }
-
- public final void setBody(Object body) {
- this.body = body;
- }
-
- @Override
- public String toString() {
- return "MyMessage [myHeader=" + myHeader + "][body="+body+"]";
- }
- }
4、启动Netty服务端:
5、启动客户端
接下来就可以通信了,比如,客户端发送消息:
到此,springboot集成netty的过程基本结束,后期会详细分析其前后端交互过程,敬请期待。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。