当前位置:   article > 正文

阿里巴巴中间件性能挑战赛(RPC篇 同步阻塞模型)_com.alibaba.middleware

com.alibaba.middleware

赛题要求:

一个简单的RPC框架

RPC(Remote Procedure Call )——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

框架——让编程人员便捷地使用框架所提供的功能,由于RPC的特性,聚焦于应用的分布式服务化开发,所以成为一个对开发人员无感知的接口代理,显然是RPC框架优秀的设计。

题目要求

1.要成为框架:对于框架的使用者,隐藏RPC实现。

2.网络模块可以自己编写,如果要使用IO框架,要求使用netty-4.0.23.Final。

3.支持异步调用,提供future、callback的能力。

4.能够传输基本类型、自定义业务类型、异常类型(要在客户端抛出)。

5.要处理超时场景,服务端处理时间较长时,客户端在指定时间内跳出本次调用。

6.提供RPC上下文,客户端可以透传数据给服务端。

7.提供Hook,让开发人员进行RPC层面的AOP。

注:为了降低第一题的难度,RPC框架不需要注册中心,客户端识别-DSIP的JVM参数来获取服务端IP。

衡量标准

满足所有要求。性能测试。



这个赛题现在看来其实并不难,不过当时刚参赛的时候我还是理解了好久。首先刚开始的时候其实根本不知道到底什么是RPC,感觉应该是一种无比高大上无比难实现的东东。直到我看到了梁飞的博客http://javatar.iteye.com/blog/1123915,才知道原来RPC其实并不难。通俗来说就是客户端通过一定的协议把方法名称、参数类型还有参数传给服务器,然后服务器调用对应方法,完成以后再把结果传回来给客户端,仅此而已。



首先是Consumer部分的接口定义:

  1. package com.alibaba.middleware.race.rpc.api;
  2. import com.alibaba.middleware.race.rpc.aop.ConsumerHook;
  3. import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;
  4. import java.lang.reflect.InvocationHandler;
  5. import java.lang.reflect.Method;
  6. import java.lang.reflect.Proxy;
  7. /**
  8. * Created by huangsheng.hs on 2015/3/26.
  9. */
  10. public class RpcConsumer implements InvocationHandler {
  11. private Class<?> interfaceClazz;
  12. public RpcConsumer() {
  13. }
  14. private void init() {
  15. //TODO
  16. }
  17. public RpcConsumer interfaceClass(Class<?> interfaceClass) {
  18. this.interfaceClazz = interfaceClass;
  19. return this;
  20. }
  21. public RpcConsumer version(String version) {
  22. //TODO
  23. return this;
  24. }
  25. public RpcConsumer clientTimeout(int clientTimeout) {
  26. //TODO
  27. return this;
  28. }
  29. public RpcConsumer hook(ConsumerHook hook) {
  30. return this;
  31. }
  32. public Object instance() {
  33. //TODO return an Proxy
  34. return Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{this.interfaceClazz},this);
  35. }
  36. public void asynCall(String methodName) {
  37. asynCall(methodName, null);
  38. }
  39. public <T extends ResponseCallbackListener> void asynCall(String methodName, T callbackListener) {
  40. //TODO
  41. }
  42. public void cancelAsyn(String methodName) {
  43. //TODO
  44. }
  45. @Override
  46. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  47. return null;
  48. }
  49. }
主要接口分析如下

public RpcConsumer interfaceClass(Class<?> interfaceClass):定义了RPC的服务接口(也就是客户端可以调用服务器提供的哪些服务)

public Object instance():生成一个客户端代理的实例,其中用到了Proxy类的newProxyInstance方法

public asynCall(String methodName,T callbackListener):异步调用的接口,先在注册表中注册异步回调函数,然后进行异步调用

