赞
踩
RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。
常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.
若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云
在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。
调用流程说明
dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。
模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。
创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:
netty用的包:4.1.20.Final。pom.xml如下:
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.20.Final</version>
- </dependency>
- /**
- * @author: fqtang
- * @date: 2024/05/05/21:51
- * @description: 服务提供方和服务消费方都需要
- */
- public interface HelloService {
-
- String say(String mes);
- }
- import org.springframework.util.StringUtils;
- import com.tfq.netty.dubborpc.publicinterface.HelloService;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/21:53
- * @description: 描述
- */
- public class HelloServiceImpl implements HelloService {
-
- private static int count = 0;
-
- /**
- * 当有消费方调用该方法时就返回一个结果
- *
- * @param mes 传入消息
- * @return 返回结果
- */
- @Override
- public String say(String mes) {
- System.out.println("收到客户端消息=" + mes);
- if(StringUtils.isEmpty(mes)) {
- return "你好客户端,我已经收到你的消息 ";
- }else{
- return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";
- }
- }
- }
- import com.tfq.netty.dubborpc.netty.NettyServer;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/21:57
- * @description: 启动服务提供者,就是NettyServer
- */
- public class ServerBootstrap {
-
- public static void main(String[] args) {
-
- String hostName="127.0.0.1";
- int port = 8001;
- NettyServer.startServer(hostName,port);
- }
-
- }
-
-
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/21:59
- * @description: 描述
- */
- public class NettyServer {
-
- public static void startServer(String hostName,int port){
- startServer0(hostName,port);
- }
-
- /**
- * 编写一个方法,完成对Netty Server的初始化工作和启动
- * @param hostName
- * @param port
- */
- private static void startServer0(String hostName,int port){
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
-
- try{
- ServerBootstrap serverBootstrap = new ServerBootstrap();
-
- serverBootstrap.group(bossGroup,workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new StringDecoder());
- pipeline.addLast(new StringEncoder());
- pipeline.addLast(new NettyServerHandler());
- }
- });
-
- ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();
- System.out.println("服务提供方开始提供服务~~~");
- channelFuture.channel().closeFuture().sync();
- }catch(Exception e){
- e.printStackTrace();
- }finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
-
- }
- }
-
-
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
- import com.tfq.netty.dubborpc.provider.HelloServiceImpl;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/22:03
- * @description: 描述
- */
- public class NettyServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- //获取客户端调用的消息,并调用服务
- System.out.println("msg = " + msg);
- //客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头
- //比如:dubboserver#hello#xxxx
- if(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {
- String res = new HelloServiceImpl().say(msg.toString()
- .substring(msg.toString()
- .lastIndexOf("#") + 1));
- ctx.writeAndFlush(res);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- }
- }
-
-
- import com.tfq.netty.dubborpc.netty.NettyClient;
- import com.tfq.netty.dubborpc.publicinterface.HelloService;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/23:26
- * @description: 消费者
- */
- public class ClientBootstrap {
-
- /**
- * 这里定义协议头
- */
- public static final String ProtocolHeader = "dubboserver#say#";
-
- public static void main(String[] args) throws InterruptedException {
- //创建一个消费者
- NettyClient customer = new NettyClient();
- //创建代理对象
- HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);
- while(true) {
- Thread.sleep(10 * 1000);
- //通过代理对象调用提供者的方法(服务)
- String res = helloService.say("你好 dubbo~");
- System.out.println("调用的结果 res = " + res);
- }
- }
- }
-
-
-
- import java.lang.reflect.Proxy;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/23:04
- * @description: 描述
- */
- public class NettyClient {
-
- //创建一个线程池
- private static ExecutorService executorService = Executors.newFixedThreadPool(2);
-
- private static NettyClientHandler clientHandler;
-
- /**
- * 编写方法使用代理模式,获取一个代理对象
- * @param serviceClass
- * @param protocolHeader
- * @return
- */
- public Object getBean(final Class<?> serviceClass, final String protocolHeader) {
-
- return Proxy.newProxyInstance(Thread.currentThread()
- .getContextClassLoader(),
- new Class<?>[]{serviceClass}, (proxy, method, args) -> {
- if(clientHandler == null) {
- initClient("127.0.0.1", 8001);
- }
- //设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],
- //args[0] 就是客户端调用api say(???),参数
- clientHandler.setParam(protocolHeader + args[0]);
- return executorService.submit(clientHandler).get();
- });
- }
-
- private static void initClient(String hostName, int port) {
- EventLoopGroup worker = new NioEventLoopGroup();
- try {
- clientHandler = new NettyClientHandler();
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(worker)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline channelPipeline = ch.pipeline();
- channelPipeline.addLast(new StringDecoder());
- channelPipeline.addLast(new StringEncoder());
- channelPipeline.addLast(clientHandler);
- }
- });
-
- ChannelFuture channelFuture = bootstrap.connect(hostName, port)
- .sync();
- /*channelFuture.channel()
- .closeFuture()
- .sync();*/
- } catch(InterruptedException e) {
- e.printStackTrace();
- } /*finally {
- worker.shutdownGracefully();
- }*/
- }
- }
-
-
-
- package com.tfq.netty.dubborpc.netty;
-
- import java.util.concurrent.Callable;
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
-
- /**
- * @author: fqtang
- * @date: 2024/05/05/22:48
- * @description: 描述
- */
- public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
-
- private ChannelHandlerContext context;
- /**
- * 返回的结果
- */
- private String result;
- /**
- * 客户端调用方法返回的参数
- */
- private String param;
-
- /**
- * 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- //因为在其他方法会使用到这个ctx
- context = ctx;
- System.out.println("调用(1) channelActive--->连接到服务器");
- }
-
- /**
- * 被调用(4)
- * 收到服务器的数据后,调用方法
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- result = (String) msg;
- System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);
- //唤醒等待的线程
- notify();
- System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- }
-
- /**
- * 被调用(3), 被调用(5)
- * 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果
- * @return
- * @throws Exception
- */
- @Override
- public synchronized Object call() throws Exception {
- context.writeAndFlush(param);
- System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");
- //进行wait,等待channelRead 方法获取到服务器的结果后,唤醒
- wait();
- System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");
- return result;
- }
-
- /**
- * 被调用(2)
- * @param param
- */
- void setParam(String param){
- System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");
- this.param = param;
- }
- }
-
若有问题请留言。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。