赞
踩
简单的rpc逻辑
客户端:传一个id给服务端,服务端查询到User对象返回到客户端
public class RPCClient { public static void main(String[] args) { Scanner scanner = new Scanner(System.in); try { // 建立Socket连接 Socket socket = new Socket("127.0.0.1", 8899); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); // 传给服务器id objectOutputStream.writeInt(scanner.nextInt()); objectOutputStream.flush(); // 服务器查询数据,返回对应的对象 User user = (User) objectInputStream.readObject(); System.out.println("服务端返回的User:"); System.out.println("name:"+ user.getUserName()); System.out.println("ID:" + user.getId()); System.out.println("性别: " + user.getSex()); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); System.out.println("客户端启动失败"); } } }
客户端:
objectInputStream
对象读取User
对象服务端:根据客户端传递过来的id号,去调用相应的业务逻辑方法
public class RPCServer { public static void main(String[] args) { UserServiceImpl userService = new UserServiceImpl(); try { ServerSocket serverSocket = new ServerSocket(8899); System.out.println("服务端启动了"); // BIO的方式监听Socket while (true){ Socket socket = serverSocket.accept(); // 开启一个线程去处理 new Thread(()->{ try { //java.io.ObjectOutputStream extends OutputStream //ObjectOutputStream:对象的序列化流, // 作用:把对象转成字节数据的输出到文件中保存,对象的输出过程称为序列化,可实现对象的持久存储。 ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); //ObjectInputStream 反序列化流,将之前使用 ObjectOutputStream 序列化的原始数据恢复为对象, // 以流的方式读取对象。 ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()); // 读取客户端传过来的id Integer id = ois.readInt(); User userByUserId = userService.getUserByUserId(id); // 写入User对象给客户端 oos.writeObject(userByUserId); oos.flush(); } catch (IOException e){ e.printStackTrace(); System.out.println("从IO中读取数据错误"); } }).start(); } } catch (IOException e) { e.printStackTrace(); System.out.println("服务器启动失败"); } } }
服务端:
ObjectOutputStream
问题:
只能发送一个固定的ID号和实现一个方法,需要抽象Request
只支持返回固定的User对象,不能灵活的返回不同的对象,因此需要抽象-Response
客户端不通用,调用方式和host、port都是固定的
首先定义一个通用的Request对象
/**
* 客户端会调用不同的服务,因此在一个rpc请求中,我们需要封装一个request类
* 在request类中包含接口名称,方法,参数列表等信息
* 这样服务端就可以根据这些信息去调用对应的接口完成业务逻辑
*/
@Data
@Builder
public class RPCRequest implements Serializable {
private String interfaceName;//接口名称,需要实现的接口类
private String methodName;//方法名称
private Object[] params;//参数列表
private Class<?>[] paramsTypes;//参数列表的属性
}
定义一个通用的Response对象
@Data @Builder public class RPCResponse implements Serializable { private int code; private String message; private Object data; public static RPCResponse success(Object data){ return RPCResponse.builder().code(200).data(data).build(); } public static RPCResponse fail(Object data){ return RPCResponse.builder().code(500).message("服务器发生错误").build(); } }
在之后的传输过程中,都会用request和response数据格式进行传输
服务端解析request数据并且返回一个封装好的response对象
public class RPCServer { public static void main(String[] args) { UserServiceImpl userService = new UserServiceImpl(); try { //在客户/服务器通信模式中,服务器端需要创建监听特定端口的ServerSocket, //ServerSocket负责接收客户连接请求,并生成与客户端连接的Socket。 ServerSocket serverSocket = new ServerSocket(8899); System.out.println("-----服务端启动------"); while (true){ Socket socket = serverSocket.accept(); new Thread(()-> { try { ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); RPCRequest rpcRequest =(RPCRequest) objectInputStream.readObject();//读取从客户端传递过来的数据 //根据发射得到方法 Method method = userService.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsTypes()); Object invoke = method.invoke(userService, rpcRequest.getParams());//通过invoke调用相应的方法 objectOutputStream.writeObject(RPCResponse.success(invoke));//将得到的对象返回到服务端 objectOutputStream.flush(); } catch (IOException | ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); System.out.println("IO传输数据失败"); } catch (InvocationTargetException e) { e.printStackTrace(); System.out.println("反射得到方法失败"); } catch (IllegalAccessException e) { e.printStackTrace(); System.out.println("调用方法失败"); } }).start(); } } catch (IOException e) { e.printStackTrace(); System.out.println("服务端启动失败"); } } }
客户端根据不同的service进行动态搭理,首先编写一个IOClient对象,主要用来进行底层通信,发送request对象和接收response对象
/** * 负责底层的通信,发送request,接收response对象 * 这里面主要是为了封装request对象,不同的service使用的request对象是不同的 * 客户端只会知道接口的名称,需要动态搭理进行增强,利用反射机制封装不同的service request对象 */ public class IOClient { public static RPCResponse sendRequest(String host, Integer port, RPCRequest rpcRequest){ try { Socket socket = new Socket(host,port); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); System.out.println("收到的request请求" + rpcRequest); objectOutputStream.writeObject(rpcRequest); objectOutputStream.flush(); System.out.println("------请求已经发送------"); RPCResponse rpcResponse = (RPCResponse) objectInputStream.readObject(); System.out.println("----接收到response--------"); return rpcResponse; } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); System.out.println("IOclient失败"); return null; } } }
创建一个动态代理类,接收传入的host和port参数,生成一个request对象,调用IOClient对象的sendRequest方法,向服务端发送请求,并接收服务端返回的response对象,返回response对象中的data数据
@AllArgsConstructor public class ClientProxy implements InvocationHandler { //封转request对象 private String host; private int port; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //利用发射构造request对象 RPCRequest request = RPCRequest.builder().interfaceName(method.getDeclaringClass().getName()) .methodName(method.getName()) .params(args) .paramsTypes(method.getParameterTypes()).build(); //发送request数据,得到response RPCResponse response = IOClient.sendRequest(host,port,request); return response.getData();//返回得到的数据 } //动态代理 <T>T getProxy(Class<T> clazz){ //参数:第一个使用哪个类加载器、第二个需要实现什么接口、第三个调用h的invoke方法 Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); return (T)o; } }
客户端可以利用动态代理调用不同的方法
public class RPCClient {
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
//建立动态代理对象
ClientProxy clientProxy = new ClientProxy("127.0.0.1",8899);
UserService proxy = clientProxy.getProxy(UserService.class);//把需要进行动态代理的对象传入到动态代理中
User userByUserId = proxy.getUserByUserId(scanner.nextInt());
System.out.println("从服务端得到的User对象" + userByUserId);
User user = User.builder().userName("小七").id(1).sex(true).build();//创建一个User对象
Integer integer = proxy.insertUserId(user);//使用代理对象进行插入
System.out.println("向服务端进行插入的数据" + integer);
}
}
新增:
问题:
第二个版本提供了多个服务接口
首先更新一个新的服务接口
public interface BlogService {
Blog getBlogById(Integer id);
}
public class BlogServiceImpl implements BlogService {
@Override
public Blog getBlogById(Integer id) {
Blog blog = Blog.builder().id(id).title("小七").useId(1).build();
System.out.println("客户端查询了:"+id + blog.getTitle());
return blog;
}
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
private Integer id;
private Integer useId;
private String title;
}
接下来,抽象画RPCServer
/**
* 将RPCServer进行抽象,以后的服务只需要实现这个接口即可,解耦合
*/
public interface RPCServer {
void start(int port);
void stop();
}
创建一个类来存放接口,使用Map数据结构来存放
/** * 一个类可能实现多个接口 */ public class ServiceProvider { private Map<String, Object> interfaceProvider; public ServiceProvider(){ this.interfaceProvider = new HashMap<>(); } public void provideServiceInterface(Object service){ Class<?>[] interfaces = service.getClass().getInterfaces(); for(Class clazz : interfaces){ interfaceProvider.put(clazz.getName(),service); } } public Object getService(String interfaceName){ return interfaceProvider.get(interfaceName); } }
单独创建一个工作线程,方便之后创建一个线程或多个线程进行执行
工作线程的主要工作为:接收request请求,通过反射调用服务端响应的方法,得到response,将response写入到客户端,也就是把之前版本服务端的业务逻辑移到了这里
/** * 工作线程 */ @AllArgsConstructor public class WorkThread implements Runnable{ private Socket socket; private ServiceProvider serviceProvide; @Override public void run() { try { ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); //读取客户端传递过来的request请求 RPCRequest rpcRequest = (RPCRequest) objectInputStream.readObject(); //调用方法得到response RPCResponse response = getResponse(rpcRequest); //将response写入到客户端 objectOutputStream.writeObject(response); objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); System.out.println("从IO中读取数据错误"); } } public RPCResponse getResponse(RPCRequest rpcRequest){ //得到服务的名称 String interfaceName = rpcRequest.getInterfaceName(); //得到服务端相应的实体类 Object service = serviceProvide.getService(interfaceName); //调用的方法 Method method = null; try { //得到要调用的方法 method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsTypes()); Object invoke = method.invoke(service, rpcRequest.getParams()); return RPCResponse.success(invoke);//返回response } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { e.printStackTrace(); System.out.println("获取方法失败"); return RPCResponse.fail(); } } }
创建一个单线程模型
public class SimpleRPCRPCServer implements RPCServer{ // 存着服务接口名-> service对象的map private ServiceProvider serviceProvider; public SimpleRPCRPCServer(ServiceProvider serviceProvide){ this.serviceProvider = serviceProvide; } public void start(int port) { try { ServerSocket serverSocket = new ServerSocket(port); System.out.println("服务端启动了"); // BIO的方式监听Socket while (true){ Socket socket = serverSocket.accept(); // 开启一个新线程去处理 new Thread(new WorkThread(socket,serviceProvider)).start(); } } catch (IOException e) { e.printStackTrace(); System.out.println("服务器启动失败"); } } public void stop(){ } }
创建测试服务端,在服务端里面写入需要调用的服务,把服务封装在ServiceProvider 中,然后通过SimpleRPCRPCServer对象接收客户端的请求
public class TestServer {
public static void main(String[] args) {
UserService userService = new UserServiceImpl();
BlogService blogService = new BlogServiceImpl();
//将两个接口放入到Map中
ServiceProvider serviceProvider = new ServiceProvider();
serviceProvider.provideServiceInterface(userService);
serviceProvider.provideServiceInterface(blogService);
RPCServer RPCServer = new SimpleRPCRPCServer(serviceProvider);
RPCServer.start(8899);
}
}
线程池版本
public class ThreadPoolRPCRPCServer implements RPCServer{ private final ThreadPoolExecutor threadPoolExecutor; private ServiceProvider serviceProvider; //预设的线程池参数 public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider) { threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 1000, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); this.serviceProvider = serviceProvider; } //核心线程池参数 public ThreadPoolRPCRPCServer(ServiceProvider serviceProvider, int corePoolSize, int maxiumPoolSize, long keepAliveTime, TimeUnit unit, BlockingDeque<Runnable> workQueue) { threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxiumPoolSize,keepAliveTime,unit,workQueue); this.serviceProvider = serviceProvider; } @Override public void start(int port) { System.out.println("线程池版服务端启动了"); try { ServerSocket serverSocket = new ServerSocket(port); while(true){ Socket socket = serverSocket.accept(); threadPoolExecutor.execute(new WorkThread(socket,serviceProvider)); } } catch (IOException e) { e.printStackTrace(); } } @Override public void stop() { } }
客户端的内容基本保持不变
总结:
缺点:
项目github地址:https://github.com/he2121/MyRPCFromZero
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。