赞
踩
RPC 框架通常分为基于 HTTP 的(如 OpenFeign)和基于 TCP 的(如 Dubbo)两类。今天我们尝试使用 Java 的 Socket 自己封装一个简单的 RPC 框架。
/**
 * Holds one service method together with the bean it belongs to, so the
 * method can later be invoked on that bean.
 */
public class BeanMethod {

    /** Object the method is invoked on. */
    private Object bean;

    /** Method to invoke on {@link #bean}. */
    private Method method;

    /**
     * @return the target bean, or {@code null} if none has been set
     */
    public Object getBean() {
        return bean;
    }

    /**
     * @param bean the target bean the method will be invoked on
     */
    public void setBean(Object bean) {
        this.bean = bean;
    }

    /**
     * @return the method to invoke, or {@code null} if none has been set
     */
    public Method getMethod() {
        return method;
    }

    /**
     * @param method the method to invoke on the target bean
     */
    public void setMethod(Method method) {
        this.method = method;
    }
}
/**
 * Marks a class as a remote service.
 *
 * <p>The annotation is retained at runtime and may only be placed on types.
 * It is meta-annotated with {@code @Component}; presumably this lets Spring's
 * component scanning register annotated classes as beans (confirm against the
 * application's scan configuration).
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Component
public @interface RemoteService {
}
// 服务端启动时,将远程服务接口的方法都存储起来 @Component public class InitialMerdiator implements BeanPostProcessor{ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // if(bean.getClass().isAnnotationPresent(RemoteService.class)){ //加了服务发布标记的bean进行远程发布 Method[] methods=bean.getClass().getDeclaredMethods(); for(Method method:methods){ String key=bean.getClass().getInterfaces()[0].getName()+"."+method.getName(); BeanMethod beanMethod=new BeanMethod(); beanMethod.setBean(bean); beanMethod.setMethod(method); Mediator.map.put(key,beanMethod); } } return bean; } }
public class Mediator { //用来存储发布的服务的实例(服务调用的路由) public static Map<String ,BeanMethod> map=new ConcurrentHashMap<>(); private volatile static Mediator instance; private Mediator(){}; public static Mediator getInstance(){ if(instance==null){ synchronized (Mediator.class){ if(instance==null){ instance=new Mediator(); } } } return instance; } // 用于处理最终的调用请求,通过反射调用类的方法 public Object processor(RpcRequest request){ String key=request.getClassName()+"."+request.getMethodName(); //key BeanMethod beanMethod=map.get(key); if(beanMethod==null){ return null; } Object bean=beanMethod.getBean(); Method method=beanMethod.getMethod(); try { return method.invoke(bean,request.getArgs()); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } return null; } }
客户端与服务器端之间通过下面的 RpcRequest 类传递调用信息(类名、方法名、参数及参数类型)。
/**
 * Serializable description of one remote call, sent from client to server.
 */
public class RpcRequest implements Serializable {

    /** Explicit version so the serialized form does not depend on compiler-generated defaults. */
    private static final long serialVersionUID = 1L;

    /** Name of the class or interface declaring the invoked method. */
    private String className;

    /** Name of the method to invoke. */
    private String methodName;

    /** Arguments to pass to the method. */
    private Object[] args;

    /** Parameter types of the method (not its return type). */
    private Class<?>[] types;

    /** @return the declaring class or interface name */
    public String getClassName() {
        return className;
    }

    /** @param className the declaring class or interface name */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return the method name */
    public String getMethodName() {
        return methodName;
    }

    /** @param methodName the method name */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the call arguments; may be {@code null} */
    public Object[] getArgs() {
        return args;
    }

    /** @param args the call arguments; may be {@code null} */
    public void setArgs(Object[] args) {
        this.args = args;
    }

    /** @return the method's parameter types */
    public Class<?>[] getTypes() {
        return types;
    }