public Object invoke(Object proxy, Method method, Object[] args):RpcConsumer实现了java中的InvocationHandler接口,因此需要重写invoke方法。invoke方法是RpcConsumer最核心的方法。调用被代理的任何服务都会被转为调用invoke方法,在invoke方法中会向服务器发送Rpc请求。如果是同步调用则会阻塞等待结果,如果是异步调用则会马上返回。

(invoke方法是程序最核心的地方,也是比较难理解的地方。如果对于invoke方法不理解,可以先参考java的代理类的原理,了解Proxy类以及InvocationHandler接口的原理再往下看)


接下来是RpcConsumer的具体实现(只贴最核心的invoke方法,其余代码请看我的github)

  1. //调用方法的协议:方法名 参数类型 参数值 上下文
  2. @Override
  3. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
  4. {
  5. if(isAsyn(method.getName())) return null; //本方法是同步方法,被异步调用则返回空
  6. Socket s = new Socket(host,PORT);
  7. try
  8. {
  9. ObjectOutputStream output = new ObjectOutputStream(s.getOutputStream());
  10. try
  11. {
  12. RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),
  13. args, RpcContext.localMap.get());
  14. consumerHook.before(request);
  15. output.writeObject(request);
  16. ObjectInputStream input = new ObjectInputStream(s.getInputStream());
  17. try
  18. {
  19. //使用Callable接口和FutureTask来实现计时
  20. FutureTask<RpcResponse> task = new FutureTask<RpcResponse>(new ResultGetter(input));
  21. new Thread(task).start();
  22. RpcResponse response= task.get(clientTimeout, TimeUnit.MILLISECONDS);
  23. if(response.isError())
  24. {
  25. InvocationTargetException targetEx = (InvocationTargetException)(response.getThrowable());
  26. Throwable t = targetEx .getTargetException();
  27. throw t;
  28. }
  29. else
  30. {
  31. Object result = response.getAppResponse();
  32. consumerHook.after(request);
  33. return result;
  34. }
  35. }
  36. finally
  37. {
  38. input.close();
  39. }
  40. }
  41. finally
  42. {
  43. output.close();
  44. }
  45. }
  46. finally
  47. {
  48. s.close();
  49. }
  50. }
思路其实很简单,就是把方法名、参数类型、参数值以及其他东东包装成RpcRequest,然后用java自带的序列化方法通过socket发送到服务器端,然后再同步阻塞等待结果返回回来。当然,这个只是最简单的版本,后面我会将一下一些更加复杂的细节。不过在此之前先贴一下服务器端的代码:


接口定义:

  1. package com.alibaba.middleware.race.rpc.api;
  2. /**
  3. * Created by huangsheng.hs on 2015/3/26.
  4. */
  5. public class RpcProvider {
  6. public RpcProvider() {}
  7. /**
  8. * init Provider
  9. */
  10. private void init(){
  11. //TODO
  12. }
  13. /**
  14. * set the interface which this provider want to expose as a service
  15. * @param serviceInterface
  16. */
  17. public RpcProvider serviceInterface(Class<?> serviceInterface){
  18. //TODO
  19. return this;
  20. }
  21. /**
  22. * set the version of the service
  23. * @param version
  24. */
  25. public RpcProvider version(String version){
  26. //TODO
  27. return this;
  28. }
  29. /**
  30. * set the instance which implements the service's interface
  31. * @param serviceInstance
  32. */
  33. public RpcProvider impl(Object serviceInstance){
  34. //TODO
  35. return this;
  36. }
  37. /**
  38. * set the timeout of the service
  39. * @param timeout
  40. */
  41. public RpcProvider timeout(int timeout){
  42. //TODO
  43. return this;
  44. }
  45. /**
  46. * set serialize type of this service
  47. * @param serializeType
  48. */
  49. public RpcProvider serializeType(String serializeType){
  50. //TODO
  51. return this;
  52. }
  53. /**
  54. * publish this service
  55. * if you want to publish your service , you need a registry server.
  56. * after all , you cannot write servers' ips in config file when you have 1 million server.
  57. * you can use ZooKeeper as your registry server to make your services found by your consumers.
  58. */
  59. public void publish() {
  60. //TODO
  61. }
  62. }
