
A Simple RPC Based on zk and netty

A simple RPC

What is RPC?

In short: remote procedure call.

Where is it used?

Dubbo is built on it.

Why learn it?

To better understand the Dubbo framework and similar tools.

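Hand-writing an RPC and walking through the related concepts (this is the main point)
Basic concepts
Architecture (viewed by role)

Architecture diagram from the official site: [figure]

Structure of the simplified version: [figure]
https://www.processon.com/diagraming/601fd9fee0b34d208a6a2924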

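Implementation

The code, explanations and tests follow.
Project structure

[figure: project structure]
rpc-common: the shared dependency (defines the common interfaces and conventions)
rpc-consumer1: the consumer side (talks to the server through the protocol)
rpc-provider1: the server side (the service provider)

Basic dependencies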
 <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <!--<version>1.2.47</version>-->
            <version>1.2.71</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.2</version>
        </dependency>
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
         <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.6.RELEASE</version>
        </dependency>
          <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--<version>5.1.46</version>-->
            <version>5.1.20</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.21</version>
        </dependency>

        <!-- zkclient client -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>
        <!-- zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

Many of the dependencies above are not specific to this RPC. They are listed because the project has other features, which are not covered here.

Implementation details

The rpc-common project:
IUserService: the interface definition

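public interface IUserService {
    /**
     * @param msg message sent by the caller
     * @return the reply produced by the provider
     */
    String sayHello(String msg);

    /** Second sample method; UserServiceImpl overrides it as well. */
    String sayHelloWorld(String msg);
}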
JsonSerializer: the JSON implementation of the serializer interface
import com.alibaba.fastjson.JSON;
import java.io.IOException;

public class JsonSerializer implements Serializer {

    @Override
    public byte[] serialize(Object o) throws IOException {
        return JSON.toJSONBytes(o);
    }

    @Override
    public <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException {
        // Deserialize the JSON bytes into an instance of clazz.
        return JSON.parseObject(bytes, clazz);
    }
}
RpcDecoder: the RPC decoder
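import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

// ByteToMessageDecoder accumulates bytes across reads, so half packets can be handled.
public class RpcDecoder extends ByteToMessageDecoder {
    private final Class<?> clazz;
    private final Serializer serializer;
    public RpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
                          List<Object> out) throws Exception {
        // Wait until the 4-byte length prefix has arrived.
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        byteBuf.markReaderIndex();
        int length = byteBuf.readInt();
        // Half packet: rewind and wait for the rest of the frame.
        if (byteBuf.readableBytes() < length) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);
        // Deserialize the payload into the configured type.
        Object message = serializer.deserialize(clazz, bytes);
        out.add(message);
    }
}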
RpcEncoder: the RPC encoder
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class RpcEncoder extends MessageToByteEncoder<Object> {
    private final Class<?> clazz;
    private final Serializer serializer;
    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg,
                          ByteBuf out) throws Exception {
        /*
            Write the object into the buffer as
            a 4-byte length (int) followed by the serialized bytes.
         */
        if(clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            out.writeInt(bytes.length);
            out.writeBytes(bytes);
        }
    }
}
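The encoder writes each message as an int length followed by the payload bytes. The sketch below shows the same framing with plain java.nio, so it runs without netty. The class name LengthPrefixFraming and its two methods are illustrative and not part of the project. The point is the decoding side: TCP may glue frames together or cut one in half, so a frame is only consumed once all of its bytes have arrived.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Length-prefixed framing with plain java.nio (illustrative, not project code). */
final class LengthPrefixFraming {

    /** Encodes one payload as a 4-byte big-endian length followed by the payload bytes. */
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /**
     * Extracts every complete frame from the buffer (which must be in read mode)
     * and leaves an incomplete trailing frame unread for the next call.
     */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();                       // remember where this frame starts
            int length = in.getInt();
            if (in.remaining() < length) {   // half packet: rewind and wait for more bytes
                in.reset();
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = encode("rpc".getBytes(StandardCharsets.UTF_8));
        // Two frames arrive glued together, with the last byte of the second one missing.
        ByteBuffer in = ByteBuffer.allocate(a.length + b.length - 1);
        in.put(a).put(b, 0, b.length - 1);
        in.flip();
        List<byte[]> frames = decode(in);
        // prints "1 complete frame(s), 6 byte(s) pending"
        System.out.println(frames.size() + " complete frame(s), " + in.remaining() + " byte(s) pending");
    }
}
```

The sketch does not guard against negative or oversized lengths; a real decoder would reject them before allocating the array.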
RpcRequest: the RPC request protocol object
  • the protocol object sent over the wire
  • requested service ID
  • class name
  • method name
  • parameter types
  • arguments
import lombok.Data;
import java.io.Serializable;

@Data
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = -4172554571135234360L;
    private String requestId;           // requested service ID
    private String className;           // class name
    private String methodName;          // method name
    private Class<?>[] parameterTypes;  // parameter types
    private Object[] parameters;        // arguments
}
RpcResult: the RPC result (server -> client)
@Data
public class RpcResult implements Serializable {
    private static final long serialVersionUID = 1561074744495374547L;
    private boolean success;
    private String message;
    private Object data;
}

Serializer: the serializer interface
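import java.io.IOException;

public interface Serializer {
    /**
     * Serializes an object.
     * @param o the object to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    byte[] serialize(Object o) throws IOException;
    /**
     * Deserializes an object.
     * @param clazz the target type
     * @param bytes the serialized bytes
     * @param <T> the type expected by the caller
     * @return the deserialized object
     * @throws IOException if deserialization fails
     */
    <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException;
}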
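Because the codecs only depend on the Serializer interface, the wire format can be swapped without touching them. As a rough illustration, here is a hypothetical JdkSerializer based on JDK object serialization. It is not part of the project, and the local copy of the interface narrows Class<?> to Class<T> so the result can be cast safely; both are assumptions made for this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/** Local copy of the contract so the sketch compiles on its own (Class<T> is my variation). */
interface Serializer {
    byte[] serialize(Object o) throws IOException;
    <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException;
}

/** Hypothetical alternative to JsonSerializer that uses JDK object serialization. */
final class JdkSerializer implements Serializer {

    @Override
    public byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);   // the object must implement java.io.Serializable
        }
        return bos.toByteArray();
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            // cast() fails fast if the stream holds a different type than expected
            return clazz.cast(ois.readObject());
        } catch (ClassNotFoundException e) {
            throw new IOException("unknown class in stream", e);
        }
    }

    public static void main(String[] args) throws IOException {
        Serializer serializer = new JdkSerializer();
        byte[] bytes = serializer.serialize("hello");
        System.out.println(serializer.deserialize(String.class, bytes)); // prints "hello"
    }
}
```

JDK serialization should only be used with trusted peers, which is one reason a JSON format is a reasonable default here.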

ZookeeperUtils:

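import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZookeeperUtils {
    /**
     * Shared client; volatile is required for double-checked locking to be safe.
     */
    private static volatile CuratorFramework client;
    /**
     * Returns the shared client, creating it on first use.
     * @return the started CuratorFramework client
     */
    public static CuratorFramework getClient() {
        if(client == null) {
            synchronized (ZookeeperUtils.class) {
                if(null == client) {
                    client = createClient();
                }
            }
        }
        return client;
    }
    /**
     * Builds and starts a new client.
     * @return the started CuratorFramework client
     */
    private static CuratorFramework createClient() {
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        // Fluent programming style
        CuratorFramework newClient = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("lagou-servers")  // isolated namespace
                .build();
        newClient.start();
        System.out.println("zookeeper session created.");
        return newClient;
    }

    /**
     * Tests session creation.
     * @param args unused
     */
    public static void main(String[] args) {
        getClient();
    }
}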
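Concepts involved so far:
1: netty codecs, half packets and sticky packets, custom protocols.
2: the inbound and outbound order of netty handlers and how to configure them.
3: netty's ByteBuf.
4: Java reflection.
5: the points at which netty fires events.
6: serialization and deserialization with alibaba's fastjson.
7: zookeeper basics and the characteristics of zookeeper nodes.
	data structure of zookeeper nodes:
	types of zookeeper nodes: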

The rpc-provider1 project:
ServerBoot

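import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;

@Service
public class ServerBoot implements ApplicationRunner {
    private static String IP = "127.0.0.1";
    private static Integer PORT = 9010;
    /**
     * Starts the netty server once the Spring Boot application is up.
     * @param applicationArguments startup arguments (unused)
     * @throws Exception if the server cannot start
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        startNettyServer(IP, PORT);
    }
    //************* start netty
    /**
     * @param ip address registered in zookeeper
     * @param port port to listen on
     * @throws Exception if binding or registration fails
     */
    private static void startNettyServer(String ip, int port) throws Exception {
        /*
        1: bootstrap
        2: thread pools
        3: handlers
        4: listening port
         */
        // 1. Create the two event loop groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 2. Create the server bootstrap
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 3. Configure the bootstrap
        serverBootstrap.group(bossGroup, workGroup)
                // Use NIO channels
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Mind the types here: RpcResult + RpcRequest.
                        // Encoder and decoder correspond to outbound and inbound traffic:
                        // the server receives RpcRequest, so that is the type to decode,
                        // and it sends RpcResult, so that is the type to encode.
                        pipeline.addLast(new RpcEncoder(RpcResult.class, new JsonSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JsonSerializer()));
                        // Business handler
                        pipeline.addLast(new UserServiceHandler(SpringContextHolder.getApplicationContext()));
                    }
                });

        // sync() blocks until the bind has completed
        serverBootstrap.bind(port).sync();
        // Connect to zk and create an ephemeral sequential node
        CuratorFramework client = ZookeeperUtils.getClient();
        String path = "/serverNodes/node";
        // The node is created under the lagou-servers namespace
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path, (ip + ":" + port).getBytes(StandardCharsets.UTF_8));
        // The node data is this server's ip:port, which remote clients use to connect
    }
}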
UserServiceHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;

public class UserServiceHandler extends ChannelInboundHandlerAdapter {
    private final ApplicationContext applicationContext;
    public UserServiceHandler(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(!(msg instanceof RpcRequest)) {
            returnFailRst(ctx, "netty server: the protocol object must not be empty!");
            return;
        }
        RpcRequest rpcRequest = (RpcRequest) msg;

        System.out.println("netty server: received protocol object: " + rpcRequest);
        if(StringUtils.isEmpty(rpcRequest.getRequestId())) {
            returnFailRst(ctx, "netty server: the ID of the requested service must not be empty!");
            return;
        }
        // getBean throws for an unknown name instead of returning null, so check first
        if(!applicationContext.containsBean(rpcRequest.getRequestId())) {
            returnFailRst(ctx, "netty server: service ID " + rpcRequest.getRequestId() + " does not exist!");
            return;
        }
        Object bean = applicationContext.getBean(rpcRequest.getRequestId());
        // Look up the target method by reflection
        Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethodName(),
                rpcRequest.getParameterTypes());
        if(method == null) {
            returnFailRst(ctx, "netty server: method " + rpcRequest.getMethodName() + " does not exist!");
            return;
        }
        Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getParameters());
        returnSuccRst(ctx, result);
        System.out.println("netty server: returned result " + result);
    }
    /**
     * Builds and sends a success result.
     * @param ctx channel context
     * @param result return value of the invoked method
     */
    private void returnSuccRst(ChannelHandlerContext ctx, Object result) {
        RpcResult rpcResult = new RpcResult();
        rpcResult.setSuccess(true);
        rpcResult.setData(result);
        rpcResult.setMessage("netty server: handled successfully! " + result);
        ctx.writeAndFlush(rpcResult);
    }
    /**
     * Builds and sends a failure result.
     * @param ctx channel context
     * @param s reason for the failure
     */
    private void returnFailRst(ChannelHandlerContext ctx,String s) {
        RpcResult rpcResult = new RpcResult();
        rpcResult.setSuccess(false);
        rpcResult.setMessage("netty server: handling failed! " + s);
        ctx.writeAndFlush(rpcResult);
    }
}
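The handler above resolves a bean by the request's service ID, finds the method by name and parameter types, and invokes it reflectively. The following sketch isolates that dispatch step using only the JDK. A plain Map stands in for the Spring context, and ReflectiveDispatcher and EchoService are hypothetical names introduced for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Server-side dispatch by reflection; a Map stands in for the Spring context (assumption). */
final class ReflectiveDispatcher {
    private final Map<String, Object> services = new HashMap<>();

    /** Registers a service object under the id that requests will carry. */
    void register(String serviceId, Object bean) {
        services.put(serviceId, bean);
    }

    /** Finds the bean by id, resolves the public method by name and parameter types, and invokes it. */
    Object dispatch(String serviceId, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Object bean = services.get(serviceId);
        if (bean == null) {
            throw new IllegalArgumentException("unknown service: " + serviceId);
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(bean, args);
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalArgumentException("cannot call " + methodName, e);
        } catch (InvocationTargetException e) {
            // unwrap the exception thrown by the service method itself
            throw new IllegalStateException("service method failed", e.getCause());
        }
    }

    /** Hypothetical service used only for the demonstration. */
    public static final class EchoService {
        public String sayHello(String msg) {
            return "hello, " + msg;
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register("IUserService", new EchoService());
        Object result = dispatcher.dispatch("IUserService", "sayHello",
                new Class<?>[]{String.class}, new Object[]{"rpc"});
        System.out.println(result); // prints "hello, rpc"
    }
}
```

Passing the parameter types along with the method name is what lets the server pick the right overload.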
SpringContextHolder
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringContextHolder implements ApplicationContextAware,DisposableBean {
    private static ApplicationContext applicationContext = null;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        SpringContextHolder.applicationContext = context;
    }
    /**
     * Gives code outside the container access to the context.
     * @return the application context
     */
    public static ApplicationContext getApplicationContext() {
        if (applicationContext == null) {
            throw new IllegalStateException("applicationContext has not been injected; register SpringContextHolder in the Spring Boot application class.");
        }else {
            return applicationContext;
        }
    }
    @Override
    public void destroy() throws Exception {
        applicationContext = null;
    }
    //*********** helper methods for fetching beans
    /**
     * Gets a bean by name.
     * @param name bean name
     * @return the bean instance
     */
    public static Object getBean(String name){
        return getApplicationContext().getBean(name);
    }
    /**
     * Gets a bean by class.
     */
    public static <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }
    /**
     * Gets the bean with the given name and class.
     */
    public static <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name, clazz);
    }
}
UserServiceImpl
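import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import org.springframework.stereotype.Service;

@Service(value = "IUserService")
public class UserServiceImpl implements IUserService {
    private static Integer PORT = 9010;
    @Override
    public String sayHello(String msg) {
        System.out.println("First method of the server on port (" + PORT + "): sayHello ---> " + msg);
        // Sleep 0 to 2 seconds to simulate a varying response time
        int i = RandomUtil.randomInt(3);
        ThreadUtil.sleep(i * 1000);
        return "Data returned by the server on port (" + PORT + "): " + msg;
    }
    @Override
    public String sayHelloWorld(String msg) {
        System.out.println("Second method of the server on port (" + PORT + "): sayHelloWorld ---> " + msg);
        return "Data returned by the second method of the server on port (" + PORT + "): " + msg;
    }
}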
LaGouV1Application
Concepts involved so far:
1: Spring Boot
2: Spring Boot auto-configuration
3: others

The rpc-consumer1 project:
RpcConsumer

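import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RpcConsumer {
    /**
     * Thread pool used to send requests and wait for the results.
     */
    private static ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
    );
    /**
     * One client handler per provider, keyed by "ip:port".
     */
    private volatile static Map<String, UserClientHandler> userClientHandlerMap;

    /**
     * Duration of the most recent call to each provider, in milliseconds.
     */
    private volatile static Map<String, Long> userClientHandlerTime = new ConcurrentHashMap<>();
    /**
     * Call counter (not used in the code shown here).
     */
    private static volatile long count = 0;
    /*
    1: connect to every registered provider
    2: register the listener
     */
    private static void init() throws Exception {
        CuratorFramework client = ZookeeperUtils.getClient();
        String path = "/serverNodes";
        // Open a connection to every provider that is already registered
        initClients(client, path);

        // Watch the parent path in zk
        registerZkPathListener(client, path);
    }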
    /**
     * Registers the listener on the parent path.
     * @param client zk client
     * @param path parent path of the provider nodes
     */
    private static void registerZkPathListener(CuratorFramework client, String path) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                ChildData childData = event.getData();
                PathChildrenCacheEvent.Type eventType = event.getType();
                System.out.println("zk node watch: childData: " + childData);
                System.out.println("zk node watch: eventType: " + eventType);
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        pathChildrenCache.rebuild();
                        break;
                    case CONNECTION_SUSPENDED:
                        break;
                    case CONNECTION_LOST:
                        System.out.println("Connection lost.");
                        break;
                    case CHILD_ADDED:
                        addClient(client, event.getData());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("Child updated");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("Node removed!");
                        delClient(client, event.getData());
                        break;
                    default:
                        break;
                }
            }
        });
    }
    /**
     * Removes the connection to a provider.
     * @param client zk client
     * @param data data of the removed node
     */
    private static void delClient(CuratorFramework client, ChildData data) {
        System.out.println("zk node watch: removed childData: " + data);
        // The node data holds the provider address "ip:port", which is also the map key
        String serverKey = new String(data.getData());
        UserClientHandler userClientHandler = userClientHandlerMap.get(serverKey);
        if(userClientHandler == null) {
            System.out.println("zk node watch: service node " + serverKey + " does not exist!");
            return;
        }
        if(userClientHandler.isState()) {
            System.out.println("zk node watch: service node " + serverKey + " is still active and cannot be removed!");
        } else {
            userClientHandlerMap.remove(serverKey);
            System.out.println("zk node watch: delete server:" + serverKey);
        }
    }
    /**
     * Adds a connection to a new provider.
     * @param client zk client
     * @param data data of the added node
     */
    private static void addClient(CuratorFramework client, ChildData data) throws InterruptedException {
        System.out.println("zk node watch: added ChildData: " + data);
        byte[] dataData = data.getData();
        String serverKey = new String(dataData);
        System.out.println("zk node watch: add server :" + serverKey);
        String[] split = serverKey.split(":");
        initClient(split[0], split[1]);
    }

    /**
     * Opens a netty connection to every registered provider.
     * @param client zk client
     * @param path parent path of the provider nodes
     */
    private synchronized static void initClients(CuratorFramework client, String path) {
        if(userClientHandlerMap == null) {
            // The zk listener thread and caller threads both touch this map
            userClientHandlerMap = new ConcurrentHashMap<>(64);
        }
        try {
            List<String> list = client.getChildren().forPath(path);
            for (String link : list) {
                String allPath = path + "/" + link;
                System.out.println("zk node watch: registered service node path: " + allPath);
                byte[] bytes = client.getData().forPath(allPath);
                String s = new String(bytes);
                System.out.println("zk node watch: value of the registered service node: " + s);
                String[] split = s.split(":");
                // Open the netty client connection
                initClient(split[0], split[1]);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * Connects to one provider.
     * @param ip provider address
     * @param port provider port
     */
    private static void initClient(String ip, String port) throws InterruptedException {
        // The handler is stored in the map and fetched later to send client requests
        UserClientHandler userClientHandler = new UserClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JsonSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcResult.class, new JsonSerializer()));
                        pipeline.addLast(userClientHandler);
                    }
                });
        bootstrap.connect(ip, Integer.parseInt(port)).sync();
        // Store the handler under "ip:port" and log it
        userClientHandlerMap.put(ip + ":" + port, userClientHandler);
        System.out.println("init: stored " + (ip + ":" + port) + " = " + userClientHandler);
    }
    /**
     * Drops every provider whose connection is no longer active.
     * @return the number of providers still available
     */
    private static Integer clearClients() {
        // removeIf avoids a ConcurrentModificationException, and the lookup uses the key, not the entry
        userClientHandlerMap.entrySet().removeIf(entry -> {
            boolean inactive = !entry.getValue().isState();
            if(inactive) {
                System.out.println("zk node watch: removing service node from the list: " + entry.getKey());
                userClientHandlerTime.remove(entry.getKey());
            }
            return inactive;
        });
        return userClientHandlerMap.size();
    }
    // Now send a request and get the remote service's response (netty + zookeeper).
    // The consumer only has the interface; the call is completed by a proxy plus the protocol.
    /**
     * Creates a proxy for the given service interface.
     * @param proxyClass the service interface
     * @return a proxy that forwards calls to a remote provider
     */
    public Object createProxy(Class<?> proxyClass) {
        Object proxyInstance = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{proxyClass},
                new RpcInvokeInvocationHandler());
        return proxyInstance;
    }
    /**
     * Invocation handler that performs the remote call.
     */
    private class RpcInvokeInvocationHandler implements InvocationHandler {
        /**
         * Once the proxy is generated, every call to an interface method runs this logic.
         * @param proxy the proxy instance
         * @param method the interface method being called
         * @param args the call arguments
         * @return the result returned by the provider
         * @throws Throwable if the remote call fails
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            /*
            1: open all connections
            2: initialize the handler map
             */
            if(userClientHandlerMap == null) {
                init();
            }
            // Check whether any provider is available
            Integer availableCount = clearClients();
            if(availableCount == 0) {
                throw new RuntimeException("proxy: no service node available!");
            }
            // Build the protocol object
            RpcRequest rpcRequest = new RpcRequest();
            // The service class name can be derived from the method
            String className = method.getDeclaringClass().getName();
            String packageName = method.getDeclaringClass().getPackage().getName();
            // Strip the package to get the simple name, which is the bean name on the provider
            String serviceName = StrUtil.removeAll(className, packageName + '.');
            System.out.println("proxy: className: " + className + "\tpackageName: " + packageName + "\tserviceName: " + serviceName);
            rpcRequest.setRequestId(serviceName);
            rpcRequest.setClassName(className);
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameters(args);
            rpcRequest.setParameterTypes(method.getParameterTypes());
            System.out.println("proxy: rpcRequest: " + rpcRequest);
            LoadBalanceService loadBalanceService;
            loadBalanceService = new ServiceLBRandom();
            // Pick a provider key; it is chosen at random from the handler map
            String serverKey = loadBalanceService.getService();
            System.out.println("proxy: serverkey: " + serverKey);
            UserClientHandler userClientHandler = userClientHandlerMap.get(serverKey);
            // Set the message to send
            userClientHandler.setToSendParam(rpcRequest);
            // Send through the thread pool and block until the result arrives
            long begin = DateUtil.current();
            Object result = executorService.submit(userClientHandler).get();
            long spendMs = DateUtil.spendMs(begin);
            // Record the elapsed time
            getUserClientHandlerTime().put(serverKey, spendMs);
            System.out.println("proxy: call server:" + serverKey + ", elapsed: " + spendMs + "ms");
            // Return the result
            return result;
        }
    }

    /**
     * @return the most recent call duration per provider
     */
    public static Map<String, Long> getUserClientHandlerTime() {
        return userClientHandlerTime;
    }

    public static Map<String, UserClientHandler> getUserClientHandlerMap() {
        return userClientHandlerMap;
    }
}
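The consumer only holds the interface, and a JDK dynamic proxy turns each method call into a request. The sketch below shows that mechanism in isolation. The Greeter interface, the text request format and the transport function are all placeholders for this illustration; in the project the request is an RpcRequest and the transport is the netty channel.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

/** JDK dynamic proxy that converts interface calls into requests (illustrative only). */
final class ProxySketch {

    /** Hypothetical stand-in for IUserService. */
    interface Greeter {
        String sayHello(String msg);
    }

    /**
     * Creates a proxy for the interface. Every call is described as
     * "Interface#method[args]" and handed to the transport, whose return
     * value becomes the result of the call.
     */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, Function<String, Object> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            String request = iface.getSimpleName() + "#" + method.getName()
                    + Arrays.toString(args == null ? new Object[0] : args);
            return transport.apply(request);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        // The lambda plays the role of the network: it receives the request and "answers" it.
        Greeter greeter = createProxy(Greeter.class, request -> "echo:" + request);
        System.out.println(greeter.sayHello("hi")); // prints "echo:Greeter#sayHello[hi]"
    }
}
```

Note that the handler also receives calls to toString, hashCode and equals on the proxy; a fuller version would answer those locally instead of sending them to the server.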
UserClientHandler
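import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;

public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    /*
    1: send asynchronously
    2: a thread pool receives the result
    3: send the data
    4: record the received result
    5: wait/notify mechanism: wait() notify()
    6: context object: ChannelHandlerContext ctx
     */
    private ChannelHandlerContext ctx;
    /**
     * Request parameter to send
     */
    private Object toSendParam;
    /**
     * Received result
     */
    private Object toRecvResult;
    /**
     * Error reported by the server for the current request, if any
     */
    private RuntimeException error;
    /**
     * Whether the response to the current request has arrived
     */
    private boolean received = false;
    /**
     * Whether the connection to the server is currently active
     * (volatile because it is read from threads other than the Netty I/O thread)
     */
    private volatile boolean state = false;
    /**
     * Set the request to send
     * @param toSendParam the request object
     */
    public void setToSendParam(Object toSendParam) {
        this.toSendParam = toSendParam;
    }
    /**
     * Get the connection state
     * @return true while the channel is active
     */
    public boolean isState() {
        return state;
    }
    /**
     * Send the request and block until the result arrives
     * @return the data returned by the server
     * @throws Exception if the server reports a failure or the wait is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("Sending object: " + this.toSendParam);
        // Reset the per-request state before sending
        this.received = false;
        this.error = null;
        this.toRecvResult = null;
        this.ctx.writeAndFlush(this.toSendParam);
        System.out.println("netty client: waiting thread (name): " + Thread.currentThread().getName());
        // Wait in a loop: wait() may return spuriously, so re-check the condition
        while (!received) {
            wait();
        }
        if (error != null) {
            throw error;
        }
        return toRecvResult;
    }
    /**
     * Fired once the connection to the server is established
     * @param ctx the channel context
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        this.state = true;
    }
    /**
     * Fired when the connection to the server is closed
     * @param ctx the channel context
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("netty client: disconnected.");
        this.state = false;
    }
    /**
     * Read the data sent by the server
     * @param ctx the channel context
     * @param msg the decoded response
     * @throws Exception
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg) {
            RpcResult rpcResult = (RpcResult) msg;
            // Check success or failure
            if (rpcResult.isSuccess()) {
                this.toRecvResult = rpcResult.getData();
            } else {
                // The original created the exception without throwing it;
                // keep it so that call() can rethrow it to the caller
                this.error = new RuntimeException(rpcResult.getMessage());
            }
        }
        this.received = true;
        System.out.println("netty client: waking the waiting thread (current thread name): " + Thread.currentThread().getName());
        notify();
    }
}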
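The handler above blocks in call() until channelRead() stores a result. As a minimal sketch of that same wait()/notify() handshake without Netty, the hypothetical class ResultHolder below (not part of the original project) uses a condition flag, so the waiter is safe against spurious wakeups and against the response arriving before it starts waiting. The names await, complete and demo are illustrative assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the handler: one thread waits, another delivers.
class ResultHolder {
    private Object result;
    private boolean done = false;

    // Plays the role of call(): block until a result has been delivered.
    synchronized Object await() throws InterruptedException {
        while (!done) {   // re-check the condition after every wakeup
            wait();
        }
        return result;
    }

    // Plays the role of channelRead(): store the result and wake the waiter.
    synchronized void complete(Object value) {
        this.result = value;
        this.done = true;
        notifyAll();
    }

    // Runs the handshake once: a pool thread waits, the caller delivers "pong".
    static Object demo() {
        ResultHolder holder = new ResultHolder();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Object> waiter = holder::await;
            Future<Object> future = pool.submit(waiter);
            holder.complete("pong"); // simulate the server response arriving
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the flag is checked under the same lock that guards the notification, the order in which the two threads run does not matter.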
LoadBalanceService
public interface LoadBalanceService {
    /**
     * Select a service node
     * @return the key of the selected service node
     */
    String getService();
}
ServiceLBLeastTime
import java.util.Map;
import java.util.Set;

public class ServiceLBLeastTime implements LoadBalanceService {
    @Override
    public String getService() {
        Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
        Map<String, Long> userClientHandlerTime = RpcConsumer.getUserClientHandlerTime();
        Set<String> services = userClientHandlerMap.keySet();
        // Shortest elapsed time seen so far; -1 means no node has been compared yet
        long dr = -1;
        String serverKey = "";
        // Loop to find the service node with the shortest elapsed time
        for (String service : services) {
            if (userClientHandlerTime.containsKey(service)) {
                long cost = userClientHandlerTime.get(service);
                if (cost < dr || dr < 0) {
                    dr = cost;
                    serverKey = service;
                }
            } else {
                // Never called even once, so this node carries no load yet
                serverKey = service;
                break;
            }
        }
        return serverKey;
    }
}
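The selection rule above can be tried in isolation. The sketch below is a self-contained restatement under stated assumptions: the class LeastTimeDemo, the method pick and the node keys are hypothetical, and the maps held by RpcConsumer are replaced by plain parameters. It keeps the same rule: a node with no recorded time wins immediately, otherwise the node with the smallest recorded time is returned.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class LeastTimeDemo {
    // Returns the first node without a recorded time if there is one,
    // otherwise the node with the smallest recorded time ("" if there are no nodes).
    static String pick(Set<String> nodes, Map<String, Long> elapsedMillis) {
        String best = "";
        long bestCost = -1;
        for (String node : nodes) {
            Long cost = elapsedMillis.get(node);
            if (cost == null) {
                return node; // never called: treat it as carrying no load
            }
            if (bestCost < 0 || cost < bestCost) {
                bestCost = cost;
                best = node;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Long> times = new LinkedHashMap<>();
        times.put("node-a", 30L);
        times.put("node-b", 10L);
        times.put("node-c", 20L);
        Set<String> nodes = new LinkedHashSet<>(times.keySet());
        System.out.println(pick(nodes, times)); // node-b has the smallest time

        nodes.add("node-d"); // no recorded time yet
        System.out.println(pick(nodes, times)); // node-d is chosen
    }
}
```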
ServiceLBRandom
import cn.hutool.core.util.RandomUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ServiceLBRandom implements LoadBalanceService {

    @Override
    public String getService() {
        Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
        String[] keys = userClientHandlerMap.keySet().toArray(new String[0]);
        // Arrays.toString prints the elements; concatenating the array itself only prints its reference
        System.out.println("keys: " + Arrays.toString(keys));
        if (keys.length == 0) {
            // No service node is connected, so there is nothing to choose from
            return "";
        }
        int i = RandomUtil.randomInt(keys.length);
        String key = keys[i];
        System.out.println("Randomly selected service key: " + key);
        return key;
    }

    // Small check of how keySet().toArray() orders the keys of a HashMap
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("1", "1");
        map.put("3", "3");
        map.put("2", "2");
        map.put("5", "5");
        String[] strings = map.keySet().toArray(new String[0]);
        System.out.println("strings: " + Arrays.toString(strings));
        for (String x : strings) {
            System.out.println("keys : " + x);
        }
        /*
        Output of the loop:
keys : 1
keys : 2
keys : 3
keys : 5
         */

    }
}
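Random selection treats every node equally. One possible variation, sketched here purely as an assumption and not part of the original project, is to pick a node with probability proportional to a weight. The class WeightedRandomDemo, the method pick and the weight map are hypothetical, and weights are assumed to be non-negative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

class WeightedRandomDemo {
    // Picks a key with probability proportional to its weight.
    // Returns "" when the map is empty or all weights are zero.
    static String pick(Map<String, Integer> weights, Random random) {
        int total = 0;
        for (int w : weights.values()) {
            total += w;
        }
        if (total <= 0) {
            return "";
        }
        int r = random.nextInt(total); // 0 <= r < total
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            r -= e.getValue();
            if (r < 0) {
                return e.getKey(); // r fell inside this node's share of the range
            }
        }
        return ""; // not reached when all weights are non-negative
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("node-a", 1);
        weights.put("node-b", 3); // chosen about three times as often as node-a
        System.out.println(pick(weights, new Random()));
    }
}
```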

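Thoughts on extension points

1: How would we define an order service?
First, define an order interface IOrderService in the rpc-common project. Then write the implementation class OrderServiceImpl in rpc-provider so that it implements that interface.
Finally, create a proxy object on the consumer side and send the corresponding protocol object to call the order service.

2: To be extended.
3: The load-balancing part could be refactored with the strategy pattern so that other algorithms can be added and values such as weights can be taken into account. To be optimized.
4: The IP and port settings could be loaded from dynamic configuration.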
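The order-service steps in point 1 can be sketched with a JDK dynamic proxy. This is only an illustration under stated assumptions: the method getOrder, its return value and the class OrderProxyDemo are hypothetical, and the proxy forwards the call to a local object where the real consumer would send a protocol object over Netty.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Would live in rpc-common; the method signature is a hypothetical example.
interface IOrderService {
    String getOrder(String orderId);
}

// Would live in the provider project.
class OrderServiceImpl implements IOrderService {
    @Override
    public String getOrder(String orderId) {
        return "order-" + orderId;
    }
}

class OrderProxyDemo {
    // Creates a proxy for IOrderService that intercepts every method call.
    static IOrderService createProxy(IOrderService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real consumer would build a request from the interface name,
            // method name and arguments and send it to the provider.
            // Here the call is simply forwarded to a local implementation.
            return method.invoke(target, args);
        };
        return (IOrderService) Proxy.newProxyInstance(
                IOrderService.class.getClassLoader(),
                new Class<?>[]{IOrderService.class},
                handler);
    }

    public static void main(String[] args) {
        IOrderService orderService = createProxy(new OrderServiceImpl());
        System.out.println(orderService.getOrder("42"));
    }
}
```

The caller only sees IOrderService, so swapping the local forwarding for a network call would not change the calling code.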

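Problems encountered

1: After connecting to the server with Netty, I wrote the following code, and requests got no response at all.
With the client written this way, a request produced no response, no error and no result.

 ChannelFuture channelFuture = bootstrap.connect().sync();
 channelFuture.channel().closeFuture();

2: notify() and wait() must be used together with the synchronized keyword. Otherwise the waiting thread cannot be woken up, or an illegal-monitor exception is thrown: IllegalMonitorStateException.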
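The second problem is easy to reproduce. The sketch below uses a hypothetical class MonitorDemo and shows that calling notify() without holding the object's monitor throws IllegalMonitorStateException, while the same call inside a synchronized block is legal.

```java
class MonitorDemo {
    // Returns true if notify() outside a synchronized block throws
    // IllegalMonitorStateException.
    static boolean notifyWithoutLock(Object lock) {
        try {
            lock.notify(); // the current thread does not own the monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true when notify() succeeds while holding the monitor.
    static boolean notifyWithLock(Object lock) {
        synchronized (lock) {
            lock.notify(); // legal: the monitor of lock is held
            return true;
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        System.out.println("without lock throws: " + notifyWithoutLock(lock));
        System.out.println("with lock succeeds: " + notifyWithLock(lock));
    }
}
```

The same rule applies to wait(): the thread must own the monitor of the object it waits on.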
