赞
踩
远程过程调用(Remote Procedure Call,RPC)是一种常用的分布式网络通信协议,它允许运行于一台计算机的程序调用另一台计算机的子程序,同时将网络的通信细节隐藏起来,使得用户无须额外地为这个交互作用编程。
RPC 基于客户机/服务器(Client/Server)交互模式:客户端发起调用请求,服务端执行被调用的方法并将结果返回给客户端。
/**
 * Service-provider interface.
 *
 * @author suben
 */
public interface EchoRpcService {
/**
 * Interface method: takes a ping message and returns a result.
 *
 * @param ping the message passed in by the caller
 * @return the reply string produced by the implementation
 */
String echo(String ping);
}
/**
 * Implementation class of the service-provider interface.
 */
public class EchoRpcServiceImpl implements EchoRpcService {

    /**
     * Builds the reply for a ping.
     *
     * @param ping the incoming message; may be {@code null}
     * @return the ping followed by the OK marker, or the "not Ok" reply when {@code ping} is
     *     {@code null}
     */
    @Override
    public String echo(String ping) {
        // Guard clause for the missing-message case.
        if (ping == null) {
            return "I am not Ok.";
        }
        return ping + "--> I am Ok!!";
    }
}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Service exporter: runs on the RPC server side, accepts socket connections and executes the
 * requested method on behalf of the connected consumer.
 *
 * <p>Wire format of one request, read in this order from an {@link ObjectInputStream}:
 * class name (UTF), method name (UTF), parameter types ({@code Class<?>[]}), arguments
 * ({@code Object[]}). The invocation result is written back as a single object.
 */
public class RpcExporter {

