赞
踩
RPC 又名远程过程调用协议RPC(Remote Procedure Call Protocol),允许像调用本地服务一样调用远程服务。
RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
第一,首先,要解决通讯的问题,主要是通过在客户端和服务器之间建立TCP连接(socket),远程过程调用的所有交换的数据都在这个连接里传输。连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。
第二,要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。
第三,当A服务器上的应用发起远程过程调用时,方法的参数需要通过底层的网络协议如TCP传递到B服务器,由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize),通过寻址和传输将序列化的二进制发送给B服务器。
第四,B服务器收到请求后,需要对参数进行反序列化(序列化的逆操作),恢复为内存中的表达方式,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。
第五,返回值还要发送回服务器A上的应用,也要经过序列化的方式发送,服务器A接到后,再反序列化,恢复为内存中的表达方式,交给A服务器上的应用。
总而言之,RPC主要是为了解决分布式系统中,服务之间的调用问题。使得远程调用时,要能够像本地调用一样方便,让调用者感知不到远程调用的逻辑。
@Slf4j public class PRCServer { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup work = new NioEventLoopGroup(); RPCMessageCodecSharable rpcMessageCodecSharableHandler = new RPCMessageCodecSharable(); RPCRequestMessageHandler requestMessageHandler = new RPCRequestMessageHandler(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, work); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //预设好的长度用于解决粘包和半包现象,客户端和服务端都要保持一致。 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0)); //日志处理 ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); //编解码器 用于客户端和服务端之间数据传输的编码和解码工作 ch.pipeline().addLast(rpcMessageCodecSharableHandler); //处理客户端的请求服务 ch.pipeline().addLast(requestMessageHandler); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); Channel channel = channelFuture.channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("服务端报错", e.toString()); e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } }
服务端代码主要有三个功能,防止粘包和半包现象的发生,处理客户端和服务端的编码解码问题,处理客户端的请求消息(requestMessageHandler)并将数据返回给客户端。
下面看requestMessageHandler服务端是如何处理客户端的请求消息的。
@Slf4j @ChannelHandler.Sharable public class RPCRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception { RpcResponseMessage responseMessage = new RpcResponseMessage(); responseMessage.setSequenceId(msg.getSequenceId()); try { //通过反射获取调用方法的值 Object service = ServicesFactory.getService(Class.forName(msg.getInterfaceName())); Method method = service.getClass().getMethod(msg.getMethodName(), msg.getParameterTypes()); Object invoke = method.invoke(service, msg.getParameterValue()); responseMessage.setReturnValue(invoke); } catch (Exception e) { e.printStackTrace(); responseMessage.setExceptionValue(e); } ctx.writeAndFlush(responseMessage); } }
服务端在接收到客户端的消息后,经过二进制转换成RpcRequestMessage实体类,获取到客户端发送过来的类名,方法名和方法参数,方法的参数值等。通过反射获取到客户端要调用的类,并通过反射调用该类的方法获取到返回值,最后通过RpcResponseMessage这个实体类将信息返回给客户端。
public interface IMemberGroupService {
MemberGroupEntity getMemberGroupEntity(String name,String code,int id);
}
public class memberGroupServiceImpl implements IMemberGroupService {
@Override
public MemberGroupEntity getMemberGroupEntity(String name, String code, int id) {
return new MemberGroupEntity(name,code,id);
}
}
@Slf4j public class RPCClient { private static Channel channel = null; private static final Object LOCK = new Object(); public static Channel getChannel(){ if(channel!=null){ return channel; } synchronized (LOCK){ if(channel!=null){ return channel; } inintChannel(); return channel; } } public static void inintChannel() { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,12,4,0,0)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new RPCMessageCodecSharable()); ch.pipeline().addLast(new RPCResponseMessageHandler()); } }); try { channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel(); System.out.println("连接成功"); channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { group.shutdownGracefully(); } }); } catch (InterruptedException e) { e.printStackTrace(); } finally { } } }
客户端代码作用主要有四个,固定消息长度防止粘包和半包现象、将客户端消息编码发送到服务端将服务端返回消息反序列化到客户端、处理服务端的返回请求,获取channel通过channel向服务端发送数据。
PRC主要的实现方式是通过代理加反射方式来实现。代理的原理和实现方式参考我的上篇博客:https://blog.csdn.net/secret2316352792/article/details/126937667?spm=1001.2014.3001.5501
客户端启动的时候,需要调用服务端的服务,将服务端的服务所属的类和方法、参数等作为参数传入代理类中,代理类通过调用客户端的netty服务获取channel将所调用的类的方法发送给服务端,服务端获取到请求消息之后,通过客户端请求带过来的类、方法、方法参数、参数类型等通过反射调用服务端的方法,然后将服务端方法的返回结果写回到服务端,也就完成了RPC通信的整个过程。
@Slf4j @ChannelHandler.Sharable public class RPCResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> { // 序号 用来接收结果的 promise 对象 public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { Promise<Object> promise = PROMISES.remove(msg.getSequenceId()); if (promise != null) { Object returnValue = msg.getReturnValue(); promise.setSuccess(returnValue); } } }
服务端返回RpcResponseMessage这个相应消息给客户端,客户端接收到之后通过promise来获取到结果并存储到PROMISES这个map中。待结果返回后可以从promise中获取到服务端返回的消息。
public class ProxyRpcClient { public static <T> T getService(Class<T> serviceClass) { // public static <T> T (Class<T> t){ //类加载器 ClassLoader loader = serviceClass.getClassLoader(); //被代理的接口 Class<?>[] interfaces = new Class[]{serviceClass}; //代理类 InvocationHandlerClient invocationHandlerTest = new InvocationHandlerClient(serviceClass); Object o = Proxy.newProxyInstance(loader, interfaces, invocationHandlerTest); return (T)o ; } }
public class InvocationHandlerClient<T> implements InvocationHandler { private T object; public InvocationHandlerClient(T object) { this.object = object; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { int sequenceId = SequenceIdGenerator.nextId(); RpcRequestMessage msg = new RpcRequestMessage( sequenceId, ((Class) object).getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), args ); // 2. 将消息对象发送出去 RPCClient.getChannel().writeAndFlush(msg); // 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象异步接收结果线程 DefaultPromise<Object> promise = new DefaultPromise<>(RPCClient.getChannel().eventLoop()); RPCResponseMessageHandler.PROMISES.put(sequenceId, promise); // 4. 等待 promise 结果 promise.await(); if(promise.isSuccess()) { // 调用正常 Object now = promise.getNow(); System.out.println("######################################################"); return now; } else { // 调用失败 throw new RuntimeException(promise.cause()); } } }
代理类将客户端发送的消息(类名、方法名、方法参数、方法返回类型)封装成实体,通过netty的channel写出去,并生成一个promise用来获取服务端返回的结果。
public class RPCMessageCodecSharable extends MessageToMessageCodec<ByteBuf,Message>{ /** * 客户端向服务端发送消息的时候编码成二进制 * 服务端向客户端写出消息的时候编码成二进制 * @param ctx * @param msg * @param outList * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); // 1. 4 字节的魔数 out.writeBytes(new byte[]{1, 2, 3, 4}); // 2. 1 字节的版本, out.writeByte(1); // 3. 1 字节的序列化方式 1 代表 java的方式 out.writeByte(1); // 4. 1 字节的指令类型用于标识这个msg到底是那个类 out.writeByte(msg.getMessageType()); // 5. 4 个字节 out.writeInt(msg.getSequenceId()); // 无意义,对齐填充 out.writeByte(0xff); //java实体序列化写出 ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(msg); byte[] bytes = bos.toByteArray(); //写入长度 out.writeInt(bytes.length); //写出内容 out.writeBytes(bytes); outList.add(out); } /** * 服务端接受到客户端请求消息时反序列化实体 * 客户端接受到服务端返回消息时反序列化实体 * @param ctx * @param in * @param outList * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> outList) throws Exception { int magic = in.readInt(); byte version = in.readByte(); byte serialMethod = in.readByte(); byte messageType = in.readByte(); //用户反序列化的时候使用 int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte[] bytes = new byte[length]; //把bytebuf的读取到bytes里面,然后再反序列化 in.readBytes(bytes,0,length); Class<? extends Message> messageClass = Message.getMessageClass(messageType); Message message = deserialize(messageClass, bytes); outList.add(message); } public <T> T deserialize(Class<T> clazz, byte[] bytes) { try { ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); return (T) ois.readObject(); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("反序列化失败", e); } } }
//properties文件 cn.rpc.Message.IMemberGroupService=cn.rpc.Message.impl.memberGroupServiceImpl public class ServicesFactory { static Properties properties; static Map<Class<?>, Object> map = new ConcurrentHashMap<>(); static { try (InputStream in = Config.class.getResourceAsStream("/application.properties")) { properties = new Properties(); properties.load(in); Set<String> names = properties.stringPropertyNames(); for (String name : names) { if (name.endsWith("Service")) { Class<?> interfaceClass = Class.forName(name); Class<?> instanceClass = Class.forName(properties.getProperty(name)); Object o = instanceClass.newInstance(); map.put(interfaceClass, o); } } } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new ExceptionInInitializerError(e); } } public static <T> T getService(Class<T> interfaceClass) { return (T) map.get(interfaceClass); } public static void main(String[] args) { Object service = getService(HelloService.class); } }
@Getter @ToString(callSuper = true) public class RpcRequestMessage extends Message { /** * 调用的接口全限定名,服务端根据它找到实现 */ private String interfaceName; /** * 调用接口中的方法名 */ private String methodName; /** * 方法返回类型 */ private Class<?> returnType; /** * 方法参数类型数组 */ private Class[] parameterTypes; /** * 方法参数值数组 */ private Object[] parameterValue; public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super.setSequenceId(sequenceId); this.interfaceName = interfaceName; this.methodName = methodName; this.returnType = returnType; this.parameterTypes = parameterTypes; this.parameterValue = parameterValue; } @Override public int getMessageType() { return RPC_MESSAGE_TYPE_REQUEST; } }
@Data @ToString(callSuper = true) public class RpcResponseMessage extends Message { /** * 返回值 */ private Object returnValue; /** * 异常值 */ private Exception exceptionValue; @Override public int getMessageType() { return RPC_MESSAGE_TYPE_RESPONSE; } }
@Data public abstract class Message implements Serializable { /** * 根据消息类型字节,获得对应的消息 class * @param messageType 消息类型字节 * @return 消息 class */ public static Class<? extends Message> getMessageClass(int messageType) { return messageClasses.get(messageType); } private int sequenceId; private int messageType; public abstract int getMessageType(); /** * 请求类型 byte 值 */ public static final int RPC_MESSAGE_TYPE_REQUEST = 101; /** * 响应类型 byte 值 */ public static final int RPC_MESSAGE_TYPE_RESPONSE = 102; private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>(); static { messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
public class StartMain {
public static void main(String[] args) {
IMemberGroupService memberGroupService = ProxyRpcClient.getService(IMemberGroupService.class);
MemberGroupEntity memberGroupEntity = memberGroupService.getMemberGroupEntity("客群1", "memberGroupCode1", 1);
System.out.println(memberGroupEntity);
}
}
返回结果
######################################################
MemberGroupEntity{memberGroupName='客群1', memberGroupCode='memberGroupCode1', memberGroupId=1}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。