当前位置:   article > 正文

纯Java手写一个简单的RPC框架_java手写rpc

java手写rpc

手写之前先简单了解几个概念

What is RPC?

RPC远程过程调用(Remote Procedure Call) 调用远程计算机上的服务,就像调用本地服务一样。

 

Why RPC?

分布式环境下各服务之间的相互调用必然会用到RPC思想。如下图:

 

说一下我知道并了解的RPC框架?

WebService(过时并且太重量级,目前一些老的金融和电信行业还在用

Dubbo( 阿里开源,据说2012年停止更新

HSF(也是阿里的,好像是现在内部在使用,了解甚少

Spring Cloud(如果Dubbo比作组装机,SpringCloud就是品牌机

 

换汤不换药

个人理解:所有的框架核心思想和解决的问题都是一样的,并且做了很多优化和完善,还有更多的特性。所以在学习使用框架前要了解框架的核心思想,这样带着思想去学习更得心应手。并且可以学习框架的设计模式,自己去思考为什么这么做?这么做的道理是什么。。。不逼逼了 上代码。。。

 

先搞清楚核心思想

RPC实现核心——动态代理+网络通讯增强

RPC中的远程调用是给目标对象提供一个代理, 并且由代理对象控制目标对象的引用。

这样做的目的是因为:屏蔽调服务处理的细节,通过网络通讯远程调用做前置和后置的增强。

 

准备

新建三个Java目录,如下图:

它们三个要做什么事情呢??

首先server 会将自己暴露出的服务注册到register-center中,client 通过代理对象获取服务的实现类并调用。

 

register-center 代码

  1. /**
  2. * 注册中心
  3. */
  4. public class RegisterCenter {
  5. // 线程池
  6. private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  7. // 存放服务注册的容器
  8. public static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
  9. private static boolean isRunning = false;
  10. private static int port;
  11. public RegisterCenter(int port) {
  12. this.port = port;
  13. }
  14. /**
  15. * 服务的注册:socket通讯+反射
  16. * @param serviceInterface
  17. * @param impl
  18. */
  19. public void register(String serviceName, Class serviceInterface, Class impl) {
  20. System.out.println(serviceName + " 已加入注册中心!");
  21. serviceRegistry.put(serviceInterface.getName(), impl);
  22. System.out.println("注册中心列表:" + serviceRegistry.toString());
  23. }
  24. /**
  25. * 启动服务注册中心
  26. * @throws IOException
  27. */
  28. public void start() throws IOException {
  29. //服务器监听
  30. ServerSocket serverSocket = new ServerSocket();
  31. //监听绑定端口
  32. serverSocket.bind(new InetSocketAddress(port));
  33. System.out.println("服务中心已启动....");
  34. try {
  35. while (true) {
  36. // 监听客户端的TCP连接,接到TCP连接后将其封装成task,
  37. // 由线程池执行,并且同时将socket送入(server.accept()=socket)
  38. executor.execute(new ServiceTask(serverSocket.accept()));
  39. }
  40. } finally {
  41. serverSocket.close();
  42. }
  43. }
  44. }
  45. /**
  46. * 服务获取后运行线程
  47. */
  48. public class ServiceTask implements Runnable {
  49. //客户端socket
  50. Socket client;
  51. public ServiceTask(Socket client) {
  52. this.client = client;
  53. }
  54. /**
  55. * 远程请求达到服务端,我们需要执行请求结果,并且把请求结果反馈至客户端,使用Socket通讯
  56. */
  57. @Override
  58. public void run() {
  59. //利用Java反射
  60. //同样适用object流
  61. ObjectInputStream inputStream = null;
  62. ObjectOutputStream outputStream = null;
  63. /**
  64. * 核心处理流程
  65. * 1.客户端发送的object对象拿到,
  66. * 2.在采用反射的机制进行调用,
  67. * 3.最后给返回结果
  68. */
  69. try{
  70. // 获取客户端发来的输入流
  71. inputStream = new ObjectInputStream(client.getInputStream());
  72. //顺序发送数据:类名、方法名、参数类型、参数值
  73. // 1.拿到服务名
  74. String serviceName = inputStream.readUTF();
  75. // 2.拿到方法名
  76. String methodName = inputStream.readUTF();
  77. // 3.拿到参数类型
  78. Class<?>[] paramTypes = ( Class<?>[])inputStream.readObject();
  79. // 4.拿到参数值
  80. Object[] arguments = (Object[]) inputStream.readObject();
  81. // 5.要到注册中心根据 接口名,获取实现类
  82. Class serviceClass = RegisterCenter.serviceRegistry.get(serviceName);
  83. // 6.使用反射的机制进行调用
  84. Method method = serviceClass.getMethod(methodName,paramTypes);
  85. // 7.反射调用方法,把结果拿到
  86. Object result = method.invoke(serviceClass.newInstance(),arguments);
  87. // 8.通过执行socket返回给客户端
  88. outputStream = new ObjectOutputStream(client.getOutputStream());
  89. // /把结果返回给客户端
  90. outputStream.writeObject(result);
  91. }catch (Exception e){
  92. System.out.println("服务处理异常");
  93. e.printStackTrace();
  94. }finally {
  95. try{
  96. if (null != outputStream){
  97. outputStream.close();
  98. }
  99. if (null != inputStream){
  100. inputStream.close();
  101. }
  102. if (null != client){
  103. client.close();
  104. }
  105. }catch (IOException e){
  106. System.out.println("关流异常");
  107. e.printStackTrace();
  108. }
  109. }
  110. }
  111. }

 

server 代码

  1. /**
  2. * 订单服务接口
  3. */
  4. public interface OrderInterface {
  5. /**
  6. * 根据订单id查询订单
  7. * @param orderId
  8. * @return
  9. */
  10. String findOrder(String orderId);
  11. }
  12. /**
  13. * 服务实现类
  14. *
  15. * */
  16. public class OrderServiceImpl implements OrderInterface {
  17. @Override
  18. public String findOrder(String orderId) {
  19. System.out.println("----------------- select order start-----------------------");
  20. System.out.println("订单号:"+ orderId);
  21. System.out.println("正在查询数据库.......");
  22. String result = "【订单号:"+orderId + ", 订单金额:" + (Math.random()*100)+1 + "】";
  23. System.out.println("查询成功!订单信息为:" + result);
  24. System.out.println("----------------- select order end -----------------------");
  25. return result;
  26. }
  27. }
  28. /**
  29. * 向注册中心添加服务
  30. */
  31. public class ServerRegister {
  32. public static void main(String[] args) throws Exception{
  33. new Thread(new Runnable() {
  34. public void run() {
  35. try {
  36. // new服务中心
  37. RegisterCenter serviceServer = new RegisterCenter(8888);
  38. // 注册订单服务至注册中心
  39. serviceServer.register("订单服务",OrderInterface.class, OrderServiceImpl.class);
  40. // 运行我们的服务
  41. serviceServer.start();
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }).start();
  47. }
  48. }

client代码

  1. /**
  2. * rpc框架的客户端代理部分
  3. */
  4. public class RpcClientFrame {
  5. /**
  6. * 远程服务的代理对象,参数为客户端要调用的的服务
  7. * @param serviceInterface
  8. * @param <T>
  9. * @return
  10. * @throws Exception
  11. */
  12. public static <T> T getRemoteProxyObj(final Class<?> serviceInterface) throws Exception {
  13. // 默认端口8888
  14. InetSocketAddress serviceAddr = new InetSocketAddress("127.0.0.1",8888);
  15. // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
  16. // 进行实际的服务调用(动态代理)
  17. return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},new DynProxy(serviceInterface,serviceAddr));
  18. }
  19. /**
  20. * 动态代理类,实现了对远程服务的访问
  21. */
  22. private static class DynProxy implements InvocationHandler {
  23. // 接口
  24. private final Class<?> serviceInterface;
  25. // 远程调用地址
  26. private final InetSocketAddress addr;
  27. // 构造函数
  28. public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {
  29. this.serviceInterface = serviceInterface;
  30. this.addr = addr;
  31. }
  32. /**
  33. * 动态代理类,增强:实现了对远程服务的访问
  34. * @param proxy
  35. * @param method
  36. * @param args
  37. * @return
  38. * @throws Throwable
  39. */
  40. public Object invoke(Object proxy, Method method, Object[] args)
  41. throws Throwable {
  42. /* 网络增强部分*/
  43. Socket socket = null;
  44. //因为传递的大部分是 方法、参数,所以我们使用Object流对象
  45. ObjectInputStream objectInputStream = null;
  46. ObjectOutputStream objectOutputStream = null;
  47. try {
  48. // 1.新建一个Socket
  49. socket = new Socket();
  50. // 2.连接到远程的地址和端口
  51. socket.connect(addr);
  52. //往远端 发送数据,按照顺序发送数据:类名、方法名、参数类型、参数值
  53. // 1.拿到输出的流
  54. objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
  55. // 2. 发送 调用方法的 类名,使用UTF-8避免乱码
  56. objectOutputStream.writeUTF(serviceInterface.getName());
  57. // 3.发送 方法名
  58. objectOutputStream.writeUTF(method.getName());
  59. // 4.发送 参数类型,使用Object
  60. objectOutputStream.writeObject(method.getParameterTypes());
  61. //5 .发送 参数的值,使用UTF-8避免乱码
  62. objectOutputStream.writeObject(args);
  63. // 6.刷新缓冲区,使得数据立马发送
  64. objectOutputStream.flush();
  65. // 7.立马拿到远程执行的结果
  66. objectInputStream = new ObjectInputStream(socket.getInputStream());
  67. // 8.我们要把调用的细节打印出来
  68. System.out.println("远程调用成功!" + serviceInterface.getName());
  69. //最后要网络的请求返回给返回
  70. return objectInputStream.readObject();
  71. } finally {
  72. socket.close();
  73. objectOutputStream.close();
  74. objectInputStream.close();
  75. }
  76. }
  77. }
  78. }
  79. /**
  80. * 订单服务接口
  81. */
  82. public interface OrderInterface {
  83. /**
  84. * 根据订单id查询订单
  85. * @param orderId
  86. * @return
  87. */
  88. String findOrder(String orderId);
  89. }
  90. /**
  91. * rpc的客户端,调用远端服务
  92. */
  93. public class Client {
  94. public static void main(String[] args) throws Exception {
  95. //动态代理获取我们的对象
  96. OrderInterface orderService = RpcClientFrame.getRemoteProxyObj(OrderInterface.class);
  97. //进远程调用我们的对象
  98. System.out.println(orderService.findOrder("20200613001"));
  99. }
  100. }

我们来测试一下

 

首先运行server 下的

ServerRegister的main方法,运行后 可以看到控制台输出:

接着我们调用一下  运行client 下的

Client 的main方式,运行后,可以看到控制台输出:

同时  服务端 的控制台接收到服务也打印了:

接着我们多调用几次:

可以看到一直在运行>...到此 我们已经实现了纯Java实现一个RPC框架!

 

总结

虽然我们实现了一个简单地RPC框架,但是还是有诸多的问题,比如:

通讯效率方面,由于是基于IO实现的,IO的开销。Java序列化的速度,还有服务中心对服务的管理,比如服务宕机了怎么办?怎么去监听?

所以 还是要使用专业的RPC框架。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/678864
推荐阅读
相关标签
  

闽ICP备14008679号