具体实现(其他方法都不是很重要,只贴publish方法):

  1. public void publish(Object service,int port)
  2. {
  3. System.out.println("service:"+service.getClass().getName()+" port:"+port);
  4. ServerSocket ss = null;
  5. try {
  6. ss = new ServerSocket(port);
  7. while(true)
  8. {
  9. final Socket socket = ss.accept();
  10. Thread thread = new Thread(new ServiceThread(socket,serviceInstance));
  11. thread.start();
  12. try{
  13. thread.join(TIME_OUT);
  14. }catch(InterruptedException e){
  15. e.printStackTrace();
  16. System.out.println("thread被打断");
  17. }
  18. if(thread.isAlive())
  19. {
  20. thread.interrupt();
  21. }
  22. }
  23. } catch (Exception e) {
  24. System.out.println("thread被打断");
  25. e.printStackTrace();
  26. }
  27. }
服务端的思路其实也很简单,就是起一个简单的socket,不断地监听在某一个端口上,如果有新的请求到来就新起一个线程(ServiceThread)来处理请求


ServiceThread代码如下:

  1. package com.alibaba.middleware.race.rpc.api.impl;
  2. import java.io.*;
  3. import java.lang.reflect.Method;
  4. import java.net.Socket;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. import java.util.concurrent.Future;
  10. import java.util.concurrent.TimeoutException;
  11. import com.alibaba.middleware.race.rpc.context.RpcContext;
  12. import com.alibaba.middleware.race.rpc.model.RpcRequest;
  13. import com.alibaba.middleware.race.rpc.model.RpcResponse;
  14. //RpcContext需要对props实现ThreadLocal以保证线程的同步访问
  15. public class ServiceThread implements Runnable{
  16. public ServiceThread(final Socket s,final Object service)
  17. {
  18. this.s = s;
  19. this.service = service;
  20. }
  21. @Override
  22. public void run()
  23. {
  24. try
  25. {
  26. try
  27. {
  28. ObjectInputStream input = new ObjectInputStream(s.getInputStream());
  29. try
  30. {
  31. RpcRequest request = (RpcRequest)input.readObject();
  32. RpcContext.localMap.set(request.getContextMap());
  33. ObjectOutputStream output = new ObjectOutputStream(s.getOutputStream());
  34. Method method = service.getClass().getMethod(request.getMethodName(),
  35. request.getParameterTypes());
  36. RpcResponse response = new RpcResponse();
  37. try
  38. {
  39. Object result = method.invoke(service, request.getArgs());
  40. response.setAppResponse(result);
  41. output.writeObject(response);
  42. }catch(Throwable t)
  43. {
  44. response.setThrowable(t);
  45. output.writeObject(response);
  46. }finally
  47. {
  48. output.close();
  49. }
  50. }
  51. finally
  52. {
  53. input.close();
  54. }
  55. }
  56. finally
  57. {
  58. s.close();
  59. }
  60. }catch(Exception e)
  61. {
  62. e.printStackTrace();
  63. }
  64. }
  65. final Socket s;
  66. final Object service;
  67. }

基本的原理大概就是这样,最后再讲一下一些实现的细节以及优化:

1、如何实现异步调用:由于java没有函数指针,因此可以通过在注册表中注册回调接口的方式来实现。

2、对于更加复杂的RPC框架,会使用Zookeeper来实现注册中心。

3、在程序中使用的网络io模型是同步阻塞的io模型,在比赛中,我还基于netty框架尝试了同步非阻塞的网络io模型,这个会在接下来再细讲。

4、序列化的方法使用的是java自带的序列化,这其实是十分低效的一种方式。实际上可以使用kryo、protobuf、fst等更加高效的序列库。



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/424594
推荐阅读
相关标签
  

闽ICP备14008679号