当前位置:   article > 正文

手写一个简单rpc框架(一)_springbeanfactory.getbeanlistbyannotationclass方法

springbeanfactory.getbeanlistbyannotationclass方法

        扑街前言:前面说了netty的基本运用、Java的NIO等一系列的知识,这些知识已经可以做一个简单的rpc框架,本篇和下篇我们一起了解一个怎么完成一个rpc框架,当然个只是为了更好的了解rpc框架的基本逻辑,并不是真的可以用于业务使用。(认识到自己是菜鸟的第47天,今天突然记起来是多少天了)


        在编写具体代码之前,我们要了解什么是rpc框架,它是由什么结构组成的,而最常见RPC框架就是Dubbo。

        RPC 的主要功能目标是让构建分布式计算(应用)更容易,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议规范,简单的来说就是像调用本地服务一样调用远程服务,对开发者而言是透明的。

RPC的优势

  1. RPC框架一般使用长链接,不必每次通信都要3次握手,减少网络开销。
  2. RPC框架一般都有注册中心,有丰富的监控管理。
  3. 发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
  4. 协议私密,安全性较高。
  5. rpc 能做到协议更简单内容更小,效率更高。
  6. rpc是面向服务的更高级的抽象,支持服务注册发现,负载均衡,超时重试,熔断降级等高级特性。

RPC架构设计

        一幅图片解释整个rpc的架构设计


        上述简单描述一下rpc的一些简单概念,那么首先来编写服务端的代码,因为服务端的编写难度要小于客户端。从上面图片可以看出服务端主要是:服务注册(这里用zookeeper作为注册中心)、监听端口接收连接(包括请求解码、响应编码、请求处理,而编码和解码又有一次编、解码和二次编、解码)。当然这是简单的基本功能,比如限流、健康监测之类的后续再说,先迈出第一步很重要。

 zookeeper

        在代码开始之前,还需要简单的了解zookeeper的安装和使用,安装就是在zookeeper的官网下载一下最新的稳定版本,然后解压,打开bin目录,运行zkServer.cmd即可。具体的详细下篇文章再说,本篇文章重点是rpc框架的基本编写。至于Java中使用zookeeper,可以类比Redis的使用,Redis为Java提供了两个客户端 Jedis 和 Redisson,下面代码我们也是使用zookeeper了一个客户端 zkclient。


代码逻辑示例

        上述内容结合之前文章的相关的网络编程内容,可以先写一个服务端,代码如下。 首先结合上面的流程图,大致说一下逻辑。1、需要一个引导类,用于引导整个rpc服务的启动;2、有一个启动器,在启动器中完成服务注册和基于netty的监听;3、需要一个服务注册的server,基于zookeeper实现服务注册,需要封装zookeeper的连接和调用;4、需要一个netty的server,实现一次、二次编解码,并且实现请求处理调用具体的业务逻辑。

引导类

        这里单独说明一下@PostConstruct注解,这个具体作用是当这个类被注册bean的时候,会运行一次被修饰的方法,但是在jdk9之后就去除了。

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.context.annotation.Configuration;
  3. import javax.annotation.PostConstruct;
  4. @Configuration
  5. public class RpcServerBootstrap {
  6. @Autowired
  7. private RpcServerRnner rpcServerRnner;
  8. @PostConstruct
  9. public void initRpcServer (){
  10. // 运行启动器
  11. rpcServerRnner.run();
  12. }
  13. }

启动器

  1. import com.rpc.server.registry.RpcRegistry;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.Resource;
  5. @Component
  6. public class RpcServerRnner {
  7. @Autowired
  8. private RpcRegistry rpcRegistry;
  9. @Resource
  10. private RpcServer rpcServer;
  11. /**
  12. * 用于服务注册和netty监听
  13. */
  14. public void run () {
  15. // 服务注册
  16. rpcRegistry.serviceRegistry();
  17. // 启动服务,监听端口,接收连接请求
  18. rpcServer.start();
  19. }
  20. }