    /** Fixed-size thread pool, one thread per available processor, shared by all requests. */
    static Executor executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Binds a server socket to the given address and serves requests until accepting fails.
     *
     * <p>This method blocks the calling thread: it loops forever, handing every accepted
     * connection to {@link #executor}. An {@link IOException} from bind/accept ends the loop and
     * is printed to standard error.
     *
     * @param hostname host name or address to bind to
     * @param port     TCP port to listen on
     */
    public static void export(String hostname, int port) {
        // try-with-resources closes the server socket on every exit path and, unlike a manual
        // finally block, cannot hit a null reference when the ServerSocket constructor fails.
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress(hostname, port));
            while (true) {
                executor.execute(new ExporterTask(server.accept()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles a single consumer connection: reads one request, invokes the method reflectively
     * and writes the result back.
     */
    private static class ExporterTask implements Runnable {

        /** Connection to the consumer; closed when the task finishes. */
        private final Socket client;

        public ExporterTask(Socket client) {
            this.client = client;
        }

        @Override
        public void run() {
            // The socket itself is a managed resource so that it is released even when the
            // ObjectInputStream cannot be created (e.g. the peer sends no stream header).
            try (Socket socket = client;
                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream())) {
                // SECURITY NOTE(review): the class name and the serialized arguments come
                // straight from the peer. Loading an arbitrary class and natively deserializing
                // untrusted data is unsafe outside a trusted network; consider a whitelist of
                // exported services and an ObjectInputFilter before exposing this.
                String interfaceName = input.readUTF();
                Class<?> service = Class.forName(interfaceName);

                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();

                // Resolve the target method and call it on a fresh instance of the named class.
                Method method = service.getMethod(methodName, parameterTypes);
                Object target = service.getDeclaredConstructor().newInstance();
                Object invokeResult = method.invoke(target, arguments);

                // The output stream is created only after the call succeeded, so a failed
                // invocation sends nothing back, exactly as before.
                try (ObjectOutputStream output =
                        new ObjectOutputStream(socket.getOutputStream())) {
                    output.writeObject(invokeResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Client-side local proxy for a remote RPC service.
 *
 * @param <S> the interface type the caller uses to talk to the remote service
 */
public class RpcImporterProxy<S> {

    /**
     * Creates a dynamic proxy that forwards every method call over a new socket connection.
     *
     * <p>The proxy implements the first interface declared by {@code serviceClass}. For each
     * call it connects to {@code address}, sends the name of {@code serviceClass}, the method
     * name, the parameter types and the arguments, then returns the single object read back.
     *
     * @param serviceClass class whose name is sent to the server and whose first declared
     *     interface is proxied
     * @param address      server address to connect to on every call
     * @return a proxy instance, cast to {@code S}
     * @throws IllegalArgumentException if {@code serviceClass} declares no interface
     */
    public S importer(final Class<?> serviceClass, final InetSocketAddress address) {
        final Class<?>[] declaredInterfaces = serviceClass.getInterfaces();
        if (declaredInterfaces.length == 0) {
            // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    serviceClass.getName() + " declares no interface that could be proxied");
        }

        InvocationHandler remoteCall = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Nested try-with-resources closes input, output and socket in reverse order
                // of creation and keeps the primary exception if a close() fails as well.
                try (Socket socket = new Socket()) {
                    socket.connect(address);
                    try (ObjectOutputStream output =
                            new ObjectOutputStream(socket.getOutputStream())) {
                        output.writeUTF(serviceClass.getName());
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(args);
                        // Push the whole request out before blocking on the reply.
                        output.flush();

                        try (ObjectInputStream input =
                                new ObjectInputStream(socket.getInputStream())) {
                            return input.readObject();
                        }
                    }
                }
            }
        };

        // The proxy implements declaredInterfaces[0]; the caller is responsible for choosing
        // S to match it, so the cast cannot be checked here.
        @SuppressWarnings("unchecked")
        S stub = (S) Proxy.newProxyInstance(
                serviceClass.getClassLoader(),
                new Class<?>[] {declaredInterfaces[0]},
                remoteCall);
        return stub;
    }
}
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.InetSocketAddress;

/**
 * Demo entry point: starts the exporter in a background thread and calls the echo service
 * through the client-side proxy.
 */
public class RpcTest {

    private static final String HOST = "localhost";
    private static final int PORT = 8088;

    /** Upper bound on connection attempts while the server thread is still starting. */
    private static final int MAX_ATTEMPTS = 50;
    private static final long RETRY_DELAY_MILLIS = 100L;

    public static void main(String[] args) throws InterruptedException {
        // export() blocks in its accept loop, so it needs a thread of its own.
        new Thread(new Runnable() {
            @Override
            public void run() {
                RpcExporter.export(HOST, PORT);
            }
        }).start();

        RpcImporterProxy<EchoRpcService> importerProxy = new RpcImporterProxy<>();
        EchoRpcService echoRpcService =
                importerProxy.importer(EchoRpcServiceImpl.class, new InetSocketAddress(HOST, PORT));

        System.out.println(echoOnceServerIsUp(echoRpcService, "Are you Ok?"));
    }

    /**
     * Calls {@code echo}, retrying while the connection is refused.
     *
     * <p>The server thread is started without any readiness signal, so the first call can
     * arrive before the port is bound. The proxy's handler throws the checked
     * {@link ConnectException}, which {@code echo} does not declare, so it reaches the caller
     * wrapped in an {@link UndeclaredThrowableException}.
     *
     * @param service proxy to call
     * @param ping    message to send
     * @return the reply from the service
     * @throws InterruptedException if interrupted while waiting between attempts
     */
    private static String echoOnceServerIsUp(EchoRpcService service, String ping)
            throws InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                return service.echo(ping);
            } catch (UndeclaredThrowableException e) {
                boolean refused = e.getCause() instanceof ConnectException;
                // Anything other than "not listening yet" is a real failure; so is running
                // out of attempts.
                if (!refused || attempt >= MAX_ATTEMPTS) {
                    throw e;
                }
                Thread.sleep(RETRY_DELAY_MILLIS);
            }
        }
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。