当前位置:   article > 正文

Netty 实现dubbo rpc

Netty 实现dubbo rpc

一、RPC 的基本介绍

  RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。

常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.

若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云

二、RPC 调用的过程

在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。

调用流程说明

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 接收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给server stub
  • server stub 将返回导入结果进行编码并发送给消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client) 得到结果
  • 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

三、dubbo RPC

1.需求说明

dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。

模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。

2.设计说明

创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:

3.代码实现

netty用的包:4.1.20.Final。pom.xml如下:

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.20.Final</version>
  5. </dependency>

1)公共接口

  1. /**
  2. * @author: fqtang
  3. * @date: 2024/05/05/21:51
  4. * @description: 服务提供方和服务消费方都需要
  5. */
  6. public interface HelloService {
  7. String say(String mes);
  8. }

2)公共接口实现类

  1. import org.springframework.util.StringUtils;
  2. import com.tfq.netty.dubborpc.publicinterface.HelloService;
  3. /**
  4. * @author: fqtang
  5. * @date: 2024/05/05/21:53
  6. * @description: 描述
  7. */
  8. public class HelloServiceImpl implements HelloService {
  9. private static int count = 0;
  10. /**
  11. * 当有消费方调用该方法时就返回一个结果
  12. *
  13. * @param mes 传入消息
  14. * @return 返回结果
  15. */
  16. @Override
  17. public String say(String mes) {
  18. System.out.println("收到客户端消息=" + mes);
  19. if(StringUtils.isEmpty(mes)) {
  20. return "你好客户端,我已经收到你的消息 ";
  21. }else{
  22. return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";
  23. }
  24. }
  25. }

3)服务提供者

  1. import com.tfq.netty.dubborpc.netty.NettyServer;
  2. /**
  3. * @author: fqtang
  4. * @date: 2024/05/05/21:57
  5. * @description: 启动服务提供者,就是NettyServer
  6. */
  7. public class ServerBootstrap {
  8. public static void main(String[] args) {
  9. String hostName="127.0.0.1";
  10. int port = 8001;
  11. NettyServer.startServer(hostName,port);
  12. }
  13. }
  14. import io.netty.bootstrap.ServerBootstrap;
  15. import io.netty.channel.ChannelFuture;
  16. import io.netty.channel.ChannelInitializer;
  17. import io.netty.channel.ChannelPipeline;
  18. import io.netty.channel.EventLoopGroup;
  19. import io.netty.channel.nio.NioEventLoopGroup;
  20. import io.netty.channel.socket.SocketChannel;
  21. import io.netty.channel.socket.nio.NioServerSocketChannel;
  22. import io.netty.handler.codec.string.StringDecoder;
  23. import io.netty.handler.codec.string.StringEncoder;
  24. /**
  25. * @author: fqtang
  26. * @date: 2024/05/05/21:59
  27. * @description: 描述
  28. */
  29. public class NettyServer {
  30. public static void startServer(String hostName,int port){
  31. startServer0(hostName,port);
  32. }
  33. /**
  34. * 编写一个方法,完成对Netty Server的初始化工作和启动
  35. * @param hostName
  36. * @param port
  37. */
  38. private static void startServer0(String hostName,int port){
  39. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  40. EventLoopGroup workerGroup = new NioEventLoopGroup();
  41. try{
  42. ServerBootstrap serverBootstrap = new ServerBootstrap();
  43. serverBootstrap.group(bossGroup,workerGroup)
  44. .channel(NioServerSocketChannel.class)
  45. .childHandler(new ChannelInitializer<SocketChannel>() {
  46. @Override
  47. protected void initChannel(SocketChannel ch) throws Exception {
  48. ChannelPipeline pipeline = ch.pipeline();
  49. pipeline.addLast(new StringDecoder());
  50. pipeline.addLast(new StringEncoder());
  51. pipeline.addLast(new NettyServerHandler());
  52. }
  53. });
  54. ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();
  55. System.out.println("服务提供方开始提供服务~~~");
  56. channelFuture.channel().closeFuture().sync();
  57. }catch(Exception e){
  58. e.printStackTrace();
  59. }finally {
  60. bossGroup.shutdownGracefully();
  61. workerGroup.shutdownGracefully();
  62. }
  63. }
  64. }
  65. import io.netty.channel.ChannelHandlerContext;
  66. import io.netty.channel.ChannelInboundHandlerAdapter;
  67. import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
  68. import com.tfq.netty.dubborpc.provider.HelloServiceImpl;
  69. /**
  70. * @author: fqtang
  71. * @date: 2024/05/05/22:03
  72. * @description: 描述
  73. */
  74. public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  75. @Override
  76. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  77. //获取客户端调用的消息,并调用服务
  78. System.out.println("msg = " + msg);
  79. //客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头
  80. //比如:dubboserver#hello#xxxx
  81. if(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {
  82. String res = new HelloServiceImpl().say(msg.toString()
  83. .substring(msg.toString()
  84. .lastIndexOf("#") + 1));
  85. ctx.writeAndFlush(res);
  86. }
  87. }
  88. @Override
  89. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  90. ctx.close();
  91. }
  92. }