配置信息对象

  1. import lombok.Data;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.stereotype.Component;
  4. @Data
  5. @Component
  6. public class RpcServerConfiguration {
  7. /**
  8. * ZK根节点名称
  9. */
  10. @Value("${rpc.server.zk.root}")
  11. private String zkRoot;
  12. /**
  13. * ZK地址信息
  14. */
  15. @Value("${rpc.server.zk.addr}")
  16. private String zkAddr;
  17. /**
  18. * RPC通讯端口
  19. */
  20. @Value("${rpc.network.port}")
  21. private int rpcPort;
  22. /**
  23. * Spring Boot 服务端口
  24. */
  25. @Value("${server.port}")
  26. private int serverPort;
  27. /**
  28. * ZK连接超时时间配置
  29. */
  30. @Value("${rpc.server.zk.timeout:10000}")
  31. private int connectTimeout;
  32. }

zookeeper客户端连接

  1. import org.I0Itec.zkclient.ZkClient;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class ServerZkClientConfig {
  7. /**
  8. * RPC服务端配置
  9. */
  10. @Autowired
  11. private RpcServerConfiguration rpcServerConfiguration;
  12. /**
  13. * 声音ZK客户端
  14. * @return
  15. */
  16. @Bean
  17. public ZkClient zkClient() {
  18. return new ZkClient(rpcServerConfiguration.getZkAddr(), rpcServerConfiguration.getConnectTimeout());
  19. }
  20. }

zookeeper连接操作接口

  1. import org.I0Itec.zkclient.ZkClient;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * Zookeeper连接操作接口
  6. */
  7. @Component
  8. public class ServerZKit {
  9. @Autowired
  10. private ZkClient zkClient;
  11. @Autowired
  12. private RpcServerConfiguration rpcServerConfiguration;
  13. /***
  14. * 根节点创建
  15. */
  16. public void createRootNode() {
  17. boolean exists = zkClient.exists(rpcServerConfiguration.getZkRoot());
  18. if (!exists) {
  19. zkClient.createPersistent(rpcServerConfiguration.getZkRoot());
  20. }
  21. }
  22. /***
  23. * 创建其他节点
  24. * @param path
  25. */
  26. public void createPersistentNode(String path) {
  27. String pathName = rpcServerConfiguration.getZkRoot() + "/" + path;
  28. boolean exists = zkClient.exists(pathName);
  29. if (!exists) {
  30. zkClient.createPersistent(pathName);
  31. }
  32. }
  33. /***
  34. * 创建节点
  35. * @param path
  36. */
  37. public void createNode(String path) {
  38. String pathName = rpcServerConfiguration.getZkRoot() + "/" + path;
  39. boolean exists = zkClient.exists(pathName);
  40. if (!exists) {
  41. zkClient.createEphemeral(pathName);
  42. }
  43. }
  44. }

用于服务请求连接的注解

  1. import org.springframework.core.annotation.AliasFor;
  2. import org.springframework.stereotype.Component;
  3. import java.lang.annotation.*;
  4. @Component
  5. @Target(ElementType.TYPE)
  6. @Retention(RetentionPolicy.RUNTIME)
  7. @Documented
  8. public @interface HrpcService {
  9. /**
  10. * 等同于@Component的value
  11. * @return
  12. */
  13. @AliasFor(annotation = Component.class)
  14. String value() default "";
  15. /**
  16. * 服务接口Class
  17. * @return
  18. */
  19. Class<?> interfaceClass() default void.class;
  20. /**
  21. * 服务接口名称
  22. * @return
  23. */
  24. String interfaceName() default "";
  25. /**
  26. * 服务版本号
  27. * @return
  28. */
  29. String version() default "";
  30. /**
  31. * 服务分组
  32. * @return
  33. */
  34. String group() default "";
  35. }

