
Core Project: MyRPC, an RPC Framework Implemented in Java

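1. Project Overview

RPC (Remote Procedure Call) is a mechanism that lets a program call a procedure on a remote machine as if it were a local one, without having to deal with the details of the network. It addresses a practical problem: writing distributed programs directly on top of sockets is tedious to develop and hard to debug.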

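Project repository: https://github.com/shao12138/MyRPC

Project architecture: a multi-module Maven build made up of the Common, CodeC, Proto, Transport, Server and Client modules (see the parent POM in section 2.1).

Project description:

  • Implements a lightweight RPC framework that lets a client request services from a remote server program over the network.
  • The registry uses Redis to implement registration and subscription.
  • The client implements load balancing based on consistent hashing.
  • Dynamic proxying uses JDK dynamic proxies.
  • Network transport uses HTTP.

The listings in this article cover the core call path only: the client shown here picks a server with RandomTransportSelector, and the Redis registry and the consistent-hashing selector do not appear in the code below.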

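Project workflow and challenges:

  • Understanding RPC concepts and protocols: building an RPC framework requires a grasp of the basic concepts and protocols. Studying existing frameworks such as gRPC or Thrift is a good way to learn more about RPC protocols.

  • Defining the service interface: the interface of an RPC service has to be defined before the service is developed. This covers the input and output parameters of each function and the operations the service can perform. The interface should be designed so that it can accommodate future requirements.

  • Service registration and discovery: a remote call needs to know where the target service lives, which calls for a registration and discovery facility such as ZooKeeper or Eureka.

  • Implementing the RPC client and server: establishing communication between the two sides is a central task. It involves choosing a suitable transport protocol (such as TCP or HTTP), writing code that keeps the communication stable, and implementing serialization and deserialization so that data can travel between client and server.

  • Handling concurrency and thread safety: an RPC service may handle many requests at once, so the code must process concurrent requests correctly and allow several threads to use it at the same time without race conditions or deadlocks.

  • Security: remote calls need security mechanisms such as authentication and authorization to protect the data in transit.

  • Load balancing: in a cluster, remote calls need to be distributed across nodes according to a load-balancing strategy.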
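
The load-balancing point above, together with the consistent hashing mentioned in the project description, can be illustrated with a small hash ring. This is a minimal sketch and not the code from the MyRPC repository: the class name `ConsistentHashRing`, the use of MD5, and the number of virtual nodes are all assumptions made for the illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of MyRPC: a consistent hash ring over server addresses.
class ConsistentHashRing {
    // Ring positions mapped to the node that owns them, kept sorted by position.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Each node is placed on the ring virtualNodes times to smooth the distribution.
    ConsistentHashRing(List<String> nodes, int virtualNodes) {
        for (String node : nodes) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    // Finds the first virtual node at or after the key's hash, wrapping to the start of the ring.
    String select(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Uses the first 8 bytes of an MD5 digest as the ring position; any well-mixed hash would do.
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The same key always maps to the same node, and removing a node only remaps the keys that node owned. What to hash (a client identifier, the service name, or a request attribute) is a design choice this sketch leaves open.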

2. Project Implementation

2.1. Project Setup

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.nudt</groupId>
        <artifactId>MyRPC</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>pom</packaging>
        <modules>
            <module>Common</module>
            <module>CodeC</module>
            <module>Proto</module>
            <module>Transport</module>
            <module>Server</module>
            <module>Client</module>
        </modules>
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencyManagement>
            <!-- Versions managed here for dependencies that child modules may declare -->
            <dependencies>
                <dependency>
                    <groupId>commons-io</groupId>
                    <artifactId>commons-io</artifactId>
                    <version>2.7</version>
                </dependency>
                <dependency>
                    <groupId>org.eclipse.jetty</groupId>
                    <artifactId>jetty-servlet</artifactId>
                    <version>9.2.28.v20190418</version>
                </dependency>
                <dependency>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                    <version>2.0.9</version>
                </dependency>
            </dependencies>
        </dependencyManagement>
        <dependencies>
            <!-- Inherited by every child module -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.10</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.26</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.10</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-archetype-plugin</artifactId>
                    <version>3.2.0</version>
                </plugin>
            </plugins>
        </build>
    </project>

2.2. Protocol Classes

Peer: one endpoint of a network transport

    package org.nudt;

    import lombok.AllArgsConstructor;
    import lombok.Data;

    // One endpoint of a network transport
    @Data // generates getters and setters
    @AllArgsConstructor // constructor taking all fields
    public class Peer {
        // Host address
        private String host;
        // Port number
        private int port;
    }

ServiceDescriptor

    package org.nudt;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    import java.lang.reflect.Method;
    import java.util.Arrays;

    // Describes one remotely callable method of a service
    @Data // generates getters and setters
    @AllArgsConstructor // constructor taking all fields
    @NoArgsConstructor // no-argument constructor
    public class ServiceDescriptor {
        // Name of the service class (the interface passed to from())
        private String clazz;
        // Method name
        private String method;
        // Return type
        private String returnType;
        // Parameter types
        private String[] parameterTypes;

        // Builds a descriptor from a service class and one of its methods
        public static ServiceDescriptor from(Class<?> clazz, Method method) {
            ServiceDescriptor sdp = new ServiceDescriptor();
            sdp.setClazz(clazz.getName());
            sdp.setMethod(method.getName());
            sdp.setReturnType(method.getReturnType().getTypeName());
            Class<?>[] parameterClasses = method.getParameterTypes();
            String[] parameterTypes = new String[parameterClasses.length];
            for (int i = 0; i < parameterClasses.length; i++) {
                parameterTypes[i] = parameterClasses[i].getName();
            }
            sdp.setParameterTypes(parameterTypes);
            return sdp;
        }

        // Descriptors are equal when their string forms match, so they can be used as map keys
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ServiceDescriptor that = (ServiceDescriptor) o;
            return this.toString().equals(that.toString());
        }

        @Override
        public int hashCode() {
            return toString().hashCode();
        }

        @Override
        public String toString() {
            return "ServiceDescriptor{" +
                    "clazz='" + clazz + '\'' +
                    ", method='" + method + '\'' +
                    ", returnType='" + returnType + '\'' +
                    ", parameterTypes=" + Arrays.toString(parameterTypes) +
                    '}';
        }
    }

Request & Response

    package org.nudt;

    import lombok.Data;

    // An RPC request
    @Data // generates getters and setters
    public class Request {
        // The service being requested
        private ServiceDescriptor service;
        // The call arguments
        private Object[] parameter;
    }

    package org.nudt;

    import lombok.Data;

    // An RPC response
    @Data // generates getters and setters
    public class Response {
        // Status code: 0 means success, non-zero means failure
        private int code = 0;
        // Detailed error message
        private String message = "ok";
        // Returned data
        private Object data;
    }

2.3. Reflection Utilities

ReflectionUtils

    package org.nudt;

    import java.lang.reflect.Method;
    import java.lang.reflect.Modifier;
    import java.util.ArrayList;
    import java.util.List;

    // Reflection helper methods
    public class ReflectionUtils {
        // Creates an object from its Class through the no-argument constructor
        public static <T> T newInstance(Class<T> clazz) {
            try {
                // Class.newInstance() is deprecated since Java 9, so go through the constructor
                return clazz.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        // Returns all public methods declared directly on the class
        public static Method[] getPublicMethods(Class<?> clazz) {
            Method[] methods = clazz.getDeclaredMethods();
            List<Method> pmethods = new ArrayList<>();
            for (Method m : methods) {
                if (Modifier.isPublic(m.getModifiers())) {
                    pmethods.add(m);
                }
            }
            return pmethods.toArray(new Method[0]);
        }

        // Invokes the given method on the given object
        public static Object invoke(Object obj, Method method, Object... args) {
            try {
                return method.invoke(obj, args);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }
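
The three operations above (instantiate, list public methods, invoke) can be tried out with plain JDK reflection. The following is a minimal standalone sketch; `Greeter` and `ReflectionDemo` are hypothetical names used only for the demonstration and are not part of MyRPC.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical service class used only for this demonstration.
class Greeter {
    public String greet(String name) {
        return "hello, " + name;
    }

    private void hidden() {
    }
}

class ReflectionDemo {
    // Returns the names of the public methods declared directly on the class.
    static List<String> publicMethodNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Method m : clazz.getDeclaredMethods()) {
            if (Modifier.isPublic(m.getModifiers())) {
                names.add(m.getName());
            }
        }
        return names;
    }

    // Creates a Greeter through its no-argument constructor and invokes greet reflectively.
    static Object callGreet(String name) {
        try {
            Object target = Greeter.class.getDeclaredConstructor().newInstance();
            Method greet = Greeter.class.getMethod("greet", String.class);
            return greet.invoke(target, name);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(publicMethodNames(Greeter.class));
        System.out.println(callGreet("rpc"));
    }
}
```

The private method is filtered out by the `Modifier.isPublic` check, which is the same filter the server later relies on when it scans a service interface.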

2.4. Serialization Module

Add fastjson

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.9</version>
        </dependency>
    </dependencies>

Decoder & Encoder

    package org.nudt;

    // Deserialization
    public interface Decoder {
        <T> T decode(byte[] bytes, Class<T> clazz);
    }

    package org.nudt;

    // Serialization
    public interface Encoder {
        byte[] encode(Object obj);
    }

JSONEncoder & JSONDecoder

    package org.nudt;

    import com.alibaba.fastjson2.JSON;

    // JSON-based serialization (uses the same fastjson2 API as JSONDecoder)
    public class JSONEncoder implements Encoder {
        @Override
        public byte[] encode(Object obj) {
            return JSON.toJSONBytes(obj);
        }
    }

    package org.nudt;

    import com.alibaba.fastjson2.JSON;

    // JSON-based deserialization
    public class JSONDecoder implements Decoder {
        @Override
        public <T> T decode(byte[] bytes, Class<T> clazz) {
            return JSON.parseObject(bytes, clazz);
        }
    }
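
Because the framework only depends on the Encoder and Decoder interfaces, another wire format can be swapped in through the config classes. The sketch below is an assumption for illustration, not something the project ships: it uses JDK serialization instead of JSON, the names `JdkEncoder` and `JdkDecoder` are hypothetical, and the two interfaces are repeated only so the block compiles on its own. With this codec the transferred classes would have to implement `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Copies of the two interfaces above, repeated so the sketch compiles on its own.
interface Encoder {
    byte[] encode(Object obj);
}

interface Decoder {
    <T> T decode(byte[] bytes, Class<T> clazz);
}

// Hypothetical alternative codec based on JDK serialization.
class JdkEncoder implements Encoder {
    @Override
    public byte[] encode(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into the buffer.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return buffer.toByteArray();
    }
}

class JdkDecoder implements Decoder {
    @Override
    public <T> T decode(byte[] bytes, Class<T> clazz) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // Class.cast checks the runtime type instead of relying on an unchecked cast.
            return clazz.cast(in.readObject());
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

JSON keeps the payload readable and language-neutral, while JDK serialization preserves exact Java types; which trade-off fits is a design decision outside the scope of this sketch.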

3. Network Module

3.1. Interfaces

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>9.2.28.v20190418</version>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Proto</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

TransportClient & TransportServer

    package org.nudt;

    import java.io.InputStream;

    /*
     * 1. Create a connection
     * 2. Send data and wait for the response
     * 3. Close the connection
     */
    public interface TransportClient {
        void connect(Peer peer);

        InputStream write(InputStream data);

        void close();
    }

    package org.nudt;

    /*
     * 1. Start and listen
     * 2. Accept requests
     * 3. Stop listening
     */
    public interface TransportServer {
        void init(int port, RequestHandler handler);

        void start();

        void stop();
    }

RequestHandler

    package org.nudt;

    import java.io.InputStream;
    import java.io.OutputStream;

    // Handler for incoming network requests
    public interface RequestHandler {
        void onRequest(InputStream receive, OutputStream toResponse);
    }

3.2. Client Implementation

HTTPTransportClient

    package org.nudt;

    import org.apache.commons.io.IOUtils;

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class HTTPTransportClient implements TransportClient {
        private String url;

        // HTTP is connectionless here: connect() only records the target URL
        @Override
        public void connect(Peer peer) {
            this.url = "http://" + peer.getHost() + ":" + peer.getPort();
        }

        // Sends the data as the body of a POST request and returns the response stream
        @Override
        public InputStream write(InputStream data) {
            try {
                HttpURLConnection httpConn = (HttpURLConnection) new URL(url).openConnection();
                httpConn.setDoInput(true);
                httpConn.setDoOutput(true);
                httpConn.setUseCaches(false);
                httpConn.setRequestMethod("POST");
                httpConn.connect();
                IOUtils.copy(data, httpConn.getOutputStream());
                int resultCode = httpConn.getResponseCode();
                if (resultCode == HttpURLConnection.HTTP_OK) {
                    return httpConn.getInputStream();
                } else {
                    return httpConn.getErrorStream();
                }
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        // Nothing to release: each write() opens its own connection
        @Override
        public void close() {
        }
    }

3.3. Server Implementation

HttpTransportServer

    package org.nudt;

    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.servlet.ServletContextHandler;
    import org.eclipse.jetty.servlet.ServletHolder;

    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    @Slf4j
    public class HttpTransportServer implements TransportServer {
        private RequestHandler handler;
        private Server server;

        @Override
        public void init(int port, RequestHandler handler) {
            this.handler = handler;
            this.server = new Server(port);
            // A servlet receives every request
            ServletContextHandler ctx = new ServletContextHandler();
            server.setHandler(ctx);
            ServletHolder holder = new ServletHolder(new RequestServlet());
            ctx.addServlet(holder, "/*");
        }

        // Starts Jetty and blocks the calling thread until the server stops
        @Override
        public void start() {
            try {
                server.start();
                server.join();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void stop() {
            try {
                // The original listing called server.start() here by mistake
                server.stop();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        class RequestServlet extends HttpServlet {
            @Override
            protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
                log.info("client connect");
                InputStream in = req.getInputStream();
                OutputStream out = resp.getOutputStream();
                if (handler != null) {
                    handler.onRequest(in, out);
                }
                out.flush();
            }
        }
    }
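
The transport contract is simply "POST some bytes, read bytes back". The following sketch shows that round trip without any third-party dependency. It is an illustration only: the JDK's built-in `com.sun.net.httpserver.HttpServer` is swapped in for Jetty, and the class name `LoopbackTransportDemo` and the upper-casing handler are assumptions, not part of MyRPC.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class LoopbackTransportDemo {
    // Starts a JDK HttpServer on an ephemeral local port that upper-cases every request body.
    static HttpServer startServer() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            byte[] request;
            try (InputStream in = exchange.getRequestBody()) {
                request = in.readAllBytes();
            }
            byte[] reply = new String(request, StandardCharsets.UTF_8)
                    .toUpperCase().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, reply.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(reply);
            }
        });
        server.start();
        return server;
    }

    // Mirrors HTTPTransportClient.write: POST the payload, then read the whole response body.
    static byte[] post(int port, byte[] payload) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL("http://127.0.0.1:" + port).openConnection();
        conn.setDoOutput(true);
        conn.setRequestMethod("POST");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload);
        }
        try (InputStream in = conn.getInputStream()) {
            return in.readAllBytes();
        }
    }

    // Starts the server, performs one request, and always shuts the server down again.
    static String roundTrip(String text) {
        HttpServer server = null;
        try {
            server = startServer();
            byte[] reply = post(server.getAddress().getPort(), text.getBytes(StandardCharsets.UTF_8));
            return new String(reply, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }
}
```

In the real framework the handler would decode a Request and encode a Response instead of upper-casing text; the transport layer itself never looks inside the bytes.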

4. Server Module

4.1. Configuration

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Proto</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>CodeC</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Transport</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

RpcServerConfig

    package org.nudt;

    import lombok.Data;

    // Server configuration
    @Data
    public class RpcServerConfig {
        private Class<? extends TransportServer> transportClass = HttpTransportServer.class;
        private Class<? extends Encoder> encoderClass = JSONEncoder.class;
        private Class<? extends Decoder> decoderClass = JSONDecoder.class;
        private int port = 3000;
    }

RpcServer

    package org.nudt;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.io.IOUtils;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    @Slf4j
    public class RpcServer {
        private RpcServerConfig config;
        private TransportServer net;
        private Encoder encoder;
        private Decoder decoder;
        private ServiceManager serviceManager;
        private ServiceInvoker serviceInvoker;

        public RpcServer(RpcServerConfig config) {
            this.config = config;
            // Network transport
            this.net = ReflectionUtils.newInstance(config.getTransportClass());
            this.net.init(config.getPort(), this.handler);
            // Serialization
            this.encoder = ReflectionUtils.newInstance(config.getEncoderClass());
            // Deserialization
            this.decoder = ReflectionUtils.newInstance(config.getDecoderClass());
            // Services
            this.serviceManager = new ServiceManager();
            this.serviceInvoker = new ServiceInvoker();
        }

        public <T> void register(Class<T> interfaceClass, T bean) {
            serviceManager.register(interfaceClass, bean);
        }

        public void start() {
            this.net.start();
        }

        public void stop() {
            this.net.stop();
        }

        // Decodes the request, invokes the service, and always writes back a Response
        private RequestHandler handler = new RequestHandler() {
            @Override
            public void onRequest(InputStream receive, OutputStream toResponse) {
                Response resp = new Response();
                try {
                    // Read the whole body; available() is only an estimate and may truncate it
                    byte[] inBytes = IOUtils.toByteArray(receive);
                    Request request = decoder.decode(inBytes, Request.class);
                    log.info("get request:{}", request);
                    ServiceInstance sis = serviceManager.lookup(request);
                    Object ret = serviceInvoker.invoke(sis, request);
                    resp.setData(ret);
                } catch (Exception e) {
                    // Report failure through the status code (the original set data to 1 instead)
                    resp.setCode(1);
                    resp.setMessage("RpcServer got error:" + e.getClass() + " : " + e.getMessage());
                } finally {
                    byte[] outBytes = encoder.encode(resp);
                    try {
                        toResponse.write(outBytes);
                    } catch (IOException e) {
                        log.warn(e.getMessage(), e);
                    }
                }
            }
        };
    }

4.2. Server

ServiceInstance

    package org.nudt;

    import lombok.AllArgsConstructor;
    import lombok.Data;

    import java.lang.reflect.Method;

    // One concrete service: the target object plus the method to call on it
    @Data
    @AllArgsConstructor
    public class ServiceInstance {
        private Object target;
        private Method method;
    }

ServiceInvoker

    package org.nudt;

    // Invokes a concrete service
    public class ServiceInvoker {
        public Object invoke(ServiceInstance service, Request request) {
            return ReflectionUtils.invoke(service.getTarget(), service.getMethod(), request.getParameter());
        }
    }

ServiceManager

    package org.nudt;

    import lombok.extern.slf4j.Slf4j;

    import java.lang.reflect.Method;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Manages the services exposed over RPC
    @Slf4j
    public class ServiceManager {
        private Map<ServiceDescriptor, ServiceInstance> services;

        public ServiceManager() {
            this.services = new ConcurrentHashMap<>();
        }

        // Registration: scans every method of the interface and stores one entry per method
        public <T> void register(Class<T> interfaceClass, T bean) {
            Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);
            for (Method method : methods) {
                ServiceInstance sis = new ServiceInstance(bean, method);
                ServiceDescriptor sdp = ServiceDescriptor.from(interfaceClass, method);
                services.put(sdp, sis);
                log.info("service registered: {} {}", sdp.getClazz(), sdp.getMethod());
            }
        }

        // Looks up the service that matches the request's descriptor
        public ServiceInstance lookup(Request request) {
            ServiceDescriptor sdp = request.getService();
            return services.get(sdp);
        }
    }
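
The register-then-lookup flow works because both sides derive the same key from the interface name, method name, and parameter types. Below is a minimal dependency-free sketch of that idea. `MiniRegistry`, `Calc`, and `CalcImpl` are hypothetical stand-ins, and a plain string key replaces ServiceDescriptor for brevity.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for a service interface and its implementation.
interface Calc {
    int add(int a, int b);
}

class CalcImpl implements Calc {
    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

class MiniRegistry {
    // Pairs the bean with the method to call on it, like ServiceInstance.
    private static final class Entry {
        final Object target;
        final Method method;

        Entry(Object target, Method method) {
            this.target = target;
            this.method = method;
        }
    }

    private final Map<String, Entry> services = new ConcurrentHashMap<>();

    // Both registration and lookup must build the key the same way.
    static String key(String clazz, String method, String[] parameterTypes) {
        return clazz + "#" + method + Arrays.toString(parameterTypes);
    }

    // Stores one entry per public method of the interface.
    <T> void register(Class<T> interfaceClass, T bean) {
        for (Method m : interfaceClass.getMethods()) {
            String[] types = Arrays.stream(m.getParameterTypes())
                    .map(Class::getName).toArray(String[]::new);
            services.put(key(interfaceClass.getName(), m.getName(), types), new Entry(bean, m));
        }
    }

    // Looks up the entry by key and invokes it reflectively.
    Object call(String clazz, String method, String[] parameterTypes, Object... args) {
        Entry entry = services.get(key(clazz, method, parameterTypes));
        if (entry == null) {
            throw new IllegalArgumentException("no such service: " + clazz + "#" + method);
        }
        try {
            return entry.method.invoke(entry.target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Including the parameter types in the key is what lets overloaded methods coexist; ServiceDescriptor achieves the same through its `toString`-based `equals` and `hashCode`.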

5. Client Module

5.1. Configuration

Add the dependencies

    <dependencies>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Proto</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>CodeC</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.nudt</groupId>
            <artifactId>Transport</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
    </dependencies>

RpcClientConfig

    package org.nudt;

    import lombok.Data;

    import java.util.Arrays;
    import java.util.List;

    // Client configuration
    @Data
    public class RpcClientConfig {
        private Class<? extends TransportClient> transportClass = HTTPTransportClient.class;
        private Class<? extends Encoder> encoderClass = JSONEncoder.class;
        private Class<? extends Decoder> decoderClass = JSONDecoder.class;
        private Class<? extends TransportSelector> selectorClass = RandomTransportSelector.class;
        // Number of connections to open per server
        private int connectCount = 1;
        private List<Peer> servers = Arrays.asList(new Peer("127.0.0.1", 3000));
    }

TransportSelector

    package org.nudt;

    import java.util.List;

    // Chooses which server to connect to
    public interface TransportSelector {
        /*
         * Initialization
         * peers: endpoints of the servers that can be connected to
         * count: number of connections the client opens to each server
         * clazz: the TransportClient implementation class
         */
        void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz);

        // Selects a transport for talking to a server
        TransportClient select();

        // Returns a client after use
        void release(TransportClient client);

        // Closes all clients
        void close();
    }

RandomTransportSelector

    package org.nudt;

    import lombok.extern.slf4j.Slf4j;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    @Slf4j
    public class RandomTransportSelector implements TransportSelector {
        // Clients that are connected and currently idle
        private List<TransportClient> clients;

        public RandomTransportSelector() {
            clients = new ArrayList<>();
        }

        @Override
        public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
            count = Math.max(count, 1);
            for (Peer peer : peers) {
                for (int i = 0; i < count; i++) {
                    TransportClient client = ReflectionUtils.newInstance(clazz);
                    client.connect(peer);
                    clients.add(client);
                }
                log.info("connect server:{}", peer);
            }
        }

        // Removes and returns a random idle client.
        // Random.nextInt(0) throws IllegalArgumentException when every client is checked out.
        @Override
        public synchronized TransportClient select() {
            int i = new Random().nextInt(clients.size());
            return clients.remove(i);
        }

        @Override
        public synchronized void release(TransportClient client) {
            clients.add(client);
        }

        @Override
        public synchronized void close() {
            for (TransportClient client : clients) {
                client.close();
            }
            clients.clear();
        }
    }
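
Since `select()` removes the chosen client from the list, concurrent callers can drain the pool, and the next `select()` then fails on `nextInt(0)`. One possible way to handle this, sketched below under assumptions, is to let callers wait for a released client. `BlockingPool` is a hypothetical generic class (in MyRPC the element type would be TransportClient), and it hands out clients in FIFO order rather than at random.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical generic pool; T would be TransportClient in MyRPC.
class BlockingPool<T> {
    // Thread-safe queue of idle items, so no explicit synchronization is needed.
    private final BlockingQueue<T> idle = new LinkedBlockingQueue<>();

    BlockingPool(Collection<T> items) {
        idle.addAll(items);
    }

    // Waits up to timeoutMillis for an idle item instead of failing on an empty pool.
    T select(long timeoutMillis) {
        try {
            T item = idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                throw new IllegalStateException("no idle client within " + timeoutMillis + " ms");
            }
            return item;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns an item to the pool and wakes up one waiting caller, if any.
    void release(T item) {
        idle.offer(item);
    }

    int idleCount() {
        return idle.size();
    }
}
```

A timeout keeps a caller from blocking forever if a client is never released; choosing its value, or combining the pool with a load-balancing strategy, is left open here.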

5.2. Client

RpcClient

    package org.nudt;

    import java.lang.reflect.Proxy;

    public class RpcClient {
        private RpcClientConfig config;
        private Encoder encoder;
        private Decoder decoder;
        private TransportSelector selector;

        public RpcClient() {
            this(new RpcClientConfig());
        }

        public RpcClient(RpcClientConfig config) {
            this.config = config;
            this.encoder = ReflectionUtils.newInstance(this.config.getEncoderClass());
            this.decoder = ReflectionUtils.newInstance(this.config.getDecoderClass());
            this.selector = ReflectionUtils.newInstance(this.config.getSelectorClass());
            this.selector.init(this.config.getServers(), this.config.getConnectCount(), this.config.getTransportClass());
        }

        // Returns a proxy object for the service interface
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Class<T> clazz) {
            return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{clazz}, new RemoteInvoker(clazz, encoder, decoder, selector));
        }
    }

RemoteInvoker

    package org.nudt;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.io.IOUtils;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;

    // Invocation handler that forwards proxy calls to the remote service
    @Slf4j
    public class RemoteInvoker implements InvocationHandler {
        private Class<?> clazz;
        private Encoder encoder;
        private Decoder decoder;
        private TransportSelector selector;

        public RemoteInvoker(Class<?> clazz, Encoder encoder, Decoder decoder, TransportSelector selector) {
            this.clazz = clazz;
            this.encoder = encoder;
            this.decoder = decoder;
            this.selector = selector;
        }

        // Called for every method invoked on the proxy
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Request request = new Request();
            request.setService(ServiceDescriptor.from(clazz, method));
            request.setParameter(args);
            Response response = invokeRemote(request);
            if (response == null || response.getCode() != 0) {
                throw new IllegalStateException("fail to invoke remote:" + response);
            }
            return response.getData();
        }

        // Encodes the request, sends it through a selected client, and decodes the response
        private Response invokeRemote(Request request) {
            Response resp = null;
            TransportClient client = null;
            try {
                client = selector.select();
                byte[] outBytes = encoder.encode(request);
                InputStream receive = client.write(new ByteArrayInputStream(outBytes));
                // Read the whole body; available() is only an estimate and may truncate it
                byte[] inBytes = IOUtils.toByteArray(receive);
                resp = decoder.decode(inBytes, Response.class);
            } catch (Exception e) {
                log.warn(e.getMessage(), e);
                resp = new Response();
                resp.setCode(1);
                resp.setMessage("RpcClient got error:" + e.getClass() + ":" + e.getMessage());
            } finally {
                if (client != null) {
                    selector.release(client);
                }
            }
            return resp;
        }
    }
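
The mechanism RpcClient and RemoteInvoker rely on is that every call on a JDK dynamic proxy is routed to a single `InvocationHandler`. The standalone sketch below shows just that routing. `Adder` and `ProxyDemo` are hypothetical names, and the handler computes the result locally at the point where RemoteInvoker would encode a Request and send it over the transport.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for a service such as CalcService.
interface Adder {
    int add(int a, int b);
}

class ProxyDemo {
    // Records "methodName/argumentCount" for each call that reaches the handler.
    static final List<String> calls = new ArrayList<>();

    // Builds a proxy whose handler records the call and computes the result locally.
    static Adder createProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            calls.add(method.getName() + "/" + args.length);
            // Primitive arguments arrive boxed; the boxed result is unboxed by the proxy.
            return (Integer) args[0] + (Integer) args[1];
        };
        return (Adder) Proxy.newProxyInstance(
                Adder.class.getClassLoader(), new Class<?>[]{Adder.class}, handler);
    }

    public static void main(String[] args) {
        Adder adder = createProxy();
        System.out.println(adder.add(1, 2));
        System.out.println(calls);
    }
}
```

The handler receives the `Method` object and the argument array, which is all RemoteInvoker needs to build a ServiceDescriptor and a Request.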

6. Testing

CalcService

    package org.nudt;

    public interface CalcService {
        int add(int a, int b);

        int minus(int a, int b);
    }

CalcServiceImpl

    package org.nudt;

    public class CalcServiceImpl implements CalcService {
        @Override
        public int add(int a, int b) {
            return a + b;
        }

        @Override
        public int minus(int a, int b) {
            return a - b;
        }
    }

Client

    package org.nudt;

    public class Client {
        public static void main(String[] args) {
            RpcClient client = new RpcClient();
            CalcService service = client.getProxy(CalcService.class);
            int r1 = service.add(1, 2);
            int r2 = service.minus(1, 2);
            System.out.println(r1 + " " + r2);
        }
    }

Server

    package org.nudt;

    public class Server {
        public static void main(String[] args) {
            RpcServer server = new RpcServer(new RpcServerConfig());
            server.register(CalcService.class, new CalcServiceImpl());
            server.start();
        }
    }