当前位置:   article > 正文

SOFA RPC源码解析之Rest服务(4)-服务端响应_sofa writeandflush sofarquest

sofa writeandflush sofarquest

1 SOFA RPC源码解析

1.1 Rest服务

1.1.1  服务端响应

        在前文介绍SOFA RPC Rest服务发布过程的文章中,我们知道对于rest类型绑定,SOFA采用com.alipay.sofa.rpc.server.rest.SofaNettyJaxrsServer作为服务器。SofaNettyJaxrsServer采用Netty4作为网络通讯层。

        看一下SofaNettyJaxrsServer启动方法:

1.  public void start() {
2.          eventLoopGroup = newNioEventLoopGroup(ioWorkerCount, new NamedThreadFactory("SOFA-REST-IO-"+ port, daemon));
3.          eventExecutor = newNioEventLoopGroup(executorThreadCount, newNamedThreadFactory("SOFA-REST-BIZ-" + port, daemon));
4.          bootstrap.group(eventLoopGroup)
5.              .channel(NioServerSocketChannel.class)
6.              .childHandler(createChannelInitializer())
7.              .option(ChannelOption.SO_BACKLOG,backlog)
8.              .childOption(ChannelOption.SO_KEEPALIVE,keepAlive); // CHANGE:
9.   
10.         ……
11.  
12.         bootstrap.bind(socketAddress).syncUninterruptibly();
13.     }

        在Server配置过程中,调用createChannelInitializer方法:

1.  privateChannelInitializer<SocketChannel> createChannelInitializer() {
2.          final RequestDispatcher dispatcher =createRequestDispatcher();
3.          if (sslContext == null) {
4.              return newChannelInitializer<SocketChannel>() {
5.                  @Override
6.                  public voidinitChannel(SocketChannel ch) throws Exception {
7.                      setupHandlers(ch, dispatcher, HTTP);
8.                  }
9.              };
10.         } else {
11.             final SSLEngine engine =sslContext.createSSLEngine();
12.             engine.setUseClientMode(false);
13.             return newChannelInitializer<SocketChannel>() {
14.                 @Override
15.                 public voidinitChannel(SocketChannel ch) throws Exception {
16.                     ch.pipeline().addFirst(newSslHandler(engine));
17.                     setupHandlers(ch, dispatcher, HTTPS);
18.                 }
19.             };
20.         }
21.     }

        无论是HTTP连接,或HTTPS连接,都会调用setupHandlers方法设置各种Netty4的ChannelInboundHandler和ChannelOutboundHandler处理器:

1.  private void setupHandlers(SocketChannelch, RequestDispatcher dispatcher,
2.                                 RestEasyHttpRequestDecoder.Protocolprotocol) {
3.          ChannelPipeline channelPipeline =ch.pipeline();
4.          channelPipeline.addLast(channelHandlers.toArray(newChannelHandler[channelHandlers.size()]));
5.          channelPipeline.addLast(newHttpRequestDecoder());
6.          channelPipeline.addLast(newHttpObjectAggregator(maxRequestSize));
7.          channelPipeline.addLast(newHttpResponseEncoder());
8.          channelPipeline.addLast(httpChannelHandlers.toArray(newChannelHandler[httpChannelHandlers.size()]));
9.          channelPipeline.addLast(newRestEasyHttpRequestDecoder(dispatcher.getDispatcher(), root, protocol));
10.         channelPipeline.addLast(newRestEasyHttpResponseEncoder());
11.         channelPipeline.addLast(eventExecutor,new SofaRestRequestHandler(dispatcher)); // CHANGE: sofa的处理类
12.     }

        由于SofaNettyJaxrsServer参考JBoss Resteasy的NettyJaxrsServer实现,除了根据SOFABoot框架自身需求增加一些自定义功能外,其它还是采用JBossResteasy的处理逻辑。所以,根据Resteasy需求,设置Netty4的上行处理器和下行处理器。

        其中,上行处理器主要包括自定义非HTTP类型ChannelInboundHandler、Http请求解码器HttpRequestDecoder、Http对象聚合器HttpObjectAggregator、自定义HTTP类型ChannelInboundHandler、restful风格Http请求解码器RestEasyHttpRequestDecoder、SOFA框架restful风格Http请求处理器SofaRestRequestHandler;

        下行处理器包括自定义非HTTP类型ChannelOutboundHandler、Http响应编码器、自定义HTTP类型ChannelOutboundHandler、restful风格Http响应编码器RestEasyHttpResponseEncoder;

        我们主要关注SOFA框架restful风格Http请求处理器,对于其他处理器,在这里不详细描述。

        从上述代码中,我们可以看出,SOFA框架的业务处理是在一个单独的线程组中执行,而非在Worker线程组(此处Boss和Worker采用同一个线程组),这样可以避免耗时较长的业务处理长时间占用IO线程,导致当大量并发请求到来时,无线程处理的问题。

        以下是SofaRestRequestHandler类的主要方法:

