赞
踩
RPC实现Consumer 远程调用
梳理一下基本的实现思路,主要完成一个这样的功能:API 模块中的接口功能在服务端实现(并没有在客户端实现)。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样。具体调用过程如下图所示:
1.服务提供者
package com.netty.provider;

import com.netty.api.IRpcService;

/**
 * Provider-side implementation of {@link IRpcService}: plain {@code int} arithmetic.
 *
 * Created by chenli on 2019/11/13.
 */
public class IRpcServiceImpl implements IRpcService {

    /** Returns {@code a + b}. */
    @Override
    public int add(int a, int b) {
        final int sum = a + b;
        return sum;
    }

    /** Returns {@code a - b}. */
    @Override
    public int sub(int a, int b) {
        final int difference = a - b;
        return difference;
    }

    /** Returns {@code a * b}. */
    @Override
    public int multi(int a, int b) {
        final int product = a * b;
        return product;
    }

    /**
     * Returns {@code a / b} using integer division.
     *
     * @throws ArithmeticException if {@code b} is zero
     */
    @Override
    public int div(int a, int b) {
        final int quotient = a / b;
        return quotient;
    }
}
2.提供给消费端的API接口
package com.netty.api; /** * Created by chenli on 2019/11/13. */ public interface IRpcService { //加 int add(int a,int b); /** * 减 * @param a * @param b * @return */ int sub(int a,int b); /** * × * @param a * @param b * @return */ int multi(int a,int b); /** * 除 * @param a * @param b * @return */ int div(int a , int b); }
3.服务注册中心
package com.netty.registry; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; /** * Created by chenli on 2019/11/13. */ public class RpcRegistry { //应该有端口 private int port; public RpcRegistry(int port) { this.port = port; } public void start(){ //利用netty创建注册中心 //boss线程 EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGoup=new NioEventLoopGroup(); //服务 ServerBootstrap server=new ServerBootstrap(); server.group(bossGroup,workerGoup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline=socketChannel.pipeline(); //自定义协议解码器 /** 入参有5个,分别解释如下 maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。 lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置 lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8) lengthAdjustment:要添加到长度字段值的补偿值 initialBytesToStrip:从解码帧中去除的第一个字节数 */ pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //自定义协议编码器 pipeline.addLast(new LengthFieldPrepender(4)); //对象参数类型编码器 pipeline.addLast("encoder",new ObjectEncoder()); //对象参数类型解码器 pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast(new RpcRegistryHandler()); } }) //主线程 .option(ChannelOption.SO_BACKLOG,128) //工作线程--保持长连接 .childOption(ChannelOption.SO_KEEPALIVE,true); //启动 try { ChannelFuture channelFuture=server.bind(port).sync(); System.out.println(" RPC Registry start listen at 
" + port ); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new RpcRegistry(8080).start(); } }
4.注册中心实际处理逻辑
package com.netty.registry; import com.netty.protocol.InvokerProtocol; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.EventExecutorGroup; import java.io.File; import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * Created by chenli on 2019/11/13. */ public class RpcRegistryHandler extends ChannelInboundHandlerAdapter { //保存所有可用的服务 private static ConcurrentHashMap<String ,Object> registerMap=new ConcurrentHashMap<>(); //保存所有相关的服务名 private List<String> className=new ArrayList<>(); public RpcRegistryHandler() { //扫描服务 //注册类 scannerClass("com.netty.provider"); doRegistry(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Object result=new Object(); InvokerProtocol invokerProtocol=(InvokerProtocol) msg; if (registerMap.containsKey(invokerProtocol.getClassName())){ Object object=registerMap.get(invokerProtocol.getClassName()); Method method=object.getClass().getMethod(invokerProtocol.getMethodName(),invokerProtocol.getParams()); result=method.invoke(object,invokerProtocol.getValues()); } ctx.write(result); ctx.flush(); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private void doRegistry() { if (className.size()==0){ return; } for (String className1:className){ try { Class<?> c=Class.forName(className1); Class<?> i=c.getInterfaces()[0];//拿到该类第一个实现接口 registerMap.put(i.getName(),c.newInstance()); } catch (Exception e) { e.printStackTrace(); } } } private void scannerClass( String packageName) { URL uri=this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/")); File dir=new File(uri.getFile()); for (File file: dir.listFiles()) { if (file.isDirectory()){ 
scannerClass(dir+"."+file.getName()); }else{ className.add(packageName+"."+file.getName().replace(".class","").trim()); } } } }
5.传输协议
package com.netty.protocol; import java.io.Serializable; /** * Created by chenli on 2019/11/13. */ public class InvokerProtocol implements Serializable{ private String className;//类名 private String methodName;//方法名 private Object[] values;//实参 private Class<?>[] params;//形参列表 public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getValues() { return values; } public void setValues(Object[] values) { this.values = values; } public Class<?>[] getParams() { return params; } public void setParams(Class<?>[] params) { this.params = params; } }
6.消费者测试
package com.netty.consumer;

import com.netty.api.IRpcService;

/**
 * Consumer-side demo: obtains an {@link IRpcService} from {@link RpcProxy} and prints the
 * results of two calls.
 *
 * Created by chenli on 2019/11/13.
 */
public class RpcConsumer {

    /** Calls {@code add(1, 2)} and then {@code sub(2, 1)}, printing each result on its own line. */
    public static void main(String[] args) {
        IRpcService calculator = RpcProxy.create(IRpcService.class);

        int sum = calculator.add(1, 2);
        System.out.println(sum);

        int difference = calculator.sub(2, 1);
        System.out.println(difference);
    }
}
7.远程代理接口
package com.netty.consumer; import com.netty.protocol.InvokerProtocol; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; /** * Created by chenli on 2019/11/13. */ public class RpcProxy { public static <T> T create(Class<?> clazz){ MethodProxy methodProxy=new MethodProxy(clazz); Class<?> [] interfaces=clazz.isInterface() ? new Class[]{clazz}: clazz.getInterfaces(); T result=(T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,methodProxy); return result; } private static class MethodProxy implements InvocationHandler { private Class<?> clazz; public MethodProxy(Class<?> clazz) { this.clazz = clazz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //如果传进来是一个已实现的具体类 if (Object.class.equals(method.getDeclaringClass())) { try { //具体实现类,直接调 return method.invoke(this, args); } catch (Exception e) { e.printStackTrace(); } } else { return rpcInvoke(proxy, method, args); } return null; } private Object rpcInvoke(Object proxy, Method method, Object[] args) { //封装协议 InvokerProtocol invokerProtocol = new InvokerProtocol(); invokerProtocol.setMethodName(method.getName()); invokerProtocol.setClassName(this.clazz.getName()); invokerProtocol.setValues(args); invokerProtocol.setParams(method.getParameterTypes()); final RpcProxyHandler consumerHandler = new RpcProxyHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap service = new Bootstrap(); 
service.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //自定义协议解码器 /** 入参有5个,分别解释如下 maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。 lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置 lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8) lengthAdjustment:要添加到长度字段值的补偿值 initialBytesToStrip:从解码帧中去除的第一个字节数 */ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //自定义协议编码器 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //对象参数类型编码器 pipeline.addLast("encoder", new ObjectEncoder()); //对象参数类型解码器 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast("handler", consumerHandler); } }); try { ChannelFuture channelFuture = service.connect("localhost", 8080).sync(); channelFuture.channel().writeAndFlush(invokerProtocol).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } return consumerHandler.getResponse(); } } }
8.处理返回消息
package com.netty.consumer;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Inbound handler that remembers the most recently received message so it can be read back
 * through {@link #getResponse()}.
 *
 * Created by chenli on 2019/11/14.
 */
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {

    /** Last message received on the channel; {@code null} until one arrives. */
    private Object response;

    /** Stores the incoming message as the response. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.response = msg;
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /** @return the last message received, or {@code null} if none has been received */
    public Object getResponse() {
        return this.response;
    }
}
版权声明:本文为CSDN博主「Leon_Jinhai_Sun」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Leon_Jinhai_Sun/article/details/112549242
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。