当前位置:   article > 正文

RPC实现Consumer 远程调用_rpcconsumer

rpcconsumer

RPC实现Consumer 远程调用

梳理一下基本的实现思路,主要完成一个这样的功能:API 模块中的接口功能在服务端实现(并没有在客户端实现)。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样。具体调用过程如下图所示:

在这里插入图片描述
1.服务提供者

    package com.netty.provider;
    
    import com.netty.api.IRpcService;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public class IRpcServiceImpl implements IRpcService {
        @Override
        public int add(int a, int b) {
            return a+b;
        }
    
        @Override
        public int sub(int a, int b) {
            return a-b;
        }
    
        @Override
        public int multi(int a, int b) {
            return a*b;
        }
    
        @Override
        public int div(int a, int b) {
            return a/b;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    2.提供给消费端的API接口

    package com.netty.api;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public interface IRpcService {
        //加
        int add(int a,int b);
    
        /**
         * 减
         * @param a
         * @param b
         * @return
         */
        int sub(int a,int b);
    
        /**
         * ×
         * @param a
         * @param b
         * @return
         */
        int multi(int a,int b);
    
        /**
         * 除
         * @param a
         * @param b
         * @return
         */
        int div(int a , int b);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    3.服务注册中心

    package com.netty.registry;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public class RpcRegistry {
        //应该有端口
        private int port;
    
        public RpcRegistry(int port) {
            this.port = port;
        }
        public void start(){
            //利用netty创建注册中心
            //boss线程
            EventLoopGroup bossGroup=new NioEventLoopGroup();
            EventLoopGroup workerGoup=new NioEventLoopGroup();
    
            //服务
            ServerBootstrap server=new ServerBootstrap();
            server.group(bossGroup,workerGoup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline=socketChannel.pipeline();
                            //自定义协议解码器
                            /** 入参有5个,分别解释如下
                             maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                             lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                             lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                             lengthAdjustment:要添加到长度字段值的补偿值
                             initialBytesToStrip:从解码帧中去除的第一个字节数
                             */
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            //自定义协议编码器
                            pipeline.addLast(new LengthFieldPrepender(4));
                            //对象参数类型编码器
                            pipeline.addLast("encoder",new ObjectEncoder());
                            //对象参数类型解码器
                            pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new RpcRegistryHandler());
                        }
                    })
                    //主线程
                    .option(ChannelOption.SO_BACKLOG,128)
                    //工作线程--保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
                    //启动
            try {
                ChannelFuture channelFuture=server.bind(port).sync();
                System.out.println(" RPC Registry start listen at " + port );
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    
        public static void main(String[] args) {
            new RpcRegistry(8080).start();
        }
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    4.注册中心实际处理逻辑

    package com.netty.registry;
    
    import com.netty.protocol.InvokerProtocol;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.concurrent.EventExecutorGroup;
    
    import java.io.File;
    import java.lang.reflect.Method;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public class RpcRegistryHandler extends ChannelInboundHandlerAdapter {
        //保存所有可用的服务
        private static ConcurrentHashMap<String ,Object> registerMap=new ConcurrentHashMap<>();
        //保存所有相关的服务名
        private List<String> className=new ArrayList<>();
    
        public RpcRegistryHandler() {
            //扫描服务
            //注册类
            scannerClass("com.netty.provider");
    
            doRegistry();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Object result=new Object();
            InvokerProtocol invokerProtocol=(InvokerProtocol) msg;
    
            if (registerMap.containsKey(invokerProtocol.getClassName())){
                Object object=registerMap.get(invokerProtocol.getClassName());
                Method method=object.getClass().getMethod(invokerProtocol.getMethodName(),invokerProtocol.getParams());
                result=method.invoke(object,invokerProtocol.getValues());
            }
            ctx.write(result);
            ctx.flush();
            ctx.close();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        private void doRegistry() {
            if (className.size()==0){
                return;
            }
            for (String className1:className){
                try {
                    Class<?> c=Class.forName(className1);
                    Class<?> i=c.getInterfaces()[0];//拿到该类第一个实现接口
                    registerMap.put(i.getName(),c.newInstance());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void scannerClass( String packageName) {
            URL uri=this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
            File dir=new File(uri.getFile());
            for (File file: dir.listFiles()) {
                if (file.isDirectory()){
                    scannerClass(dir+"."+file.getName());
                }else{
                    className.add(packageName+"."+file.getName().replace(".class","").trim());
                }
            }
    
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    5.传输协议

    package com.netty.protocol;
    
    import java.io.Serializable;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public class InvokerProtocol implements Serializable{
        private String className;//类名
        private String methodName;//方法名
        private Object[] values;//实参
        private Class<?>[] params;//形参列表
    
        public String getClassName() {
            return className;
        }
    
        public void setClassName(String className) {
            this.className = className;
        }
    
        public String getMethodName() {
            return methodName;
        }
    
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
    
        public Object[] getValues() {
            return values;
        }
    
        public void setValues(Object[] values) {
            this.values = values;
        }
    
        public Class<?>[] getParams() {
            return params;
        }
    
        public void setParams(Class<?>[] params) {
            this.params = params;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    6.消费者测试

    package com.netty.consumer;
    
    import com.netty.api.IRpcService;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public class RpcConsumer {
        public static void main(String[] args) {
            IRpcService iRpcService=RpcProxy.create(IRpcService.class);
    
            System.out.println(iRpcService.add(1,2));
            System.out.println(iRpcService.sub(2,1));
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    7.远程代理接口

    package com.netty.consumer;
    
    import com.netty.protocol.InvokerProtocol;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    /**
     * Created by chenli on 2019/11/13.
     */
    public class RpcProxy {
    
        public static <T> T create(Class<?> clazz){
            MethodProxy methodProxy=new MethodProxy(clazz);
            Class<?> [] interfaces=clazz.isInterface() ? new Class[]{clazz}: clazz.getInterfaces();
            T result=(T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,methodProxy);
            return result;
        }
    
        private static class MethodProxy implements InvocationHandler {
            private Class<?> clazz;
    
            public MethodProxy(Class<?> clazz) {
                this.clazz = clazz;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                //如果传进来是一个已实现的具体类
                if (Object.class.equals(method.getDeclaringClass())) {
                    try {
                        //具体实现类,直接调
                        return method.invoke(this, args);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    return rpcInvoke(proxy, method, args);
                }
                return null;
            }
    
            private Object rpcInvoke(Object proxy, Method method, Object[] args) {
                //封装协议
                InvokerProtocol invokerProtocol = new InvokerProtocol();
                invokerProtocol.setMethodName(method.getName());
                invokerProtocol.setClassName(this.clazz.getName());
                invokerProtocol.setValues(args);
                invokerProtocol.setParams(method.getParameterTypes());
    
                final RpcProxyHandler consumerHandler = new RpcProxyHandler();
                EventLoopGroup group = new NioEventLoopGroup();
                Bootstrap service = new Bootstrap();
                service.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //自定义协议解码器
                                /** 入参有5个,分别解释如下
                                 maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                                 lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                                 lengthFieldLength:长度字段的长度:如:长度字段是int型表示,那么这个值就是4(long型就是8)
                                 lengthAdjustment:要添加到长度字段值的补偿值
                                 initialBytesToStrip:从解码帧中去除的第一个字节数
                                 */
                                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                //自定义协议编码器
                                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                //对象参数类型编码器
                                pipeline.addLast("encoder", new ObjectEncoder());
                                //对象参数类型解码器
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast("handler", consumerHandler);
                            }
                        });
                try {
                    ChannelFuture channelFuture = service.connect("localhost", 8080).sync();
                    channelFuture.channel().writeAndFlush(invokerProtocol).sync();
                    channelFuture.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return consumerHandler.getResponse();
            }
    
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    8.处理返回消息

    package com.netty.consumer;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Created by chenli on 2019/11/14.
     */
    public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
        private Object response;
        public Object getResponse() {
            return response;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            response=msg;
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    版权声明:本文为CSDN博主「Leon_Jinhai_Sun」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/Leon_Jinhai_Sun/article/details/112549242

    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/424564
    推荐阅读
      

    闽ICP备14008679号