1.  public class SofaRestRequestHandlerextends SimpleChannelInboundHandler {
2.      protected final RequestDispatcherdispatcher;
3.      ……
4.      publicSofaRestRequestHandler(RequestDispatcher dispatcher) {
5.          this.dispatcher = dispatcher;
6.      }
7.   
8.      @Override
9.      protected voidchannelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
10.         if (msg instanceof NettyHttpRequest) {
11.  
12.             NettyHttpRequest request =(NettyHttpRequest) msg;
13.  
14.             try {
15.                 if(EventBus.isEnable(RestServerReceiveEvent.class)) {
16.                     EventBus.post(newRestServerReceiveEvent(request));
17.                 }
18.  
19.                 if(request.getUri().getPath().endsWith("/favicon.ico")) {
20.                     HttpResponse response = newDefaultHttpResponse(HTTP_1_1, NOT_FOUND);
21.                     ctx.writeAndFlush(response);
22.  
23.                     if(EventBus.isEnable(RestServerSendEvent.class)) {
24.                         EventBus.post(newRestServerSendEvent(request, null, null));
25.                     }
26.                     return;
27.                 }
28.  
29.                 if(request.is100ContinueExpected()) {
30.                     send100Continue(ctx);
31.                 }
32.  
33.                 NettyHttpResponse response =request.getResponse();
34.                 Exception exception = null;
35.                 try {
36.                     // 获取远程ip 兼容nignx转发和vip
37.                     HttpHeaders httpHeaders =request.getHttpHeaders();
38.                     String remoteip =httpHeaders.getHeaderString("X-Forwarded-For");
39.                     if (remoteip == null) {
40.                         remoteip =httpHeaders.getHeaderString("X-Real-IP");
41.                     }
42.                     if (remoteip != null) {
43.                         RpcInternalContext.getContext().setRemoteAddress(remoteip,0);
44.                     } else { // request取不到就从channel里取
45.                         RpcInternalContext.getContext().setRemoteAddress(
46.                             (InetSocketAddress)ctx.channel().remoteAddress());
47.                     }
48.                     // 设置本地地址
49.                     RpcInternalContext.getContext().setLocalAddress((InetSocketAddress)ctx.channel().localAddress());
50.  
51.                     dispatcher.service(ctx, request, response,true);
52.                 } catch (Failure e1) {
53.                     response.reset();
54.                     response.setStatus(e1.getErrorCode());
55.                     exception = e1;
56.                 } catch (Exception ex) {
57.                     response.reset();
58.                     response.setStatus(500);
59.                     logger.error("Unexpected",ex); // todo 异常带给用户?
60.                     exception = ex;
61.                 } finally {
62.                     if(EventBus.isEnable(RestServerSendEvent.class)) {
63.                         EventBus.post(newRestServerSendEvent(request, response, exception));
64.                     }
65.                 }
66.  
67.                 if(!request.getAsyncContext().isSuspended()) {
68.                     response.finish();
69.                     ctx.flush();
70.                 }
71.             } finally {
72.                 if(EventBus.isEnable(ServerEndHandleEvent.class)) {
73.                     EventBus.post(newServerEndHandleEvent());
74.                 }
75.                 RpcInvokeContext.removeContext();
76.                 RpcInternalContext.removeAllContext();
77.             }
78.         }
79.     }
80.  
81.     private voidsend100Continue(ChannelHandlerContext ctx) {
82.         HttpResponse response = new DefaultHttpResponse(HTTP_1_1,CONTINUE);
83.         ctx.writeAndFlush(response);
84.     }
85.  
86.     @Override
87.     public voidexceptionCaught(ChannelHandlerContext ctx, Throwable e)
88.         throws Exception {
89.       ……
90.     }
91. }

        当Netty4服务器接收到客户端Http请求,按照顺序依次调用其ChannelPipeline中配置的上行处理器处理Http请求。 此处,上行处理器的执行顺序为:自定义非HTTP类型ChannelInboundHandler、Http请求解码器HttpRequestDecoder、Http对象聚合器HttpObjectAggregator、自定义HTTP类型ChannelInboundHandler、restful风格Http请求解码器RestEasyHttpRequestDecoder、SOFA框架restful风格Http请求处理器SofaRestRequestHandler。

        当请求到达最后一个处理SofaRestRequestHandler时,调用其channelRead0方法,处理Http请求:

        1.   经过RestEasyHttpRequestDecoder解码器处理以后,Http请求已经被转换为JBoss RestEasy的NettyHttpRequest实例request;

        2.   从NettyHttpRequest中获取NettyHttpResponse实例response;

        3.   获取远程IP地址、兼容Nignx转发和VIP等、设置本地地址等;

        4.   调用org.jboss.resteasy.plugins.server.netty.RequestDispatcher的service方法,分发Http请求;

        5.   RequestDispatcher的service方法中,调用com.alipay.sofa.rpc.server.rest.SofaSynchronousDispatcher的invoke方法:

1.  public void invoke(HttpRequest request,HttpResponse response)   {
2.        try{
3.           pushContextObjects(request, response);
4.           if (!preprocess(request, response))return;
5.           ResourceInvoker invoker = null;
6.           try {
7.              invoker = getInvoker(request);
8.           } catch (Exception exception) {
9.              //logger.error("getInvoker()failed mapping exception", exception);
10.             writeException(request, response,exception);
11.             return;
12.          }
13.          invoke(request, response, invoker);
14.       }
15.       finally {
16.          clearContextData();
17.       }
18.    }

        调用getInvoker方法:

1.  public ResourceInvokergetInvoker(HttpRequest request)  throwsFailure  {
2.        logger.debug("PathInfo: " +request.getUri().getPath());
3.        if (!request.isInitial())
4.        {
5.           throw newInternalServerErrorException(……);
6.        }
7.        ResourceInvoker invoker =registry.getResourceInvoker(request);
8.        if (invoker == null)
9.        {
10.          throw new NotFoundException(……);
11.       }
12.       return invoker;
13.    }

        调用com.alipay.sofa.rpc.server.rest.SofaResourceMethodRegistry类getResourceInvoker方法,解析请求中的路径, 然后匹配到相应的invoker(此处为com.alipay.sofa.rpc.server.rest.SofaResourceMethodInvoker实例)来执行客户端请求。例如:该Http请求路径为:/webapi/rest/person/sayName/Mike,匹配到相应的invoker为SofaResourceMethodInvoker的一个实例;

        获取Http请求对应的invoker以后,调用SofaSynchronousDispatcher的invoke(HttpRequestrequest, HttpResponse response, ResourceInvoker invoker)方法,处理请求:

1.  public void invoke(HttpRequest request,HttpResponse response, ResourceInvoker invoker)
2.     {
3.        Response jaxrsResponse = null;
4.        try {
5.           jaxrsResponse = invoker.invoke(request,response);
6.           if(request.getAsyncContext().isSuspended()) {
7.              ……           request.getAsyncContext().getAsyncResponse().initialRequestThreadFinished();
8.              jaxrsResponse = null; // we'rehanding response asynchronously
9.           }
10.       } catch (Exception e) {
11.          //logger.error("invoke() failedmapping exception", e);
12.          writeException(request, response, e);
13.          return;
14.       }
15.  
16.       if (jaxrsResponse != null)writeResponse(request, response, jaxrsResponse);
17.    }

        此时,调用SofaResourceMethodInvoker的invoke方法,处理请求:

1.     public BuiltResponse invoke(HttpRequestrequest, HttpResponse response) {
2.        Object target =resource.createResource(request, response, resourceMethodProviderFactory);
3.        return invoke(request, response, target);
4.     }

        调用com.alipay.sofa.rpc.server.rest.SofaResourceFactory的createResource方法,获取响应Http请求的目标对象,此处为PersonServiceImpl的一个实例。

        调用invoke方法:

1.     public BuiltResponse invoke(HttpRequestrequest, HttpResponse response, Object target)
2.     {
3.        ……
4.        BuiltResponse rtn = invokeOnTarget(request,response, target);
5.        return rtn;
6.     }

        调用invokeOnTarget方法:

1.  protected BuiltResponseinvokeOnTarget(HttpRequest request, HttpResponse response, Object target)   {     
2.        ……
3.   
4.        Object rtn = null;
5.        try
6.        {
7.           rtn = methodInjector.invoke(request, response,target);
8.        }
9.        catch (RuntimeException ex)
10.       {
11.          ……
12.       }
13.       ……
14.       jaxrsResponse.addMethodAnnotations(getMethodAnnotations());
15.       return jaxrsResponse;
16.    }

        调用org.jboss.resteasy.core.MethodInjectorImpl的invoke方法:

1.  public Object invoke(HttpRequest request,HttpResponse httpResponse, Object resource) throws Failure,ApplicationException  {
2.        ……
3.        Object result = null;
4.        try
5.        {
6.           result = invokedMethod.invoke(resource, args);
7.        }
8.        catch (IllegalAccessException e) {
9.           ……
10.       }
11.       catch (InvocationTargetException e) {
12.          ……
13.       }
14.       catch (IllegalArgumentException e) {
15.          ……
16.       }
17.       ……
18.       return result;
19.    }

        最后,通过Java反射机制,调用目标类上指定的方法,此处为PersonService的sayName方法。到此为止,请求处理完成,并返回处理结果。

        再来看一下SofaRestRequestHandler的channelRead0方法: 

1.  public class SofaRestRequestHandlerextends SimpleChannelInboundHandler {
2.   
3.      @Override
4.      protected void channelRead0(ChannelHandlerContextctx, Object msg) throws Exception {
5.          if (msg instanceof NettyHttpRequest) {
6.   
7.              NettyHttpRequest request =(NettyHttpRequest) msg;
8.   
9.              try {
10.                 ……
11.                 NettyHttpResponse response = request.getResponse();
12.                 Exception exception = null;
13.                 try {
14.                     // 获取远程ip 兼容nignx转发和vip
15.                     ……
16.                     // 设置本地地址
17.                     ……
18.                     dispatcher.service(ctx, request, response,true);
19.                 } catch (Failure e1) {
20.                     ……
21.                 } catch (Exception ex) {
22.                     ……
23.                 } finally {
24.                     ……                }
25.  
26.                 if (!request.getAsyncContext().isSuspended()){
27.                     response.finish();
28.                     ctx.flush();
29.                 }
30.             } finally {
31.                 ……
32.             }
33.         }
34.     }
35. }

        当接收的处理结果response以后,调研NettyHttpResponse的finish方法,准备处理结果,并写入Channel。

1.     publicvoid finish() throws IOException {
2.        os.flush();
3.        ChannelFuture future;
4.        if (isCommitted()) {
5.           // if committed this means the outputstream was used.
6.           future =ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
7.        } else {
8.           future =ctx.writeAndFlush(getEmptyHttpResponse());
9.        }
10.       
11.       if(!isKeepAlive()) {
12.          future.addListener(ChannelFutureListener.CLOSE);
13.       }
14.  
15.    }

        调用org.jboss.resteasy.plugins.server.netty.ChunkOutputStream的flush方法准备处理结果,并写入Channel。

1.     public void flush() throws IOException {
2.        int readable = buffer.readableBytes();
3.        if (readable == 0) return;
4.        if (!response.isCommitted()) response.prepareChunkStream();
5.        ctx.writeAndFlush(new DefaultHttpContent(buffer.copy()));
6.        buffer.clear();
7.        super.flush();
8.     }

        最后,调用Netty4的DefaultChannelHandlerContext类writeAndFlush方法,把处理结果写入Channel。

        在把处理结果通过网络返回给客户端以前,需要经过已经配置的下行处理器进行编码处理。下行处理器的执行顺序与配置顺序相反,所以按照逆序依次执行restful风格Http响应编码器RestEasyHttpResponseEncoder、自定义HTTP类型ChannelOutboundHandler、Http响应编码器、自定义非HTTP类型ChannelOutboundHandler,最终把已经编码好的处理结果的比特流,通过网络返回给客户端。

        至此,服务端处理结束。
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号