创建一个Spring的Bean工厂,用于封装获取IOC容器中的bean信息

  1. import org.springframework.beans.BeansException;
  2. import org.springframework.beans.factory.support.DefaultListableBeanFactory;
  3. import org.springframework.context.ApplicationContext;
  4. import org.springframework.context.ApplicationContextAware;
  5. import org.springframework.stereotype.Component;
  6. import java.lang.annotation.Annotation;
  7. import java.util.Map;
  8. @Component
  9. public class SpringBeanFactory implements ApplicationContextAware {
  10. /**
  11. * ioc容器
  12. */
  13. private static ApplicationContext context;
  14. @Override
  15. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  16. context = applicationContext;
  17. }
  18. /*public static ApplicationContext getApplicationContext() {
  19. return context;
  20. }*/
  21. /**
  22. * 根据Class获取bean
  23. * @param cls
  24. * @param <T>
  25. * @return
  26. */
  27. public static <T> T getBean(Class<T> cls) {
  28. return context.getBean(cls);
  29. }
  30. /**
  31. * 根据beanName获取bean
  32. * @param beanName
  33. * @return
  34. */
  35. public static Object getBean(String beanName) {
  36. return context.getBean(beanName);
  37. }
  38. /***
  39. * 获取有指定注解的对象
  40. * @param annotationClass
  41. * @return
  42. */
  43. public static Map<String, Object> getBeanListByAnnotationClass(Class<? extends Annotation> annotationClass) {
  44. return context.getBeansWithAnnotation(annotationClass);
  45. }
  46. /**
  47. * 向容器注册单例bean
  48. * @param bean
  49. */
  50. public static void registerSingleton(Object bean) {
  51. DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
  52. // 让bean完成Spring初始化过程中所有增强器检验,只是不重新创建bean
  53. beanFactory.applyBeanPostProcessorsAfterInitialization(bean,bean.getClass().getName());
  54. //将bean以单例的形式入驻到容器中,此时通过bean.getClass().getName()或bean.getClass()都可以拿到放入Spring容器的Bean
  55. beanFactory.registerSingleton(bean.getClass().getName(),bean);
  56. }
  57. }

服务注册

        上面的准备工作基本上就做完了,下面开始正式的逻辑代码。这里提一点,当spring boot整合这个自定义框架的时候,可以有很多方式,这个不再细说,可以参考文章spring boot的自动配置,这里可以直接粗暴一点在spring boot项目的启动类上@SpringBootApplication(scanBasePackages ={"包路径","包路径"})。

        再说一下@component 注解,这是由spring 提供,被其修饰的类被声明为spring 的组件,简单来说就是创建bean并放置IOC容器中。

  1. /**
  2. * 服务注册接口
  3. */
  4. public interface RpcRegistry {
  5. /**
  6. * 服务注册
  7. */
  8. void serviceRegistry();
  9. }
  10. import com.rpc.annotation.HrpcService;
  11. import com.rpc.server.config.RpcServerConfiguration;
  12. import com.rpc.server.registry.RpcRegistry;
  13. import com.rpc.spring.SpringBeanFactory;
  14. import com.rpc.util.IpUtil;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.stereotype.Component;
  17. import java.util.Map;
  18. @Component
  19. @Slf4j
  20. public class ZkRegistry implements RpcRegistry {
  21. /**
  22. * 封装的bean工厂
  23. */
  24. @Autowired
  25. private SpringBeanFactory springBeanFactory;
  26. /**
  27. * 封装的zookeeper的客户端
  28. */
  29. @Autowired
  30. private ServerZKit zKitClient;
  31. /**
  32. * 配置信息对象
  33. */
  34. @Autowired
  35. private RpcServerConfiguration rpcServerConfiguration;
  36. @Override
  37. public void serviceRegistry() {
  38. /*
  39. * 1、首先要获取被HrpcService 注解修饰的,IOC中的所有的bean信息
  40. * 2、拿到bean之后,再获取bean上的HrpcService 注解对象
  41. * 3、拿到HrpcService 注解上面的接口信息
  42. * 4、创建zookeeper上的根节点,并获取服务端ip和配置文件中的zookeeper端口,创建以接口名称为key,ip+端口为value的子节点
  43. * 5、注册成功
  44. */
  45. // 获取被HrpcService 注解修饰的,IOC中的所有的bean信息
  46. Map<String, Object> annotationClass = springBeanFactory.getBeanListByAnnotationClass(HrpcService.class);
  47. // 没被注册信息,直接结束
  48. if (annotationClass == null || annotationClass.size() < 0){
  49. return;
  50. }
  51. // 迭代所有的bean
  52. for (Object bean : annotationClass.values()) {
  53. // 获取HrpcService 注解信息
  54. HrpcService hrpcService = bean.getClass().getAnnotation(HrpcService.class);
  55. // 获取HrpcService 注解的interfaceClass属性,也就是接口对象
  56. Class<?> interfaceClass = hrpcService.interfaceClass();
  57. // 获取接口的名称
  58. String name = interfaceClass.getName();
  59. /*
  60. * 开始往zookeeper添加节点
  61. */
  62. // 根节点
  63. zKitClient.createRootNode();
  64. // 子节点,用于接口名称
  65. zKitClient.createPersistentNode(name);
  66. // 获取ip
  67. String ip = IpUtil.getRealIp();
  68. // ip + 端口
  69. String node = ip + rpcServerConfiguration.getZkAddr();
  70. // 子节点对应下级节点
  71. zKitClient.createNode(name + "/" + node);
  72. // 打印日志
  73. log.info("服务{}-{}注册成功", name, node);
  74. }
  75. }
  76. }

