赞
踩
RPC:Remote Procedure Call,即远程过程调用。远程过程调用是一种像调用本地过程一样调用远程机器上的过程,而不需要了解网络细节的远程过程访问支持机制。由于基于套接字的开发方式,因此带来了开发繁琐和排错困难的问题。
项目地址:https://github.com/shao12138/MyRPC
项目架构:
项目描述:
- 实现轻量级RPC框架,使得客户端可以通过网络从远程服务端程序上请求服务。
- 注册中心部分使用Redis实现注册、订阅功能。
- 在客户端实现了基于一致性哈希算法的负载均衡。
- 动态代理部分使用JDK动态代理。
- 网络传输部分使用Http协议进行传输。
项目流程/项目难点:
理解RPC的概念和协议:要成功开发RPC,需要了解RPC的基本概念和协议。还可以使用现有的RPC框架,如gRPC或Thrift,以获得有关RPC协议的更多信息。
定义服务接口:在开发RPC服务之前,需要定义RPC服务的接口。这可能涉及到定义函数的输入和输出参数,以及服务可以执行的操作。为了使服务具有可伸缩性,需要设计合理的接口,确保它们能够适应未来的需求。
服务注册与发现:远程调用需要知道被调用的服务的位置,需要具备服务注册与发现的能力,如ZooKeeper、Eureka等。
实现RPC客户端和服务器:在RPC客户端和服务器之间建立通信是一项重要任务。需要选择适当的通信协议(如TCP或HTTP),并编写代码以确保客户端和服务器之间的稳定通信。还需要实现序列化和反序列化机制,以便数据可以在客户端和服务器之间传输。
处理并发和线程安全性:RPC服务可能会处理多个请求,因此需要确保代码能够正确地处理并发请求。还需要考虑线程安全性,以确保多个线程可以同时访问您的代码而不会导致竞态条件或死锁。
安全机制:远程调用需要具备安全机制,如身份认证、授权机制等,保障数据传输的安全。
负载均衡:在集群环境下,远程调用需要具备负载均衡的能力,根据不同的负载均衡策略将请求分发到不同的节点上。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.nudt</groupId> <artifactId>MyRPC</artifactId> <version>1.0-SNAPSHOT</version> <packaging>pom</packaging> <modules> <module>Common</module> <module>CodeC</module> <module>Proto</module> <module>Transport</module> <module>Server</module> <module>Client</module> </modules> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencyManagement> <!--子模块可能用到的--> <dependencies> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-servlet</artifactId> <version>9.2.28.v20190418</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.9</version> </dependency> </dependencies> </dependencyManagement> <dependencies> <!--所有子模块也会被添加--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.10</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-archetype-plugin</artifactId> <version>3.2.0</version> </plugin> </plugins> </build> </project>
Peer:网络传输的一个端点
package org.nudt; import lombok.AllArgsConstructor; import lombok.Data; //表示网络传输的一个端点 @Data //set,get @AllArgsConstructor //所有参数的构造器 public class Peer { //地址 private String host; //端口号 private int port; }ServiceDescriptor
package org.nudt; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.lang.reflect.Method; import java.util.Arrays; //表示哦服务 @Data //set,get @AllArgsConstructor //所有参数的构造器 @NoArgsConstructor //默认构造器 public class ServiceDescriptor { //类 private String clazz; //方法 private String method; //返回类型 private String returnType; //参数类型 private String[] parameterTypes; public static ServiceDescriptor from(Class clazz, Method method) { ServiceDescriptor sdp = new ServiceDescriptor(); sdp.setClazz(clazz.getName()); sdp.setMethod(method.getName()); sdp.setReturnType(method.getReturnType().getTypeName()); Class[] parameterClasses = method.getParameterTypes(); String[] parameterTypes = new String[parameterClasses.length]; for (int i = 0; i < parameterClasses.length; i++) { parameterTypes[i] = parameterClasses[i].getName(); } sdp.setParameterTypes(parameterTypes); return sdp; } public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } ServiceDescriptor that = (ServiceDescriptor) o; return this.toString().equals(that.toString()); } public int hashCode() { return toString().hashCode(); } public String toString() { return "ServiceDescriptor{" + "clazz='" + clazz + '\'' + ", method='" + method + '\'' + ", returnType='" + returnType + '\'' + ", parameterTypes=" + Arrays.toString(parameterTypes) + '}'; } }Request&Response
package org.nudt; import lombok.Data; @Data //set,get public class Request { //请求服务 private ServiceDescriptor service; //请求参数 private Object[] parameter; }
package org.nudt; import lombok.Data; //表示RPC的响应 @Data //set,get public class Response { //服务返回信息:0成功,非0失败 private int code = 0; //具体错误信息 private String message = "ok"; //返回数据 private Object data; }
ReflectionUtils
package org.nudt; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; //反射工具类 public class ReflectionUtils { //根据Class创建对象 public static <T> T newInstance(Class clazz) { try { return (T) clazz.newInstance(); } catch (Exception e) { throw new IllegalStateException(e); } } //获取某个类所有的公共方法 public static Method[] getPublicMethods(Class clazz) { Method[] methods = clazz.getDeclaredMethods(); List<Method> pmethods = new ArrayList<>(); for (Method m : methods) { if (Modifier.isPublic(m.getModifiers())) { pmethods.add(m); } } return pmethods.toArray(new Method[0]); } //调用某个对象的指定方法 public static Object invoke(Object obj, Method method, Object... args) { try { return method.invoke(obj, args); } catch (Exception e) { throw new IllegalStateException(e); } } }
引入fastjson
<dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.9</version> </dependency> </dependencies>Decoder&Encoder
package org.nudt; //反序列化 public interface Decoder { <T> T decode(byte[] bytes, Class<T> clazz); }
package org.nudt; //序列化 public interface Encoder { byte[] encode(Object obj); }JSONEncoder&JSONDecoder
package org.nudt; import com.alibaba.fastjson.JSON; //基于JSON的序列化实现 public class JSONEncoder implements Encoder { public byte[] encode(Object obj) { return JSON.toJSONBytes(obj); } }
package org.nudt; import com.alibaba.fastjson2.JSON; //基于JSON的反序列化 public class JSONDecoder implements Decoder { public <T> T decode(byte[] bytes, Class<T> clazz) { return JSON.parseObject(bytes, clazz); } }
引入依赖
<dependencies> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-servlet</artifactId> <version>9.2.28.v20190418</version> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>Proto</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>TransportClient&TranspotServer
package org.nudt; import java.io.InputStream; /* * 创建连接 * 发送数据,并且等待响应 * 关闭连接*/ public interface TransportClient { void connect(Peer peer); InputStream write(InputStream data); void close(); }
package org.nudt; /*启动,监听 * 接受请求 * 关闭监听*/ public interface TransportServer { void init(int prot, RequestHandler handler); void start(); void stop(); }RequestHandler
package org.nudt; import java.io.InputStream; import java.io.OutputStream; //处理网络请求的handler public interface RequestHandler { void onRequest(InputStream revice, OutputStream toResponse); }
HttpTransportClient
package org.nudt; import org.apache.commons.io.IOUtils; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; public class HTTPTransportClient implements TransportClient { private String url; public void connect(Peer peer) { this.url = "http://" + peer.getHost() + ":" + peer.getPort(); } public InputStream write(InputStream data) { try { HttpURLConnection httpConn = (HttpURLConnection) new URL(url).openConnection(); httpConn.setDoInput(true); httpConn.setDoOutput(true); httpConn.setUseCaches(false); httpConn.setRequestMethod("POST"); httpConn.connect(); IOUtils.copy(data, httpConn.getOutputStream()); int resultCode = httpConn.getResponseCode(); if (resultCode == HttpURLConnection.HTTP_OK) { return httpConn.getInputStream(); } else { return httpConn.getErrorStream(); } } catch (IOException e) { throw new IllegalStateException(e); } } public void close() { } }
HttpTransportServer
package org.nudt; import lombok.extern.slf4j.Slf4j; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @Slf4j public class HttpTransportServer implements TransportServer { private RequestHandler handler; private Server server; public void init(int prot, RequestHandler handler) { this.handler = handler; this.server = new Server(prot); //Servlet接收请求 ServletContextHandler ctx = new ServletContextHandler(); server.setHandler(ctx); ServletHolder holder = new ServletHolder(new RequestServlet()); ctx.addServlet(holder, "/*"); } public void start() { try { server.start(); server.join(); } catch (Exception e) { throw new RuntimeException(e); } } public void stop() { try { server.start(); } catch (Exception e) { throw new RuntimeException(e); } } class RequestServlet extends HttpServlet { protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { log.info("client connect"); InputStream in = req.getInputStream(); OutputStream out = resp.getOutputStream(); if (handler != null) { handler.onRequest(in, out); } out.flush(); } } }
引入依赖
<dependencies> <dependency> <groupId>org.nudt</groupId> <artifactId>Proto</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>CodeC</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>Common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>Transport</artifactId> <version>1.0-SNAPSHOT</version> <scope>compile</scope> </dependency> </dependencies>RpcServerConfig
package org.nudt; import lombok.Data; //server配置 @Data public class RpcServerConfig { private Class<? extends TransportServer> transportClass = HttpTransportServer.class; private Class<? extends Encoder> encoderClass = JSONEncoder.class; private Class<? extends Decoder> decoderClass = JSONDecoder.class; private int port = 3000; }RpcServer
package org.nudt; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @Slf4j public class RpcServer { private RpcServerConfig config; private TransportServer net; private Encoder encoder; private Decoder decoder; private ServiceManager serviceManager; private ServiceInvoker serviceInvoker; public RpcServer(RpcServerConfig config) { this.config = config; //net this.net = ReflectionUtils.newInstance(config.getTransportClass()); this.net.init(config.getPort(), this.handler); //序列化 this.encoder = ReflectionUtils.newInstance(config.getEncoderClass()); //反序列化 this.decoder = ReflectionUtils.newInstance(config.getDecoderClass()); //service this.serviceManager = new ServiceManager(); this.serviceInvoker = new ServiceInvoker(); } public <T> void register(Class<T> interfaceClass, T bean) { serviceManager.register(interfaceClass, bean); } public void start() { this.net.start(); } public void stop() { this.net.stop(); } private RequestHandler handler = new RequestHandler() { public void onRequest(InputStream revice, OutputStream toResponse) { Response resp = new Response(); try { byte[] inBytes = IOUtils.readFully(revice, revice.available()); Request request = decoder.decode(inBytes, Request.class); log.info("get request:{}", request); ServiceInstance sis = serviceManager.lookup(request); Object ret = serviceInvoker.invoke(sis, request); resp.setData(ret); } catch (Exception e) { resp.setData(1); resp.setMessage("RpcServer got error:" + e.getClass() + " : " + e.getMessage()); } finally { byte[] outBytes = encoder.encode(resp); try { toResponse.write(outBytes); } catch (IOException e) { log.warn(e.getMessage(), e); } } } }; }
ServiceInstance
package org.nudt; import lombok.AllArgsConstructor; import lombok.Data; import java.lang.reflect.Method; //表示一个具体服务 @Data @AllArgsConstructor public class ServiceInstance { private Object target; private Method method; }ServiceInvoker
package org.nudt; //调用具体服务 public class ServiceInvoker { public Object invoke(ServiceInstance service, Request request) { return ReflectionUtils.invoke(service.getTarget(),service.getMethod(),request.getParameter()); } }ServiceManager
package org.nudt; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; //管理RPC暴露的服务 @Slf4j public class ServiceManager { private Map<ServiceDescriptor, ServiceInstance> services; public ServiceManager() { this.services = new ConcurrentHashMap<>(); } //注册:扫描接口所有方法 public <T> void register(Class<T> interfaceClass, T bean) { Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass); for (Method method : methods) { ServiceInstance sis = new ServiceInstance(bean, method); ServiceDescriptor sdp = ServiceDescriptor.from(interfaceClass, method); services.put(sdp, sis); log.info("有服务被注册了:{} {}", sdp.getClazz(), sdp.getMethod()); } } //查找服务 public ServiceInstance lookup(Request request){ ServiceDescriptor sdp = request.getService(); return services.get(sdp); } }
引入配置
<dependencies> <dependency> <groupId>org.nudt</groupId> <artifactId>Proto</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>CodeC</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>Common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.nudt</groupId> <artifactId>Transport</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> </dependencies>RpcClientConfig
package org.nudt; import lombok.Data; import java.util.Arrays; import java.util.List; @Data public class RpcClientConfig { private Class<? extends TransportClient> transportClass = HTTPTransportClient.class; private Class<? extends Encoder> encoderClass = JSONEncoder.class; private Class<? extends Decoder> decoderClass = JSONDecoder.class; private Class<? extends TransportSelector> selectorClass = RandomTransportSelector.class; private int connectCount = 1; private List<Peer> servers = Arrays.asList(new Peer("127.0.0.1", 3000)); }TransportSelector
package org.nudt; import java.util.List; //选择哪个Server去连接 public interface TransportSelector { /*初始化 * peers:可以连接的server端点信息 * count:client与server建立多少个连接 * clazz:client实现class*/ void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz); //选择一个transport与Server做交互 TransportClient select(); //释放client void release(TransportClient client); //关闭client void close(); }RandomTransportSelector
package org.nudt; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.Random; @Slf4j public class RandomTransportSelector implements TransportSelector { //已经连接好的client private List<TransportClient> clients; public RandomTransportSelector() { clients = new ArrayList<>(); } public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) { count = Math.max(count, 1); for (Peer peer : peers) { for (int i = 0; i < count; i++) { TransportClient client = ReflectionUtils.newInstance(clazz); client.connect(peer); clients.add(client); } log.info("connect server:{}", peer); } } public synchronized TransportClient select() { int i = new Random().nextInt(clients.size()); return clients.remove(i); } public synchronized void release(TransportClient client) { clients.add(client); } public synchronized void close() { for (TransportClient cliet : clients) { cliet.close(); } clients.clear(); } }
RpcClient
package org.nudt; import java.lang.reflect.Proxy; public class RpcClient { private RpcClientConfig config; private Encoder encoder; private Decoder decoder; private TransportSelector selector; public RpcClient() { this(new RpcClientConfig()); } public RpcClient(RpcClientConfig config) { this.config = config; this.encoder = ReflectionUtils.newInstance(this.config.getEncoderClass()); this.decoder = ReflectionUtils.newInstance(this.config.getDecoderClass()); this.selector = ReflectionUtils.newInstance(this.config.getSelectorClass()); this.selector.init(this.config.getServers(), this.config.getConnectCount(), this.config.getTransportClass()); } //获取接口代理对象 public <T> T getProxy(Class<T> clazz) { return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new RemoteInvoker(clazz, encoder, decoder, selector)); } }RemoteInvoker
package org.nudt; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @Slf4j //调用远程服务代理类 public class RemoteInvoker implements InvocationHandler { private Class clazz; private Encoder encoder; private Decoder decoder; private TransportSelector selector; public RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder, TransportSelector selector) { this.clazz = clazz; this.encoder = encoder; this.decoder = decoder; this.selector = selector; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Request request = new Request(); request.setService(ServiceDescriptor.from(clazz, method)); request.setParameter(args); Response response = invokeRemote(request); if (response == null || response.getCode() != 0) { throw new IllegalStateException("fail to invoke remote:" + response); } return response.getData(); } private Response invokeRemote(Request request) { Response resp = null; TransportClient client = null; try { client = selector.select(); byte[] outBytes = encoder.encode(request); InputStream revice = client.write(new ByteArrayInputStream(outBytes)); byte[] inBytes = IOUtils.readFully(revice, revice.available()); resp = decoder.decode(inBytes, Response.class); } catch (Exception e) { log.warn(e.getMessage(), e); resp = new Response(); resp.setCode(1); resp.setMessage("RpcClient got error:" + e.getClass() + ":" + e.getMessage()); } finally { if (client != null) { selector.release(client); } } return resp; } }
CalcService
package org.nudt; public interface CalcService { int add(int a, int b); int minus(int a, int b); }CalcServiceImpl
package org.nudt; public class CalcServiceImpl implements CalcService { public int add(int a, int b) { return a + b; } @Override public int minus(int a, int b) { return a - b; } }Client
package org.nudt; public class Client { public static void main(String[] args) { RpcClient client = new RpcClient(); CalcService service = client.getProxy(CalcService.class); int r1 = service.add(1, 2); int r2 = service.minus(1, 2); System.out.println(r1 + " " + r2); } }Server
package org.nudt; public class Server { public static void main(String[] args) { RpcServer server = new RpcServer(new RpcServerConfig()); server.register(CalcService.class, new CalcServiceImpl()); server.start(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。