    /** @param types the method's parameter types */
    public void setTypes(Class<?>[] types) {
        this.types = types;
    }
}
Spring 容器启动(刷新)完成后会发布 ContextRefreshedEvent 并回调 onApplicationEvent 方法,我们在该方法中创建一个 ServerSocket 并开始监听客户端的连接。
//spring 容器启动完成之后,会发布一个ContextRefreshedEvent @Component public class SocketServerInitial implements ApplicationListener<ContextRefreshedEvent>{ private final ExecutorService executorService= Executors.newCachedThreadPool(); @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { //启动服务,启动Socket,监听端口。 ServerSocket serverSocket=null; try { serverSocket=new ServerSocket(8888); while(true){ Socket socket=serverSocket.accept(); //监听客户端请求 executorService.execute(new ProcessorHandler(socket)); } } catch (IOException e) { e.printStackTrace(); }finally { if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
服务端每接收到一个客户端连接,就把它交给线程池中的一个线程(ProcessorHandler)来处理这次远程调用。
public class ProcessorHandler implements Runnable{ private Socket socket; public ProcessorHandler(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream inputStream=null; ObjectOutputStream outputStream=null; try { inputStream=new ObjectInputStream(socket.getInputStream());// RpcRequest request=(RpcRequest)inputStream.readObject(); //反序列化 //路由 Mediator mediator=Mediator.getInstance(); Object rs=mediator.processor(request); System.out.println("服务端的执行结果:"+rs); outputStream=new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(rs); outputStream.flush(); } catch (Exception e) { e.printStackTrace(); }finally { if(inputStream!=null){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (outputStream!=null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public interface IOrderService { String queryOrderList(); String orderById(String id); } //声明这个注解之后,自动发布服务 @RemoteService public class OrderServiceImpl implements IOrderService{ @Override public String queryOrderList() { return "EXECUTE QUERYORDERLIST METHOD"; } @Override public String orderById(String id) { return "EXECUTE ORDER_BY_ID METHOD"; } }
/**
 * Marks a field for automatic injection of a remote service proxy.
 *
 * <p>The annotation is retained at runtime and may only be placed on fields.
 *
 * <p>NOTE(review): {@code @Component} on a field-level annotation looks
 * redundant — confirm whether anything relies on it before removing.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Component
public @interface Reference {
}
@Component public class ReferenceInvokeProxy implements BeanPostProcessor{ @Autowired RemoteInvocationHandler invocationHandler; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Field[] fields=bean.getClass().getDeclaredFields(); for(Field field:fields){ if(field.isAnnotationPresent(Reference.class)){ field.setAccessible(true); //针对这个加了Reference注解的字段,设置为一个代理的值 Object proxy= Proxy.newProxyInstance(field.getType().getClassLoader(),new Class<?>[]{field.getType()},invocationHandler); try { field.set(bean,proxy); //相当于针对加了Reference的注解,设置了一个代理,这个代理的实现是inovcationHandler } catch (IllegalAccessException e) { e.printStackTrace(); } } } return bean; } }
@Component public class RemoteInvocationHandler implements InvocationHandler{ @Value("${rpc.host}") private String host; @Value("${rpc.port}") private int port; public RemoteInvocationHandler() { } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //先建立远程连接 RpcNetTransport rpcNetTransport=new RpcNetTransport(host,port); //传递数据了? // 调用哪个接口、 哪个方法、方法的参数? RpcRequest request=new RpcRequest(); request.setArgs(args); request.setClassName(method.getDeclaringClass().getName()); request.setTypes(method.getParameterTypes()); //参数的类型 request.setMethodName(method.getName()); return rpcNetTransport.send(request); } }
public class RpcNetTransport { private String host; private int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } public Socket newSocket() throws IOException { Socket socket=new Socket(host,port); return socket; } public Object send(RpcRequest request){ ObjectOutputStream outputStream=null; ObjectInputStream inputStream=null; try { Socket socket=newSocket(); //IO操作 outputStream=new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(request); outputStream.flush(); inputStream=new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } finally { //TODO 不写了 } return null; } }
/**
 * Demo REST controller that calls the order service through a
 * {@code @Reference}-annotated field.
 */
@RestController
public class TestController {

    /** Order service handle, annotated with {@code @Reference}. */
    @Reference
    private IOrderService orderService;

    /**
     * Handles {@code GET /test}.
     *
     * @return whatever {@code queryOrderList()} returns
     */
    @GetMapping("/test")
    public String test() {
        final String orders = orderService.queryOrderList();
        return orders;
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。