序列化工具

  1. import io.protostuff.LinkedBuffer;
  2. import io.protostuff.ProtostuffIOUtil;
  3. import io.protostuff.Schema;
  4. import io.protostuff.runtime.RuntimeSchema;
  5. import lombok.extern.slf4j.Slf4j;
  6. import java.util.*;
  7. import java.util.concurrent.CopyOnWriteArrayList;
  8. /**
  9. * @description
  10. * @author: ts
  11. * @create:2021-04-08 10:31
  12. */
  13. @Slf4j
  14. public class ProtostuffUtil {
  15. //存储因为无法直接序列化/反序列化 而需要被包装的类型Class
  16. private static final Set<Class<?>> WRAPPER_SET = new HashSet<Class<?>>();
  17. static {
  18. WRAPPER_SET.add(List.class);
  19. WRAPPER_SET.add(ArrayList.class);
  20. WRAPPER_SET.add(CopyOnWriteArrayList.class);
  21. WRAPPER_SET.add(LinkedList.class);
  22. WRAPPER_SET.add(Stack.class);
  23. WRAPPER_SET.add(Vector.class);
  24. WRAPPER_SET.add(Map.class);
  25. WRAPPER_SET.add(HashMap.class);
  26. WRAPPER_SET.add(TreeMap.class);
  27. WRAPPER_SET.add(LinkedHashMap.class);
  28. WRAPPER_SET.add(Hashtable.class);
  29. WRAPPER_SET.add(SortedMap.class);
  30. WRAPPER_SET.add(Object.class);
  31. }
  32. //注册需要使用包装类进行序列化的Class对象
  33. public static void registerWrapperClass(Class<?> clazz) {
  34. WRAPPER_SET.add(clazz);
  35. }
  36. /**
  37. * 将对象序列化为字节数组
  38. * @param t
  39. * @param useWrapper 为true完全使用包装模式 为false则选择性的使用包装模式
  40. * @param <T>
  41. * @return
  42. */
  43. public static <T> byte[] serialize(T t,boolean useWrapper) {
  44. Object serializerObj = t;
  45. if (useWrapper) {
  46. serializerObj = SerializeDeserializeWrapper.build(t);
  47. }
  48. return serialize(serializerObj);
  49. }
  50. /**
  51. * 将对象序列化为字节数组
  52. * @param t
  53. * @param <T>
  54. * @return
  55. */
  56. public static <T> byte[] serialize(T t) {
  57. //获取序列化对象的class
  58. Class<T> clazz = (Class<T>) t.getClass();
  59. Object serializerObj = t;
  60. if (WRAPPER_SET.contains(clazz)) {
  61. serializerObj = SerializeDeserializeWrapper.build(t);//将原始序列化对象进行包装
  62. }
  63. return doSerialize(serializerObj);
  64. }
  65. /**
  66. * 执行序列化
  67. * @param t
  68. * @param <T>
  69. * @return
  70. */
  71. public static <T> byte[] doSerialize(T t) {
  72. //获取序列化对象的class
  73. Class<T> clazz = (Class<T>) t.getClass();
  74. //获取Schema
  75. // RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz);//根据给定的class创建schema
  76. /**
  77. * this is lazily created and cached by RuntimeSchema
  78. * so its safe to call RuntimeSchema.getSchema() over and over The getSchema method is also thread-safe
  79. */
  80. Schema<T> schema = RuntimeSchema.getSchema(clazz);//内部有缓存机制
  81. /**
  82. * Re-use (manage) this buffer to avoid allocating on every serialization
  83. */
  84. LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
  85. byte[] protostuff = null;
  86. try {
  87. protostuff = ProtostuffIOUtil.toByteArray(t, schema, buffer);
  88. } catch (Exception e){
  89. log.error("protostuff serialize error,{}",e.getMessage());
  90. }finally {
  91. buffer.clear();
  92. }
  93. return protostuff;
  94. }
  95. /**
  96. * 反序列化
  97. * @param data
  98. * @param clazz
  99. * @param <T>
  100. * @return
  101. */
  102. public static <T> T deserialize(byte[] data,Class<T> clazz) {
  103. //判断是否经过包装
  104. if (WRAPPER_SET.contains(clazz)) {
  105. SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>();
  106. ProtostuffIOUtil.mergeFrom(data,wrapper,RuntimeSchema.getSchema(SerializeDeserializeWrapper.class));
  107. return wrapper.getData();
  108. }else {
  109. Schema<T> schema = RuntimeSchema.getSchema(clazz);
  110. T newMessage = schema.newMessage();
  111. ProtostuffIOUtil.mergeFrom(data,newMessage,schema);
  112. return newMessage;
  113. }
  114. }
  115. private static class SerializeDeserializeWrapper<T> {
  116. //被包装的数据
  117. T data;
  118. public static <T> SerializeDeserializeWrapper<T> build(T data){
  119. SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>();
  120. wrapper.setData(data);
  121. return wrapper;
  122. }
  123. public T getData() {
  124. return data;
  125. }
  126. public void setData(T data) {
  127. this.data = data;
  128. }
  129. }
  130. }