4)消费者

  1. import com.tfq.netty.dubborpc.netty.NettyClient;
  2. import com.tfq.netty.dubborpc.publicinterface.HelloService;
  3. /**
  4. * @author: fqtang
  5. * @date: 2024/05/05/23:26
  6. * @description: 消费者
  7. */
  8. public class ClientBootstrap {
  9. /**
  10. * 这里定义协议头
  11. */
  12. public static final String ProtocolHeader = "dubboserver#say#";
  13. public static void main(String[] args) throws InterruptedException {
  14. //创建一个消费者
  15. NettyClient customer = new NettyClient();
  16. //创建代理对象
  17. HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);
  18. while(true) {
  19. Thread.sleep(10 * 1000);
  20. //通过代理对象调用提供者的方法(服务)
  21. String res = helloService.say("你好 dubbo~");
  22. System.out.println("调用的结果 res = " + res);
  23. }
  24. }
  25. }
  26. import java.lang.reflect.Proxy;
  27. import java.util.concurrent.ExecutorService;
  28. import java.util.concurrent.Executors;
  29. import io.netty.bootstrap.Bootstrap;
  30. import io.netty.channel.*;
  31. import io.netty.channel.nio.NioEventLoopGroup;
  32. import io.netty.channel.socket.SocketChannel;
  33. import io.netty.channel.socket.nio.NioSocketChannel;
  34. import io.netty.handler.codec.string.StringDecoder;
  35. import io.netty.handler.codec.string.StringEncoder;
  36. /**
  37. * @author: fqtang
  38. * @date: 2024/05/05/23:04
  39. * @description: 描述
  40. */
  41. public class NettyClient {
  42. //创建一个线程池
  43. private static ExecutorService executorService = Executors.newFixedThreadPool(2);
  44. private static NettyClientHandler clientHandler;
  45. /**
  46. * 编写方法使用代理模式,获取一个代理对象
  47. * @param serviceClass
  48. * @param protocolHeader
  49. * @return
  50. */
  51. public Object getBean(final Class<?> serviceClass, final String protocolHeader) {
  52. return Proxy.newProxyInstance(Thread.currentThread()
  53. .getContextClassLoader(),
  54. new Class<?>[]{serviceClass}, (proxy, method, args) -> {
  55. if(clientHandler == null) {
  56. initClient("127.0.0.1", 8001);
  57. }
  58. //设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],
  59. //args[0] 就是客户端调用api say(???),参数
  60. clientHandler.setParam(protocolHeader + args[0]);
  61. return executorService.submit(clientHandler).get();
  62. });
  63. }
  64. private static void initClient(String hostName, int port) {
  65. EventLoopGroup worker = new NioEventLoopGroup();
  66. try {
  67. clientHandler = new NettyClientHandler();
  68. Bootstrap bootstrap = new Bootstrap();
  69. bootstrap.group(worker)
  70. .channel(NioSocketChannel.class)
  71. .option(ChannelOption.TCP_NODELAY, true)
  72. .handler(new ChannelInitializer<SocketChannel>() {
  73. @Override
  74. protected void initChannel(SocketChannel ch) throws Exception {
  75. ChannelPipeline channelPipeline = ch.pipeline();
  76. channelPipeline.addLast(new StringDecoder());
  77. channelPipeline.addLast(new StringEncoder());
  78. channelPipeline.addLast(clientHandler);
  79. }
  80. });
  81. ChannelFuture channelFuture = bootstrap.connect(hostName, port)
  82. .sync();
  83. /*channelFuture.channel()
  84. .closeFuture()
  85. .sync();*/
  86. } catch(InterruptedException e) {
  87. e.printStackTrace();
  88. } /*finally {
  89. worker.shutdownGracefully();
  90. }*/
  91. }
  92. }
  93. package com.tfq.netty.dubborpc.netty;
  94. import java.util.concurrent.Callable;
  95. import io.netty.channel.ChannelHandlerContext;
  96. import io.netty.channel.ChannelInboundHandlerAdapter;
  97. /**
  98. * @author: fqtang
  99. * @date: 2024/05/05/22:48
  100. * @description: 描述
  101. */
  102. public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
  103. private ChannelHandlerContext context;
  104. /**
  105. * 返回的结果
  106. */
  107. private String result;
  108. /**
  109. * 客户端调用方法返回的参数
  110. */
  111. private String param;
  112. /**
  113. * 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)
  114. * @param ctx
  115. * @throws Exception
  116. */
  117. @Override
  118. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  119. //因为在其他方法会使用到这个ctx
  120. context = ctx;
  121. System.out.println("调用(1) channelActive--->连接到服务器");
  122. }
  123. /**
  124. * 被调用(4)
  125. * 收到服务器的数据后,调用方法
  126. * @param ctx
  127. * @param msg
  128. * @throws Exception
  129. */
  130. @Override
  131. public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  132. result = (String) msg;
  133. System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);
  134. //唤醒等待的线程
  135. notify();
  136. System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");
  137. }
  138. @Override
  139. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  140. ctx.close();
  141. }
  142. /**
  143. * 被调用(3), 被调用(5)
  144. * 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果
  145. * @return
  146. * @throws Exception
  147. */
  148. @Override
  149. public synchronized Object call() throws Exception {
  150. context.writeAndFlush(param);
  151. System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");
  152. //进行wait,等待channelRead 方法获取到服务器的结果后,唤醒
  153. wait();
  154. System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");
  155. return result;
  156. }
  157. /**
  158. * 被调用(2)
  159. * @param param
  160. */
  161. void setParam(String param){
  162. System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");
  163. this.param = param;
  164. }
  165. }

若有问题请留言。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/552452
推荐阅读
相关标签
  

闽ICP备14008679号