赞
踩
RPC远程过程调用(Remote Procedure Call) 调用远程计算机上的服务,就像调用本地服务一样。
分布式环境下各服务之间的相互调用必然会用到RPC思想。如下图:
WebService(过时并且太重量级,目前一些老的金融和电信行业还在用)
Dubbo( 阿里开源,据说2012年停止更新)
HSF(也是阿里的,好像是现在内部在使用,了解甚少)
Spring Cloud(如果Dubbo比作组装机,SpringCloud就是品牌机)
个人理解:所有的框架核心思想和解决的问题都是一样的,并且做了很多优化和完善,还有更多的特性。所以在学习使用框架前要了解框架的核心思想,这样带着思想去学习更得心应手。并且可以学习框架的设计模式,自己去思考为什么这么做?这么做的道理是什么。。。不逼逼了 上代码。。。
RPC实现核心——动态代理+网络通讯增强
RPC中的远程调用是给目标对象提供一个代理, 并且由代理对象控制目标对象的引用。
这样做的目的是因为:屏蔽调服务处理的细节,通过网络通讯远程调用做前置和后置的增强。
新建三个Java目录,如下图:
它们三个要做什么事情呢??
首先server 会将自己暴露出的服务注册到register-center中,client 通过代理对象获取服务的实现类并调用。
- /**
- * 注册中心
- */
- public class RegisterCenter {
-
- // 线程池
- private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-
- // 存放服务注册的容器
- public static final HashMap<String, Class> serviceRegistry = new HashMap<String, Class>();
-
- private static boolean isRunning = false;
-
- private static int port;
-
- public RegisterCenter(int port) {
- this.port = port;
- }
-
- /**
- * 服务的注册:socket通讯+反射
- * @param serviceInterface
- * @param impl
- */
- public void register(String serviceName, Class serviceInterface, Class impl) {
- System.out.println(serviceName + " 已加入注册中心!");
- serviceRegistry.put(serviceInterface.getName(), impl);
- System.out.println("注册中心列表:" + serviceRegistry.toString());
- }
-
- /**
- * 启动服务注册中心
- * @throws IOException
- */
- public void start() throws IOException {
- //服务器监听
- ServerSocket serverSocket = new ServerSocket();
- //监听绑定端口
- serverSocket.bind(new InetSocketAddress(port));
- System.out.println("服务中心已启动....");
- try {
- while (true) {
- // 监听客户端的TCP连接,接到TCP连接后将其封装成task,
- // 由线程池执行,并且同时将socket送入(server.accept()=socket)
- executor.execute(new ServiceTask(serverSocket.accept()));
- }
- } finally {
- serverSocket.close();
- }
- }
- }
-
-
-
-
-
- /**
- * 服务获取后运行线程
- */
- public class ServiceTask implements Runnable {
-
- //客户端socket
- Socket client;
-
- public ServiceTask(Socket client) {
- this.client = client;
- }
-
- /**
- * 远程请求达到服务端,我们需要执行请求结果,并且把请求结果反馈至客户端,使用Socket通讯
- */
- @Override
- public void run() {
- //利用Java反射
- //同样适用object流
- ObjectInputStream inputStream = null;
- ObjectOutputStream outputStream = null;
- /**
- * 核心处理流程
- * 1.客户端发送的object对象拿到,
- * 2.在采用反射的机制进行调用,
- * 3.最后给返回结果
- */
- try{
- // 获取客户端发来的输入流
- inputStream = new ObjectInputStream(client.getInputStream());
- //顺序发送数据:类名、方法名、参数类型、参数值
- // 1.拿到服务名
- String serviceName = inputStream.readUTF();
- // 2.拿到方法名
- String methodName = inputStream.readUTF();
- // 3.拿到参数类型
- Class<?>[] paramTypes = ( Class<?>[])inputStream.readObject();
- // 4.拿到参数值
- Object[] arguments = (Object[]) inputStream.readObject();
- // 5.要到注册中心根据 接口名,获取实现类
- Class serviceClass = RegisterCenter.serviceRegistry.get(serviceName);
- // 6.使用反射的机制进行调用
- Method method = serviceClass.getMethod(methodName,paramTypes);
- // 7.反射调用方法,把结果拿到
- Object result = method.invoke(serviceClass.newInstance(),arguments);
- // 8.通过执行socket返回给客户端
- outputStream = new ObjectOutputStream(client.getOutputStream());
- // /把结果返回给客户端
- outputStream.writeObject(result);
-
- }catch (Exception e){
- System.out.println("服务处理异常");
- e.printStackTrace();
- }finally {
- try{
- if (null != outputStream){
- outputStream.close();
- }
- if (null != inputStream){
- inputStream.close();
- }
- if (null != client){
- client.close();
- }
- }catch (IOException e){
- System.out.println("关流异常");
- e.printStackTrace();
- }
-
-
- }
- }
- }
- /**
- * 订单服务接口
- */
- public interface OrderInterface {
- /**
- * 根据订单id查询订单
- * @param orderId
- * @return
- */
- String findOrder(String orderId);
- }
-
-
- /**
- * 服务实现类
- *
- * */
- public class OrderServiceImpl implements OrderInterface {
-
- @Override
- public String findOrder(String orderId) {
- System.out.println("----------------- select order start-----------------------");
- System.out.println("订单号:"+ orderId);
- System.out.println("正在查询数据库.......");
- String result = "【订单号:"+orderId + ", 订单金额:" + (Math.random()*100)+1 + "】";
- System.out.println("查询成功!订单信息为:" + result);
- System.out.println("----------------- select order end -----------------------");
- return result;
- }
- }
-
-
- /**
- * 向注册中心添加服务
- */
- public class ServerRegister {
-
- public static void main(String[] args) throws Exception{
- new Thread(new Runnable() {
- public void run() {
- try {
- // new服务中心
- RegisterCenter serviceServer = new RegisterCenter(8888);
- // 注册订单服务至注册中心
- serviceServer.register("订单服务",OrderInterface.class, OrderServiceImpl.class);
- // 运行我们的服务
- serviceServer.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- }
client代码
- /**
- * rpc框架的客户端代理部分
- */
- public class RpcClientFrame {
-
- /**
- * 远程服务的代理对象,参数为客户端要调用的的服务
- * @param serviceInterface
- * @param <T>
- * @return
- * @throws Exception
- */
- public static <T> T getRemoteProxyObj(final Class<?> serviceInterface) throws Exception {
-
- // 默认端口8888
- InetSocketAddress serviceAddr = new InetSocketAddress("127.0.0.1",8888);
- // 1.将本地的接口调用转换成JDK的动态代理,在动态代理中实现接口的远程调用
- // 进行实际的服务调用(动态代理)
- return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},new DynProxy(serviceInterface,serviceAddr));
- }
-
- /**
- * 动态代理类,实现了对远程服务的访问
- */
- private static class DynProxy implements InvocationHandler {
- // 接口
- private final Class<?> serviceInterface;
- // 远程调用地址
- private final InetSocketAddress addr;
-
- // 构造函数
- public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {
- this.serviceInterface = serviceInterface;
- this.addr = addr;
- }
-
- /**
- * 动态代理类,增强:实现了对远程服务的访问
- * @param proxy
- * @param method
- * @param args
- * @return
- * @throws Throwable
- */
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- /* 网络增强部分*/
- Socket socket = null;
- //因为传递的大部分是 方法、参数,所以我们使用Object流对象
- ObjectInputStream objectInputStream = null;
- ObjectOutputStream objectOutputStream = null;
- try {
- // 1.新建一个Socket
- socket = new Socket();
- // 2.连接到远程的地址和端口
- socket.connect(addr);
- //往远端 发送数据,按照顺序发送数据:类名、方法名、参数类型、参数值
- // 1.拿到输出的流
- objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
- // 2. 发送 调用方法的 类名,使用UTF-8避免乱码
- objectOutputStream.writeUTF(serviceInterface.getName());
- // 3.发送 方法名
- objectOutputStream.writeUTF(method.getName());
- // 4.发送 参数类型,使用Object
- objectOutputStream.writeObject(method.getParameterTypes());
- //5 .发送 参数的值,使用UTF-8避免乱码
- objectOutputStream.writeObject(args);
- // 6.刷新缓冲区,使得数据立马发送
- objectOutputStream.flush();
- // 7.立马拿到远程执行的结果
- objectInputStream = new ObjectInputStream(socket.getInputStream());
- // 8.我们要把调用的细节打印出来
- System.out.println("远程调用成功!" + serviceInterface.getName());
- //最后要网络的请求返回给返回
- return objectInputStream.readObject();
- } finally {
- socket.close();
- objectOutputStream.close();
- objectInputStream.close();
-
- }
- }
- }
- }
-
-
-
-
-
-
- /**
- * 订单服务接口
- */
- public interface OrderInterface {
- /**
- * 根据订单id查询订单
- * @param orderId
- * @return
- */
- String findOrder(String orderId);
- }
-
-
- /**
- * rpc的客户端,调用远端服务
- */
- public class Client {
- public static void main(String[] args) throws Exception {
- //动态代理获取我们的对象
- OrderInterface orderService = RpcClientFrame.getRemoteProxyObj(OrderInterface.class);
- //进远程调用我们的对象
- System.out.println(orderService.findOrder("20200613001"));
-
- }
- }
首先运行server 下的
ServerRegister的main方法,运行后 可以看到控制台输出:
接着我们调用一下 运行client 下的
Client 的main方式,运行后,可以看到控制台输出:
同时 服务端 的控制台接收到服务也打印了:
接着我们多调用几次:
可以看到一直在运行>...到此 我们已经实现了纯Java实现一个RPC框架!
虽然我们实现了一个简单地RPC框架,但是还是有诸多的问题,比如:
通讯效率方面,由于是基于IO实现的,IO的开销。Java序列化的速度,还有服务中心对服务的管理,比如服务宕机了怎么办?怎么去监听?
所以 还是要使用专业的RPC框架。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。