一次编码

  1. import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
  2. /**
  3. * 一次解码
  4. */
  5. public class FrameDecoder extends LengthFieldBasedFrameDecoder {
  6. public FrameDecoder() {
  7. super(Integer.MAX_VALUE, 0, 4, 0, 4);
  8. }
  9. }

二次编码

  1. import com.rpc.util.ProtostuffUtil;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToMessageEncoder;
  5. import lombok.extern.slf4j.Slf4j;
  6. import java.util.List;
  7. /**
  8. * 服务端的二次编码
  9. */
  10. @Slf4j
  11. public class RpcResponseEncoder extends MessageToMessageEncoder<ByteBuf> {
  12. @Override
  13. protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  14. /*
  15. * 首先将传入的ByteBuf 序列化为一个byte 数组
  16. * 然后用ChannelHandlerContext 构建一个buffer对象
  17. * 最后写入buffer,添加out写出
  18. */
  19. try {
  20. // 使用序列化工具,将msg序列化
  21. byte[] bytes = ProtostuffUtil.serialize(msg);
  22. // 由ctx分配构建一个buffer对象
  23. ByteBuf buffer = ctx.alloc().buffer(bytes.length);
  24. // 将数据交给buffer
  25. buffer.writeBytes(bytes);
  26. // 添加写出
  27. out.add(buffer);
  28. } catch (Exception e) {
  29. // 异常
  30. log.error("RpcResponseEncoder exception ,msg={}",e.getMessage());
  31. }
  32. }
  33. }

一次解码

  1. import io.netty.handler.codec.LengthFieldPrepender;
  2. /**
  3. * 一次解码
  4. */
  5. public class FrameEncoder extends LengthFieldPrepender {
  6. public FrameEncoder() {
  7. super(4);
  8. }
  9. }

