赞
踩
RPC 【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,是一种技术思想,而不是规范。它允许程序调用另一个地址空间(网络的另一台机器上)的过程或函数,而不用开发人员显式编码这个调用的细节。调用本地方法和调用远程方法一样。
RPC调用过程中需要将参数编组为消息进行发送,接受方需要解组消息为参数,过程处理结果同样需要经编组、解组。消息由哪些部分构成及消息的表示形式就构成了消息协议。
RPC调用过程中采用的消息协议称为RPC协议
RPC协议规定请求、响应消息的格式
在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互
我们可以选用通用的标准协议(如:http、https),也也可根据自身的需要定义自己的消息协议。
封装好参数编组、消息解组、底层网络通信的RPC程序,可直接在其基础上只需专注与过程业务代码编码,无需再关注其调用细节
目前常见的RPC框架
Dubbo、gRPC、gRPC、Apache Thrift、RMI…等
下面将一步步来写一个精简版的RPC框架,使项目引入该框架后,通过简单的配置让项目拥有提供远程服务与调用的能力
首先服务端要停工远程服务,就必须具备服务注册及暴露的能力;在这之后还需要开启网络服务,供客户端连接。有些项目可能即使服务提供者同时又是服务消费者,那么什么时候注册暴露服务,什么时候注入消费服务呢?在这我就引入了一个RPC监听处理器的概念,就有这个处理器来完成服务的注册暴露,以及服务消费注入
哪些服务需要注册暴露这里使用自定义注解的方式来标注:@Service
/** * @Author Lijl * @AnnotationTypeName Service * @Description 被该注解标记的服务可提供远程访问的能力 * @Date 2022/2/14 14:32 * @Version 1.0 */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @Component @Inherited public @interface Service { String value() default ""; String version() default ""; long timeout() default 0L; }
/** * @Author Lijl * @InterfaceName ServiceRegister * @Description 定义服务注册 * @Date 2022/2/15 15:14 * @Version 1.0 */ public interface ServiceRegister { void register(List<ServiceObject> so) throws Exception; ServiceObject getServiceObject(String name) throws Exception; } /** * @Author Lijl * @ClassName DefaultServiceRegister * @Description 默认服务注册 * @Date 2022/2/15 15:19 * @Version 1.0 */ public abstract class DefaultServiceRegister implements ServiceRegister{ private Map<String, ServiceObject> serviceMap = new HashMap<>(); protected String protocol; protected int port; /** * @Author lijl * @MethodName register * @Description 缓存服务持有对象 * @Date 16:10 2022/3/11 * @Version 1.0 * @param soList * @return: void **/ @Override public void register(List<ServiceObject> soList) throws Exception { if (soList==null&&soList.size()>0){ throw new IllegalAccessException("Service object information cannot be empty"); } soList.forEach(so -> this.serviceMap.put(so.getName(), so)); } /** * @Author lijl * @MethodName getServiceObject * @Description 获取服务持有对象 * @Date 16:11 2022/3/11 * @Version 1.0 * @param name * @return: com.huawei.rpc.server.register.ServiceObject **/ @Override public ServiceObject getServiceObject(String name) { return this.serviceMap.get(name); } } /** * @Author Lijl * @ClassName ZookeeperExportServiceRegister * @Description Zookeeper服务注册,提供服务注册、服务暴露 * @Date 2022/2/15 15:26 * @Version 1.0 */ public class ZookeeperExportServiceRegister extends DefaultServiceRegister { /** * zk客户端 */ private ZkClient client; public ZookeeperExportServiceRegister(String zkAddress, int port, String protocol){ this.client = new ZkClient(zkAddress); this.client.setZkSerializer(new ZookeeperSerializer()); this.port = port; this.protocol = protocol; } /** * @Author lijl * @MethodName register * @Description 缓存服务持有对象,并注册服务 * @Date 15:38 2022/2/15 * @Version 1.0 * @param soList 服务持有者集合 * @return: void **/ @Override public void register(List<ServiceObject> soList) throws Exception { super.register(soList); for (ServiceObject so : soList) { ServiceInfo serviceInfo = new ServiceInfo(); String host = InetAddress.getLocalHost().getHostAddress(); String address = host + ":" + port; serviceInfo.setAddress(address); serviceInfo.setName(so.getName()); serviceInfo.setProtocol(protocol); this.exportService(serviceInfo); } } /** * @Author lijl * @MethodName exportService * @Description 暴露服务 * @Date 15:38 2022/2/15 * @Version 1.0 * @param serviceInfo 需要暴露的服务信息 * @return: void **/ private void exportService(ServiceInfo serviceInfo) { String serviceName = serviceInfo.getName(); String uri = JSON.toJSONString(serviceInfo); try { uri = URLEncoder.encode(uri, CommonConstant.UTF_8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } String servicePath = CommonConstant.ZK_SERVICE_PATH + CommonConstant.PATH_DELIMITER + serviceName + CommonConstant.PATH_DELIMITER + "service"; if (!client.exists(servicePath)){ client.createPersistent(servicePath,true); } String uriPath = servicePath + CommonConstant.PATH_DELIMITER + uri; if (client.exists(uriPath)){ client.delete(uriPath); } client.createEphemeral(uriPath); } }
这个过程其实没有详说的必要,就是将指定ServiceObject对象序列化后保存到ZK上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法;其实看方法上打的注释也能看明白每步都做了什么
/** * @Author Lijl * @ClassName RpcService * @Description RPC抽象服务端 * @Date 2022/2/15 16:09 * @Version 1.0 */ public abstract class RpcServer { /** * 服务端口 */ protected int port; /** * 服务协议 */ protected String protocol; /** * 请求处理者 */ protected RequestHandler handler; public RpcServer(int port, String protocol, RequestHandler handler){ super(); this.port = port; this.protocol = protocol; this.handler = handler; } /** * 开启服务 */ public abstract void start(); /** * 停止服务 */ public abstract void stop(); } /** * @Author Lijl * @ClassName NettyRpcService * @Description Netty RPC服务端,提供Netty网络服务开启、关闭,接收客户端请求及消息后的处理 * @Date 2022/2/15 16:28 * @Version 1.0 */ @Slf4j public class NettyRpcServer extends RpcServer{ private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler){ super(port,protocol,handler); } @Override public void start() { EventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new ChannelRequestHandler()); } }); //启动服务 ChannelFuture future = sb.bind(port).sync(); log.info("Server starteed successfully."); channel = future.channel(); //等待服务通道关闭 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //释放资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void stop() { if (this.channel!=null){ this.channel.close(); } } private class ChannelRequestHandler extends ChannelInboundHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("Channel active: {}",ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("The server receives a message: {}",msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); log.info("Send response: {}",msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); log.error("Exception occurred: {}",cause.getMessage()); ctx.close(); } } } /** * @Author Lijl * @ClassName RequestHandler * @Description 请求处理器,提供解组请求,编组响应操作 * @Date 2022/2/15 16:10 * @Version 1.0 */ public class RequestHandler { private MessageProtocol protocol; private ServiceRegister serviceRegister; public RequestHandler(MessageProtocol protocol,ServiceRegister serviceRegister){ super(); this.protocol = protocol; this.serviceRegister = serviceRegister; } /** * @Author lijl * @MethodName handleRequest * @Description 处理客户端请求参数,调用本地服务 * @Date 16:26 2022/2/15 * @Version 1.0 * @param data * @return: byte[] **/ public byte[] handleRequest(byte[] data) throws Exception{ //1.解组消息 Request request = this.protocol.unmarshallingRequest(data); //2. 查找服务对象 ServiceObject so = this.serviceRegister.getServiceObject(request.getServiceName()); Response response = null; if (so==null){ response = Response.builder().status(Status.NOT_FOUND).build(); }else{ try { //3.反射调用对应的过程方法 Method method = so.getClazz().getMethod(request.getMethod(), request.getParameterTypes()); Object returnVal = method.invoke(so.getObj(), request.getParameters()); response = Response.builder() .status(Status.SUCCESS) .returnValue(returnVal) .build(); } catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { response = Response.builder() .status(Status.ERROR) .exception(e) .build(); } } return this.protocol.marshallingResponse(response); } }
这段算是服务端的核心部分,控制服务段Netty网络服务的开启关闭;接收客户端发起的请求,将客户端发送的请求参数解组并查询客户端远程调用的过程业务过程接口,并通过反射调用返回调用结果
开始有提到RPC监听处理器的概念,用于服务的注册暴露与服务的消费注入,这里先说下服务开启服务注册,后面说的客户端时在补充服务注入
/** * @Author Lijl * @ClassName DefaultRpcProcessor * @Description rcp监听处理器 负责暴露服务、自动注入 * @Date 2022/2/26 20:43 * @Version 1.0 */ @Slf4j public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent>, DisposableBean { @Autowired private ClientProxyFactory clientProxyFactory; @Autowired private ServiceRegister serviceRegister; @Autowired private RpcServer rpcService; @SneakyThrows @Override public void onApplicationEvent(ContextRefreshedEvent event) { ApplicationContext applicationContext = event.getApplicationContext(); if (Objects.isNull(applicationContext.getParent())){ //开启服务 startServer(applicationContext); //注入Service injectService(applicationContext); } } /** * @Author lijl * @MethodName startServer * @Description 扫描服务注册注解,调用服务注册将服务注册到zookeeper中 * @Date 18:44 2022/2/26 * @Version 1.0 * @param applicationContext * @return: void **/ private void startServer(ApplicationContext applicationContext) throws Exception { //过滤出带有服务注册注解的实例 Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Service.class); if (beans.size()!=0){ //遍历服务组装服务注册信息 for (Object o : beans.values()) { List<ServiceObject> soList = new ArrayList<>(); Class<?> clazz = o.getClass(); Service service = clazz.getAnnotation(Service.class); String version = service.version(); Class<?>[] interfaces = clazz.getInterfaces(); if (interfaces.length>1){ //相同接口存在不同版本号,则分别注册 for (Class<?> aClass : interfaces) { String aClassName = aClass.getName(); if (StringUtils.hasLength(version)){ aClassName +=":"+version; } soList.add(new ServiceObject(aClassName,aClass,o)); } }else{ Class<?> superClass = interfaces[0]; String aClassName = superClass.getName(); if (StringUtils.hasLength(version)){ aClassName +=":"+version; } soList.add(new ServiceObject(aClassName, superClass, o)); } //调用服务注册 this.serviceRegister.register(soList); } rpcService.start(); } } /** * @Author lijl * @MethodName injectService * @Description 注入远程调用服务 * @Date 19:20 2022/2/26 * @Version 1.0 * @param applicationContext * @return: void **/ private void injectService(ApplicationContext applicationContext) { //... } /** * @Author lijl * @MethodName destroy * @Description 关闭服务 * @Date 19:35 2022/2/26 * @Version 1.0 * @param * @return: void **/ @Override public void destroy() { log.info("The application will stop and the zookeeper connection will be closed"); rpcService.stop(); } }
DefaultRpcProcessor实现了ApplicationListener,并监听了ContextRefreshedEvent事件,在Spring启动完毕过后会收到一个事件通知,基于这个机制,就可以在这里开启服务,以及注入服务。
客户端想要调用远程服务,必须具备服务发现的能力;在知道有哪些服务后,还必须有服务代理来执行服务调用;客户端想要与服务端通信,必须要有相同的消息协议与网络请求的能力,即网络层功能。
/** * @Author Lijl * @InterfaceName IDiscovererService * @Description 服务发现,定义服务发现规范 * @Date 2022/2/14 14:29 * @Version 1.0 */ public interface IDiscovererService { List<ServiceInfo> getService(String name); } /** * @Author Lijl * @ClassName ZookeeperServiceDiscoverer * @Description 服务发现,定义以zookeeper为注册中心的服务发现 * @Date 2022/2/14 16:00 * @Version 1.0 */ public class ZookeeperServiceDiscoverer implements IDiscovererService{ private ZkClient zkClient; public ZookeeperServiceDiscoverer(String zkAddress){ zkClient = new ZkClient(zkAddress); zkClient.setZkSerializer(new ZookeeperSerializer()); } /** * @Author lijl * @MethodName getService * @Description 使用Zookeeper客户端,通过服务名获取服务列表 * @Date 17:07 2022/3/11 * @Version 1.0 * @param name 服务名->接口全路径 * @return: java.util.List<com.huawei.rpc.common.info.ServiceInfo> 服务列表 **/ @Override public List<ServiceInfo> getService(String name) { // /simple-rpc/接口全路径/service //从zk中获取已注册的服务 String servicePath = CommonConstant.ZK_SERVICE_PATH+CommonConstant.PATH_DELIMITER+name+CommonConstant.PATH_DELIMITER+"service"; List<String> children = zkClient.getChildren(servicePath); return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> { String deCh = null; try { deCh = URLDecoder.decode(str,CommonConstant.UTF_8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return JSON.parseObject(deCh, ServiceInfo.class); }).collect(Collectors.toList()); } }
服务端是将服务注册进了Zookeeper中,所以服务发现者也使用Zookeeper来实现,通过ZkClient我们很容易发现已经注册在ZK上的服务。当然我们也可以使用其他组件作为注册中心,例如Redis
/** * @Author Lijl * @InterfaceName NetWorkClient * @Description 网络请求客户端,定义网络请求规范 * @Date 2022/2/14 16:34 * @Version 1.0 */ public interface NetWorkClient { byte[] sendRequest(byte[] data, ServiceInfo serviceInfo) throws InterruptedException; } /** * @Author Lijl * @ClassName NettyNetWorkClient * @Description Netty网络请求客户端,定义通过Netty实现网络请求 * @Date 2022/2/14 16:35 * @Version 1.0 */ @Slf4j public class NettyNetWorkClient implements NetWorkClient{ /** * @Author lijl * @MethodName sendRequest * @Description 发送请求 * @Date 16:36 2022/2/14 * @Version 1.0 * @param data 请求数据 * @param serviceInfo 服务信息 * @return: byte[] 响应数据 **/ @Override public byte[] sendRequest(byte[] data, ServiceInfo serviceInfo) throws InterruptedException { String[] addrInfoArray = serviceInfo.getAddress().split(":"); String serverAddress = addrInfoArray[0]; String serverPort = addrInfoArray[1]; SendHandler sendHandler = new SendHandler(data); byte[] respData; NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(sendHandler); } }); bootstrap.connect(serverAddress,Integer.parseInt(serverPort)).sync(); respData = (byte[]) sendHandler.rspData(); log.info("SendRequest get reply: {}",respData); } finally { //释放线程组资源 group.shutdownGracefully(); } return respData; } } /** * @Author Lijl * @ClassName SendHandler * @Description 发送处理类,定义Netty入站处理 * @Date 2022/2/14 16:53 * @Version 1.0 */ @Slf4j public class SendHandler extends ChannelInboundHandlerAdapter { private CountDownLatch cdl; private Object readMsg = null; private byte[] data; public SendHandler(byte[] data){ this.cdl = new CountDownLatch(1); this.data = data; } /** * @Author lijl * @MethodName channelActive * @Description 当连接服务段成功后,发送请求数据 * @Date 16:56 2022/2/14 * @Version 1.0 * @param ctx 通道上下文 * @return: void **/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("Successful connection to server: {}",ctx); ByteBuf buffer = Unpooled.buffer(data.length); buffer.writeBytes(data); log.info("Client sends message: {}",buffer); ctx.writeAndFlush(buffer); } /** * @Author lijl * @MethodName channelRead * @Description 读取数据,数据读取完毕释放CD锁 * @Date 16:59 2022/2/14 * @Version 1.0 * @param ctx 上下文 * @param msg 字节流 * @return: void **/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("Client reads message: {}",msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(resp); readMsg = resp; cdl.countDown(); } /** * @Author lijl * @MethodName rspData * @Description 等待读取数据完成 * @Date 17:02 2022/2/14 * @Version 1.0 * @param * @return: java.lang.Object **/ public Object rspData() throws InterruptedException { cdl.await(); return readMsg; } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); log.error("Exception occurred: {}",cause.getMessage()); ctx.close(); } }
这部分前面讲的服务端请求处理类似,连接服务端(Netty),将请求参数编组后发送到服务端,待服务端接收处理后返回,再这里将服务端返回的结果进行解组
/** * @Author Lijl * @ClassName ClientProxyFactory * @Description 客户端代理工厂:用于创建远程服务代理类;封装编组请求、请求发送、编组响应等操作 * @Date 2022/2/14 17:13 * @Version 1.0 */ public class ClientProxyFactory { @Getter @Setter private IDiscovererService iDiscovererService; @Getter @Setter private Map<String, MessageProtocol> supportMessageProtocols; @Getter @Setter private NetWorkClient netWorkClient; /** * @Author lijl * @MethodName getProxy * @Description 通过Java动态代理获取服务代理类 * @Date 14:14 2022/2/15 * @Version 1.0 * @param clazz 被代理类Class * @return: T 服务代理类 **/ public <T> T getProxy(Class<T> clazz,String version){ return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new ClientInvocationHandler(clazz,version)); } /** * @Author Lijl * @ClassName ClientProxyFactory * @Description 客户端服务代理类invoke函数细节实现 * @Date 2022/2/15 14:31 * @Version 1.0 */ private class ClientInvocationHandler implements InvocationHandler { private Class<?> aClass; private String version; public ClientInvocationHandler(Class<?> aClass,String version){ super(); this.aClass = aClass; this.version = version; } private Random random = new Random(); @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if ("toString".equals(methodName)){ return proxy.getClass().toString(); } if (("hashCode").equals(methodName)){ return 0; } //1、获取服务信息 String serviceName = this.aClass.getName(); if (StringUtils.hasLength(this.version)){ serviceName += ":"+version; } List<ServiceInfo> serviceInfoList = iDiscovererService.getService(serviceName); if (serviceInfoList==null||serviceInfoList.isEmpty()){ throw new ClassCastException("No provider available"); } //多个服务的话随机获取一个 ServiceInfo serviceInfo = serviceInfoList.get(random.nextInt(serviceInfoList.size())); //2、构造request对象 Request request = new Request(); request.setServiceName(serviceInfo.getName()); request.setMethod(methodName); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); //3、请求协议编组 //获取该方法对应的协议 MessageProtocol messageProtocol = supportMessageProtocols.get(serviceInfo.getProtocol()); //编组请求 byte[] data = messageProtocol.marshallingRequest(request); //4、条用网络层发送请求 byte[] requestData = netWorkClient.sendRequest(data, serviceInfo); //5、解组响应信息 Response response = messageProtocol.unmarshallingResponse(requestData); // 6、处理结果 if (response.getException()!=null){ throw response.getException(); } return response.getReturnValue(); } } }
服务代理类由客户端代理工厂类产生,代理方式是基于Java的动态代理。在处理类ClientInvocationHandler的invoke函数中,定义了一系列的操作,包括获取服务、选择服务提供者、构造请求对象、编组请求对象、网络请求客户端发送请求、解组响应消息、异常处理等
/** * @Author Lijl * @AnnotationTypeName Reference * @Description 用于注入远程服务 * @Date 2022/2/14 14:37 * @Version 1.0 */ @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.FIELD}) @Inherited public @interface Reference { String value() default ""; String version() default ""; long timeout() default 0L; }
/** * @Author lijl * @MethodName injectService * @Description 注入远程调用服务 * @Date 16:52 2022/3/11 * @Version 1.0 * @param applicationContext * @return: void **/ private void injectService(ApplicationContext applicationContext) { String[] beanNames = applicationContext.getBeanDefinitionNames(); for (String beanName : beanNames) { Class<?> clazz = applicationContext.getType(beanName); if (Objects.isNull(clazz)){ continue; } Field[] fields = clazz.getDeclaredFields(); for (Field field : fields) { Reference reference = field.getAnnotation(Reference.class); if (Objects.isNull(reference)){ continue; } Class<?> fieldClass = field.getType(); Object bean = applicationContext.getBean(beanName); field.setAccessible(true); String version = reference.version(); try { field.set(bean,clientProxyFactory.getProxy(fieldClass,version)); } catch (IllegalAccessException e) { e.printStackTrace(); } } } }
/** * @Author Lijl * @ClassName MessageProtocol * @Description 消息协议,定义编组请求、解组请求、编组响应、解组响应规范 * @Date 2022/2/14 17:26 * @Version 1.0 */ public interface MessageProtocol { /** * @Author lijl * @MethodName marshallingRequest * @Description 编组请求 * @Date 13:44 2022/2/15 * @Version 1.0 * @param request 请求信息 * @return: byte[] 请求字节 **/ byte[] marshallingRequest(Request request) throws Exception; /** * @Author lijl * @MethodName unmarshallingRequest * @Description 解组请求 * @Date 13:45 2022/2/15 * @Version 1.0 * @param data 请求字节 * @return: com.huawei.rpc.common.prorocol.Request 请求信息 **/ Request unmarshallingRequest(byte[] data) throws Exception; /** * @Author lijl * @MethodName marshallingResponse * @Description 编组响应 * @Date 13:46 2022/2/15 * @Version 1.0 * @param response 响应信息 * @return: byte[] 响应字节 **/ byte[] marshallingResponse(Response response) throws Exception; /** * @Author lijl * @MethodName unmarshallingResponse * @Description 解组响应 * @Date 13:47 2022/2/15 * @Version 1.0 * @param data 响应字节 * @return: com.huawei.rpc.common.prorocol.Response 响应信息 **/ Response unmarshallingResponse(byte[] data) throws Exception; } /** * @Author Lijl * @ClassName HessianSerializeMessageProrocol * @Description Hessian 序列化信息 * @Date 2022/2/17 15:16 * @Version 1.0 */ public class HessianSerializeMessageProrocol implements MessageProtocol{ /** * @Author lijl * @MethodName serialize * @Description Hessian实现序列化 * @Date 15:23 2022/2/17 * @Version 1.0 * @param object * @return: byte[] **/ public byte[] serialize(Object object) throws IOException { ByteArrayOutputStream byteArrayOutputStream = null; HessianOutput hessianOutput = null; try { byteArrayOutputStream = new ByteArrayOutputStream(); // Hessian的序列化输出 hessianOutput = new HessianOutput(byteArrayOutputStream); hessianOutput.writeObject(object); return byteArrayOutputStream.toByteArray(); } catch (IOException e) { throw e; } finally { try { if (byteArrayOutputStream!=null){ byteArrayOutputStream.close(); } } catch (IOException e) { throw e; } try { if (hessianOutput!=null){ hessianOutput.close(); } } catch (IOException e) { throw e; } } } /** * @Author lijl * @MethodName deserialize * @Description Hessian实现反序列化 * @Date 15:29 2022/2/17 * @Version 1.0 * @param employeeArray * @return: java.lang.Object **/ public Object deserialize(byte[] employeeArray) throws Exception { ByteArrayInputStream byteArrayInputStream = null; HessianInput hessianInput = null; try { byteArrayInputStream = new ByteArrayInputStream(employeeArray); // Hessian的反序列化读取对象 hessianInput = new HessianInput(byteArrayInputStream); return hessianInput.readObject(); } catch (IOException e) { throw e; } finally { try { if (byteArrayInputStream!=null){ byteArrayInputStream.close(); } } catch (IOException e) { throw e; } try { if (hessianInput!=null){ hessianInput.close(); } } catch (Exception e) { throw e; } } } @Override public byte[] marshallingRequest(Request request) throws Exception { return serialize(request); } @Override public Request unmarshallingRequest(byte[] data) throws Exception { return (Request) deserialize(data); } @Override public byte[] marshallingResponse(Response response) throws Exception { return serialize(response); } @Override public Response unmarshallingResponse(byte[] data) throws Exception { return (Response) deserialize(data); } } /** * @Author Lijl * @ClassName JavaSerializeMessageProtocol * @Description java序列化信息 * @Date 2022/2/15 13:52 * @Version 1.0 */ public class JavaSerializeMessageProtocol implements MessageProtocol{ private byte[] serialize(Object obj) throws Exception{ try(ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos);) { oos.writeObject(obj); return baos.toByteArray(); } } private Object deserialize(byte[] data)throws Exception{ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) { return ois.readObject(); } } @Override public byte[] marshallingRequest(Request request) throws Exception { return this.serialize(request); } @Override public Request unmarshallingRequest(byte[] data) throws Exception { return (Request) this.deserialize(data); } @Override public byte[] marshallingResponse(Response response) throws Exception { return this.serialize(response); } @Override public Response unmarshallingResponse(byte[] data) throws Exception { return (Response) this.deserialize(data); } }
消息协议主要是定义了客户端如何编组请求、解组响应,服务端如何解组请求、编组响应这四个操作规范。本文提供了Hessian、Java序列化与反序列化的实现,感兴趣的可以加入其他序列化技术,例如:kryo、protostuff等。
这里是spring自动装配的一些配置很简单,就不做赘述了,不了解的可看这篇手写spring-boot-starter
/** * @Author Lijl * @ClassName AutoConfiguration * @Description 自动装配 * @Date 2022/2/16 9:25 * @Version 1.0 */ @Configuration @EnableConfigurationProperties({ProtocolProperties.class,RegistryProperties.class}) public class AutoConfiguration { private ProtocolProperties protocolProperties; private RegistryProperties registryProperties; public AutoConfiguration(ProtocolProperties protocolProperties,RegistryProperties registryProperties){ this.protocolProperties = protocolProperties; this.registryProperties = registryProperties; } @Bean public DefaultRpcProcessor defaultRpcProcessor(){ return new DefaultRpcProcessor(); } /** * @Author lijl * @MethodName clientProxyFactory * @Description 注册客户端动态代理 * @Date 20:50 2022/2/26 * @Version 1.0 * @param * @return: com.huawei.rpc.client.ClientProxyFactory **/ @Bean public ClientProxyFactory clientProxyFactory(){ ClientProxyFactory clientProxyFactory = new ClientProxyFactory(); //设置服务发现者 String address = registryProperties.getAddress(); int port = registryProperties.getPort(); clientProxyFactory.setIDiscovererService(new ZookeeperServiceDiscoverer(address+":"+port)); //设置支持的协议 Map<String, MessageProtocol> supportMessageProtocols = new HashMap<>(16); supportMessageProtocols.put(protocolProperties.getName(), protocolProperties.getMessageProtocol()); clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols); //设置网络层实现 clientProxyFactory.setNetWorkClient(new NettyNetWorkClient()); return clientProxyFactory; } /** * @Author lijiale * @MethodName serviceRegister * @Description 服务注册 * @Date 20:52 2022/2/26 * @Version 1.0 * @param * @return: com.huawei.rpc.server.register.ServiceRegister **/ @Bean public ServiceRegister serviceRegister(){ String address = registryProperties.getAddress(); int port = registryProperties.getPort(); return new ZookeeperExportServiceRegister(address+":"+port, protocolProperties.getPort(), protocolProperties.getName()); } @Bean public RequestHandler requestHandler(ServiceRegister serviceRegister){ return new RequestHandler(protocolProperties.getMessageProtocol(), serviceRegister); } @Bean public RpcServer rpcServer(RequestHandler requestHandler){ return new NettyRpcServer(protocolProperties.getPort(), protocolProperties.getName(), requestHandler); } }
因为我这是直接封装成starter了,所以使用起来非常简单,只需要准备一个zookeeper与简单的五步配置编写,当然创建服务端与客户端程序除外
分别再创建好的服务端工程与客户端工程中引入依赖
<dependency>
<groupId>com.huawei</groupId>
<artifactId>simple-rpc-spring-boot-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
服务端配置zookeeper地址与协议端口
spring:
rpc:
registry:
address: 127.0.0.1
port: 2181
protocol:
port: 12138
客户端配置zookeeper地址与协议端口
spring:
rpc:
registry:
address: 127.0.0.1
port: 2181
protocol:
port: 12138
将你的远程服务使用@Service注解,例如:
在需要暴露的服务上使用@Service注解申明,注意该注解是自定义注解
/** * @Author Lijl * @ClassName TestServiceImpl * @Description 测试 * @Date 2022/2/16 13:52 * @Version 1.0 */ @Service(version = "1.0") public class TestServiceImpl implements TestService, ITestService { @Override public TestDto getTestData() { System.out.println("远程调用."); TestDto testDto = new TestDto(); testDto.setId(1); testDto.setData("测试远程调用"); return testDto; } @Override public TestDto getTestDtoData() { System.out.println("远程调用....."); TestDto testDto = new TestDto(); testDto.setId(2); testDto.setData("测试远程调用"); return testDto; } }
使用@Reference注解注入远程服务
/** * @Author Lijl * @ClassName TestController * @Description 测试 * @Date 2022/2/16 14:07 * @Version 1.0 */ @RestController public class TestController { @Reference(version = "1.0") private TestService testService; @Reference(version = "1.0") private ITestService iTestService; @PostMapping(value = "/test") public void test(){ TestDto testData = this.testService.getTestData(); TestDto testDtoData = this.iTestService.getTestDtoData(); System.out.println(testData.toString()+"------"+testDtoData.toString()); } }
主要用于理解RPC调用原理,功能写的比较简单,待完善的地方还有很多,例如调用超时、负载均衡,容错等…,拉的有feature分支,感兴趣的小伙伴可在上补充修改自己所想到的内容
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。