二次解码

  1. import com.itheima.rpc.util.ProtostuffUtil;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToMessageEncoder;
  5. import lombok.extern.slf4j.Slf4j;
  6. import java.util.List;
  7. /**
  8. * 服务端的二次编码
  9. */
  10. @Slf4j
  11. public class RpcResponseEncoder extends MessageToMessageEncoder<ByteBuf> {
  12. @Override
  13. protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
  14. /*
  15. * 首先将传入的ByteBuf 序列化为一个byte 数组
  16. * 然后用ChannelHandlerContext 构建一个buffer对象
  17. * 最后写入buffer,添加out写出
  18. */
  19. try {
  20. // 使用序列化工具,将msg序列化
  21. byte[] bytes = ProtostuffUtil.serialize(msg);
  22. // 由ctx分配构建一个buffer对象
  23. ByteBuf buffer = ctx.alloc().buffer(bytes.length);
  24. // 将数据交给buffer
  25. buffer.writeBytes(bytes);
  26. // 添加写出
  27. out.add(buffer);
  28. } catch (Exception e) {
  29. // 异常
  30. log.error("RpcResponseEncoder exception ,msg={}",e.getMessage());
  31. }
  32. }
  33. }

响应对象

  1. import lombok.AllArgsConstructor;
  2. import lombok.Builder;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @Data
  6. @Builder
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class RpcResponse {
  10. private String requestId;
  11. private Object result;
  12. private Throwable cause;
  13. public boolean isError() {
  14. return cause != null;
  15. }
  16. }

请求对象

  1. import lombok.AllArgsConstructor;
  2. import lombok.Builder;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @Data
  6. @Builder
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class RpcRequest {
  10. private String requestId;
  11. private String className;
  12. private String methodName;
  13. private Class<?>[] parameterTypes;
  14. private Object[] parameters;
  15. }

业务逻辑调用Handler

  1. import com.itheima.rpc.data.RpcRequest;
  2. import com.itheima.rpc.data.RpcResponse;
  3. import com.itheima.rpc.spring.SpringBeanFactory;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import lombok.extern.slf4j.Slf4j;
  7. import java.lang.reflect.InvocationTargetException;
  8. import java.lang.reflect.Method;
  9. /**
  10. * 客户端请求业务调用
  11. */
  12. @Slf4j
  13. public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
  14. @Override
  15. protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
  16. /*
  17. * 先创建请求对应的响应对象
  18. * 从请求对象中获取相关接口信息,接口名称、方法名称、参数类型、参数
  19. * 根据接名称从容器中获取bean
  20. * 用反射,根据方法和参数类型拿到Method 对象
  21. * 将参数传入Method 对象,然后运行,拿到返回值
  22. * 将返回值给到响应对象
  23. * 一定要将响应对象回写给客户端
  24. */
  25. log.info("服务端收到的请求是:{}",rpcRequest);
  26. // 构建响应对象
  27. RpcResponse rpcResponse = new RpcResponse();
  28. // 于请求对象关联
  29. rpcResponse.setRequestId(rpcRequest.getRequestId());
  30. try {
  31. // 接口名称
  32. String interfaceName = rpcRequest.getClassName();
  33. // 方法名称
  34. String methodName = rpcRequest.getMethodName();
  35. // 参数类型
  36. Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
  37. // 实际参数
  38. Object[] parameters = rpcRequest.getParameters();
  39. // 从容器中获取bean实例
  40. Object bean = SpringBeanFactory.getBean(Class.forName(interfaceName));
  41. // 反射获取method 对象
  42. Method method = bean.getClass().getMethod(methodName, parameterTypes);
  43. // 执行对应方法,拿到返回值
  44. Object result = method.invoke(bean, parameters);
  45. // 添加到响应对象
  46. rpcResponse.setResult(result);
  47. } catch (Exception e) {
  48. log.error("RpcRequestHandler exception,msg={}",e.getMessage());
  49. rpcResponse.setCause(e);
  50. } finally {
  51. // 将结果写回
  52. log.info("向客户端发送响应,{}",rpcResponse);
  53. ctx.writeAndFlush(rpcResponse);
  54. }
  55. }
  56. }

netty代码实现

  1. import com.rpc.netty.codec.FrameDecoder;
  2. import com.rpc.netty.codec.FrameEncoder;
  3. import com.rpc.netty.codec.RpcRequestDecoder;
  4. import com.rpc.netty.codec.RpcResponseEncoder;
  5. import com.rpc.netty.handler.RpcRequestHandler;
  6. import com.rpc.server.boot.RpcServer;
  7. import com.rpc.server.config.RpcServerConfiguration;
  8. import io.netty.bootstrap.ServerBootstrap;
  9. import io.netty.channel.ChannelFuture;
  10. import io.netty.channel.ChannelInitializer;
  11. import io.netty.channel.ChannelOption;
  12. import io.netty.channel.ChannelPipeline;
  13. import io.netty.channel.nio.NioEventLoopGroup;
  14. import io.netty.channel.socket.SocketChannel;
  15. import io.netty.channel.socket.nio.NioServerSocketChannel;
  16. import io.netty.util.NettyRuntime;
  17. import io.netty.util.concurrent.DefaultThreadFactory;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. public class NettServer implements RpcServer {
  20. @Autowired
  21. private RpcServerConfiguration rpcServerConfiguration;
  22. @Override
  23. public void start() {
  24. /*
  25. * 首先要获取三个线程池,用于注册serverSocketChannel、socketChannel 还有业务逻辑处理(即请求调用)
  26. * 再构建引导类,并配置先关信息,将注册请求和处理读写的线程池配置到引导类中,然后配置好相关的 handler(第一、二次编解码,请求调用),注意请求调用使用线程池做处理
  27. * 最后启动引导类,绑定监听端口,设置同步
  28. * 监控等待关闭
  29. * 优雅的关闭线程池
  30. */
  31. // 构建注册serverSocketChannel 的线程池
  32. NioEventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
  33. // 构建注册socketChannel 的线程池
  34. NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
  35. // 构建业务调用的线程池
  36. NioEventLoopGroup rpcRequestHandler = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("reqRequestHandler"));
  37. // 业务逻辑调用
  38. RpcRequestHandler requestHandler = new RpcRequestHandler();
  39. try {
  40. // 构建引导类
  41. ServerBootstrap serverBootstrap = new ServerBootstrap();
  42. // 配置引导类
  43. serverBootstrap.group(boss, worker)
  44. .channel(NioServerSocketChannel.class)
  45. .option(ChannelOption.SO_BACKLOG,1024)
  46. .childOption(ChannelOption.TCP_NODELAY,true)
  47. .childOption(ChannelOption.SO_KEEPALIVE,true)
  48. .childHandler(new ChannelInitializer<SocketChannel>() {
  49. @Override
  50. protected void initChannel(SocketChannel socketChannel) throws Exception {
  51. // 获取 pipeline
  52. ChannelPipeline pipeline = socketChannel.pipeline();
  53. /*
  54. * 配置 handler
  55. */
  56. // 一级编码
  57. pipeline.addLast("FrameEncoder", new FrameEncoder());
  58. // 二级编码
  59. pipeline.addLast("RpcResponseEncoder", new RpcResponseEncoder());
  60. // 一级解码
  61. pipeline.addLast("FrameDecoder", new FrameDecoder());
  62. // 二级解码
  63. pipeline.addLast("RpcRequestDecoder", new RpcRequestDecoder());
  64. // 业务线程池调用
  65. pipeline.addLast(rpcRequestHandler, "requestHandler", requestHandler);
  66. }
  67. });
  68. // 启动引导类,监听端口,设置同步
  69. ChannelFuture future = serverBootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
  70. // 监控等待关闭
  71. future.channel().closeFuture().sync();
  72. } catch (Exception e) {
  73. // 异常
  74. e.printStackTrace();
  75. } finally {
  76. /*
  77. * 优雅的关闭各个线程池
  78. */
  79. boss.shutdownGracefully();
  80. worker.shutdownGracefully();
  81. rpcRequestHandler.shutdownGracefully();
  82. }
  83. }
  84. }

        上述内容就已经完成了一个服务端的创建,后续文章在说客户端,本次结束。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/678848
推荐阅读
相关标签
  

闽ICP备14008679号