赞
踩
扑街前言:前面说了netty的基本运用、Java的NIO等一系列的知识,这些知识已经可以做一个简单的rpc框架,本篇和下篇我们一起了解一个怎么完成一个rpc框架,当然个只是为了更好的了解rpc框架的基本逻辑,并不是真的可以用于业务使用。(认识到自己是菜鸟的第47天,今天突然记起来是多少天了)
在编写具体代码之前,我们要了解什么是rpc框架,它是由什么结构组成的,而最常见RPC框架就是Dubbo。
RPC 的主要功能目标是让构建分布式计算(应用)更容易,是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议规范,简单的来说就是像调用本地服务一样调用远程服务,对开发者而言是透明的。
一幅图片解释整个rpc的架构设计
上述简单描述一下rpc的一些简单概念,那么首先来编写服务端的代码,因为服务端的编写难度要小于客户端。从上面图片可以看出服务端主要是:服务注册(这里用zookeeper作为注册中心)、监听端口接收连接(包括请求解码、响应编码、请求处理,而编码和解码又有一次编、解码和二次编、解码)。当然这是简单的基本功能,比如限流、健康监测之类的后续再说,先迈出第一步很重要。
在代码开始之前,还需要简单的了解zookeeper的安装和使用,安装就是在zookeeper的官网下载一下最新的稳定版本,然后解压,打开bin目录,运行zkServer.cmd即可。具体的详细下篇文章再说,本篇文章重点是rpc框架的基本编写。至于Java中使用zookeeper,可以类比Redis的使用,Redis为Java提供了两个客户端 Jedis 和 Redisson,下面代码我们也是使用zookeeper了一个客户端 zkclient。
上述内容结合之前文章的相关的网络编程内容,可以先写一个服务端,代码如下。 首先结合上面的流程图,大致说一下逻辑。1、需要一个引导类,用于引导整个rpc服务的启动;2、有一个启动器,在启动器中完成服务注册和基于netty的监听;3、需要一个服务注册的server,基于zookeeper实现服务注册,需要封装zookeeper的连接和调用;4、需要一个netty的server,实现一次、二次编解码,并且实现请求处理调用具体的业务逻辑。
这里单独说明一下@PostConstruct注解,这个具体作用是当这个类被注册bean的时候,会运行一次被修饰的方法,但是在jdk9之后就去除了。
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- @Configuration
- public class RpcServerBootstrap {
-
- @Autowired
- private RpcServerRnner rpcServerRnner;
-
- @PostConstruct
- public void initRpcServer (){
- // 运行启动器
- rpcServerRnner.run();
- }
- }
- import com.rpc.server.registry.RpcRegistry;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
-
- @Component
- public class RpcServerRnner {
- @Autowired
- private RpcRegistry rpcRegistry;
-
- @Resource
- private RpcServer rpcServer;
-
- /**
- * 用于服务注册和netty监听
- */
- public void run () {
- // 服务注册
- rpcRegistry.serviceRegistry();
-
- // 启动服务,监听端口,接收连接请求
- rpcServer.start();
- }
- }
- import lombok.Data;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
-
- @Data
- @Component
- public class RpcServerConfiguration {
-
- /**
- * ZK根节点名称
- */
- @Value("${rpc.server.zk.root}")
- private String zkRoot;
-
- /**
- * ZK地址信息
- */
- @Value("${rpc.server.zk.addr}")
- private String zkAddr;
-
-
- /**
- * RPC通讯端口
- */
- @Value("${rpc.network.port}")
- private int rpcPort;
-
- /**
- * Spring Boot 服务端口
- */
- @Value("${server.port}")
- private int serverPort;
-
- /**
- * ZK连接超时时间配置
- */
- @Value("${rpc.server.zk.timeout:10000}")
- private int connectTimeout;
- }
- import org.I0Itec.zkclient.ZkClient;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class ServerZkClientConfig {
-
- /**
- * RPC服务端配置
- */
- @Autowired
- private RpcServerConfiguration rpcServerConfiguration;
-
- /**
- * 声音ZK客户端
- * @return
- */
- @Bean
- public ZkClient zkClient() {
- return new ZkClient(rpcServerConfiguration.getZkAddr(), rpcServerConfiguration.getConnectTimeout());
- }
- }
- import org.I0Itec.zkclient.ZkClient;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- /**
- * Zookeeper连接操作接口
- */
- @Component
- public class ServerZKit {
-
- @Autowired
- private ZkClient zkClient;
-
- @Autowired
- private RpcServerConfiguration rpcServerConfiguration;
-
- /***
- * 根节点创建
- */
- public void createRootNode() {
- boolean exists = zkClient.exists(rpcServerConfiguration.getZkRoot());
- if (!exists) {
- zkClient.createPersistent(rpcServerConfiguration.getZkRoot());
- }
- }
-
- /***
- * 创建其他节点
- * @param path
- */
- public void createPersistentNode(String path) {
- String pathName = rpcServerConfiguration.getZkRoot() + "/" + path;
- boolean exists = zkClient.exists(pathName);
- if (!exists) {
- zkClient.createPersistent(pathName);
- }
- }
-
- /***
- * 创建节点
- * @param path
- */
- public void createNode(String path) {
- String pathName = rpcServerConfiguration.getZkRoot() + "/" + path;
- boolean exists = zkClient.exists(pathName);
- if (!exists) {
- zkClient.createEphemeral(pathName);
- }
- }
- }
- import org.springframework.core.annotation.AliasFor;
- import org.springframework.stereotype.Component;
- import java.lang.annotation.*;
-
- @Component
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface HrpcService {
-
- /**
- * 等同于@Component的value
- * @return
- */
- @AliasFor(annotation = Component.class)
- String value() default "";
-
- /**
- * 服务接口Class
- * @return
- */
- Class<?> interfaceClass() default void.class;
-
- /**
- * 服务接口名称
- * @return
- */
- String interfaceName() default "";
-
- /**
- * 服务版本号
- * @return
- */
- String version() default "";
-
- /**
- * 服务分组
- * @return
- */
- String group() default "";
- }
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.support.DefaultListableBeanFactory;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.stereotype.Component;
-
- import java.lang.annotation.Annotation;
- import java.util.Map;
-
- @Component
- public class SpringBeanFactory implements ApplicationContextAware {
-
- /**
- * ioc容器
- */
- private static ApplicationContext context;
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- context = applicationContext;
- }
-
- /*public static ApplicationContext getApplicationContext() {
- return context;
- }*/
-
- /**
- * 根据Class获取bean
- * @param cls
- * @param <T>
- * @return
- */
- public static <T> T getBean(Class<T> cls) {
- return context.getBean(cls);
- }
-
- /**
- * 根据beanName获取bean
- * @param beanName
- * @return
- */
- public static Object getBean(String beanName) {
- return context.getBean(beanName);
- }
-
- /***
- * 获取有指定注解的对象
- * @param annotationClass
- * @return
- */
- public static Map<String, Object> getBeanListByAnnotationClass(Class<? extends Annotation> annotationClass) {
- return context.getBeansWithAnnotation(annotationClass);
- }
-
- /**
- * 向容器注册单例bean
- * @param bean
- */
- public static void registerSingleton(Object bean) {
- DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
- // 让bean完成Spring初始化过程中所有增强器检验,只是不重新创建bean
- beanFactory.applyBeanPostProcessorsAfterInitialization(bean,bean.getClass().getName());
- //将bean以单例的形式入驻到容器中,此时通过bean.getClass().getName()或bean.getClass()都可以拿到放入Spring容器的Bean
- beanFactory.registerSingleton(bean.getClass().getName(),bean);
- }
- }
上面的准备工作基本上就做完了,下面开始正式的逻辑代码。这里提一点,当spring boot整合这个自定义框架的时候,可以有很多方式,这个不再细说,可以参考文章spring boot的自动配置,这里可以直接粗暴一点在spring boot项目的启动类上@SpringBootApplication(scanBasePackages ={"包路径","包路径"})。
再说一下@component 注解,这是由spring 提供,被其修饰的类被声明为spring 的组件,简单来说就是创建bean并放置IOC容器中。
- /**
- * 服务注册接口
- */
- public interface RpcRegistry {
- /**
- * 服务注册
- */
- void serviceRegistry();
- }
-
- import com.rpc.annotation.HrpcService;
- import com.rpc.server.config.RpcServerConfiguration;
- import com.rpc.server.registry.RpcRegistry;
- import com.rpc.spring.SpringBeanFactory;
- import com.rpc.util.IpUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import java.util.Map;
-
- @Component
- @Slf4j
- public class ZkRegistry implements RpcRegistry {
- /**
- * 封装的bean工厂
- */
- @Autowired
- private SpringBeanFactory springBeanFactory;
-
- /**
- * 封装的zookeeper的客户端
- */
- @Autowired
- private ServerZKit zKitClient;
-
- /**
- * 配置信息对象
- */
- @Autowired
- private RpcServerConfiguration rpcServerConfiguration;
-
- @Override
- public void serviceRegistry() {
- /*
- * 1、首先要获取被HrpcService 注解修饰的,IOC中的所有的bean信息
- * 2、拿到bean之后,再获取bean上的HrpcService 注解对象
- * 3、拿到HrpcService 注解上面的接口信息
- * 4、创建zookeeper上的根节点,并获取服务端ip和配置文件中的zookeeper端口,创建以接口名称为key,ip+端口为value的子节点
- * 5、注册成功
- */
- // 获取被HrpcService 注解修饰的,IOC中的所有的bean信息
- Map<String, Object> annotationClass = springBeanFactory.getBeanListByAnnotationClass(HrpcService.class);
-
- // 没被注册信息,直接结束
- if (annotationClass == null || annotationClass.size() < 0){
- return;
- }
-
- // 迭代所有的bean
- for (Object bean : annotationClass.values()) {
- // 获取HrpcService 注解信息
- HrpcService hrpcService = bean.getClass().getAnnotation(HrpcService.class);
-
- // 获取HrpcService 注解的interfaceClass属性,也就是接口对象
- Class<?> interfaceClass = hrpcService.interfaceClass();
-
- // 获取接口的名称
- String name = interfaceClass.getName();
-
- /*
- * 开始往zookeeper添加节点
- */
- // 根节点
- zKitClient.createRootNode();
-
- // 子节点,用于接口名称
- zKitClient.createPersistentNode(name);
-
- // 获取ip
- String ip = IpUtil.getRealIp();
- // ip + 端口
- String node = ip + rpcServerConfiguration.getZkAddr();
- // 子节点对应下级节点
- zKitClient.createNode(name + "/" + node);
-
- // 打印日志
- log.info("服务{}-{}注册成功", name, node);
- }
- }
- }
- import io.protostuff.LinkedBuffer;
- import io.protostuff.ProtostuffIOUtil;
- import io.protostuff.Schema;
- import io.protostuff.runtime.RuntimeSchema;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.*;
- import java.util.concurrent.CopyOnWriteArrayList;
-
- /**
- * @description
- * @author: ts
- * @create:2021-04-08 10:31
- */
- @Slf4j
- public class ProtostuffUtil {
-
- //存储因为无法直接序列化/反序列化 而需要被包装的类型Class
- private static final Set<Class<?>> WRAPPER_SET = new HashSet<Class<?>>();
-
- static {
- WRAPPER_SET.add(List.class);
- WRAPPER_SET.add(ArrayList.class);
- WRAPPER_SET.add(CopyOnWriteArrayList.class);
- WRAPPER_SET.add(LinkedList.class);
- WRAPPER_SET.add(Stack.class);
- WRAPPER_SET.add(Vector.class);
- WRAPPER_SET.add(Map.class);
- WRAPPER_SET.add(HashMap.class);
- WRAPPER_SET.add(TreeMap.class);
- WRAPPER_SET.add(LinkedHashMap.class);
- WRAPPER_SET.add(Hashtable.class);
- WRAPPER_SET.add(SortedMap.class);
- WRAPPER_SET.add(Object.class);
- }
-
- //注册需要使用包装类进行序列化的Class对象
- public static void registerWrapperClass(Class<?> clazz) {
- WRAPPER_SET.add(clazz);
- }
-
- /**
- * 将对象序列化为字节数组
- * @param t
- * @param useWrapper 为true完全使用包装模式 为false则选择性的使用包装模式
- * @param <T>
- * @return
- */
- public static <T> byte[] serialize(T t,boolean useWrapper) {
- Object serializerObj = t;
- if (useWrapper) {
- serializerObj = SerializeDeserializeWrapper.build(t);
- }
- return serialize(serializerObj);
- }
-
- /**
- * 将对象序列化为字节数组
- * @param t
- * @param <T>
- * @return
- */
- public static <T> byte[] serialize(T t) {
- //获取序列化对象的class
- Class<T> clazz = (Class<T>) t.getClass();
- Object serializerObj = t;
- if (WRAPPER_SET.contains(clazz)) {
- serializerObj = SerializeDeserializeWrapper.build(t);//将原始序列化对象进行包装
- }
- return doSerialize(serializerObj);
- }
-
-
- /**
- * 执行序列化
- * @param t
- * @param <T>
- * @return
- */
- public static <T> byte[] doSerialize(T t) {
- //获取序列化对象的class
- Class<T> clazz = (Class<T>) t.getClass();
- //获取Schema
- // RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz);//根据给定的class创建schema
- /**
- * this is lazily created and cached by RuntimeSchema
- * so its safe to call RuntimeSchema.getSchema() over and over The getSchema method is also thread-safe
- */
- Schema<T> schema = RuntimeSchema.getSchema(clazz);//内部有缓存机制
- /**
- * Re-use (manage) this buffer to avoid allocating on every serialization
- */
- LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
- byte[] protostuff = null;
- try {
- protostuff = ProtostuffIOUtil.toByteArray(t, schema, buffer);
- } catch (Exception e){
- log.error("protostuff serialize error,{}",e.getMessage());
- }finally {
- buffer.clear();
- }
- return protostuff;
- }
-
-
- /**
- * 反序列化
- * @param data
- * @param clazz
- * @param <T>
- * @return
- */
- public static <T> T deserialize(byte[] data,Class<T> clazz) {
- //判断是否经过包装
- if (WRAPPER_SET.contains(clazz)) {
- SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>();
- ProtostuffIOUtil.mergeFrom(data,wrapper,RuntimeSchema.getSchema(SerializeDeserializeWrapper.class));
- return wrapper.getData();
- }else {
- Schema<T> schema = RuntimeSchema.getSchema(clazz);
- T newMessage = schema.newMessage();
- ProtostuffIOUtil.mergeFrom(data,newMessage,schema);
- return newMessage;
- }
- }
-
-
- private static class SerializeDeserializeWrapper<T> {
- //被包装的数据
- T data;
-
- public static <T> SerializeDeserializeWrapper<T> build(T data){
- SerializeDeserializeWrapper<T> wrapper = new SerializeDeserializeWrapper<T>();
- wrapper.setData(data);
- return wrapper;
- }
-
- public T getData() {
- return data;
- }
-
- public void setData(T data) {
- this.data = data;
- }
- }
- }
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
- /**
- * 一次解码
- */
- public class FrameDecoder extends LengthFieldBasedFrameDecoder {
- public FrameDecoder() {
- super(Integer.MAX_VALUE, 0, 4, 0, 4);
- }
- }
- import com.rpc.util.ProtostuffUtil;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToMessageEncoder;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.List;
-
- /**
- * 服务端的二次编码
- */
- @Slf4j
- public class RpcResponseEncoder extends MessageToMessageEncoder<ByteBuf> {
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
- /*
- * 首先将传入的ByteBuf 序列化为一个byte 数组
- * 然后用ChannelHandlerContext 构建一个buffer对象
- * 最后写入buffer,添加out写出
- */
- try {
- // 使用序列化工具,将msg序列化
- byte[] bytes = ProtostuffUtil.serialize(msg);
-
- // 由ctx分配构建一个buffer对象
- ByteBuf buffer = ctx.alloc().buffer(bytes.length);
- // 将数据交给buffer
- buffer.writeBytes(bytes);
-
- // 添加写出
- out.add(buffer);
- } catch (Exception e) {
- // 异常
- log.error("RpcResponseEncoder exception ,msg={}",e.getMessage());
- }
- }
- }
- import io.netty.handler.codec.LengthFieldPrepender;
-
- /**
- * 一次解码
- */
- public class FrameEncoder extends LengthFieldPrepender {
- public FrameEncoder() {
- super(4);
- }
- }
- import com.itheima.rpc.util.ProtostuffUtil;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToMessageEncoder;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.List;
-
- /**
- * 服务端的二次编码
- */
- @Slf4j
- public class RpcResponseEncoder extends MessageToMessageEncoder<ByteBuf> {
- @Override
- protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
- /*
- * 首先将传入的ByteBuf 序列化为一个byte 数组
- * 然后用ChannelHandlerContext 构建一个buffer对象
- * 最后写入buffer,添加out写出
- */
- try {
- // 使用序列化工具,将msg序列化
- byte[] bytes = ProtostuffUtil.serialize(msg);
-
- // 由ctx分配构建一个buffer对象
- ByteBuf buffer = ctx.alloc().buffer(bytes.length);
- // 将数据交给buffer
- buffer.writeBytes(bytes);
-
- // 添加写出
- out.add(buffer);
- } catch (Exception e) {
- // 异常
- log.error("RpcResponseEncoder exception ,msg={}",e.getMessage());
- }
- }
- }
- import lombok.AllArgsConstructor;
- import lombok.Builder;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @Builder
- @NoArgsConstructor
- @AllArgsConstructor
- public class RpcResponse {
- private String requestId;
- private Object result;
- private Throwable cause;
-
- public boolean isError() {
- return cause != null;
- }
- }
- import lombok.AllArgsConstructor;
- import lombok.Builder;
- import lombok.Data;
- import lombok.NoArgsConstructor;
-
- @Data
- @Builder
- @NoArgsConstructor
- @AllArgsConstructor
- public class RpcRequest {
- private String requestId;
- private String className;
- private String methodName;
- private Class<?>[] parameterTypes;
- private Object[] parameters;
- }
- import com.itheima.rpc.data.RpcRequest;
- import com.itheima.rpc.data.RpcResponse;
- import com.itheima.rpc.spring.SpringBeanFactory;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import lombok.extern.slf4j.Slf4j;
-
- import java.lang.reflect.InvocationTargetException;
- import java.lang.reflect.Method;
-
- /**
- * 客户端请求业务调用
- */
- @Slf4j
- public class RpcRequestHandler extends SimpleChannelInboundHandler<RpcRequest> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
- /*
- * 先创建请求对应的响应对象
- * 从请求对象中获取相关接口信息,接口名称、方法名称、参数类型、参数
- * 根据接名称从容器中获取bean
- * 用反射,根据方法和参数类型拿到Method 对象
- * 将参数传入Method 对象,然后运行,拿到返回值
- * 将返回值给到响应对象
- * 一定要将响应对象回写给客户端
- */
- log.info("服务端收到的请求是:{}",rpcRequest);
- // 构建响应对象
- RpcResponse rpcResponse = new RpcResponse();
- // 于请求对象关联
- rpcResponse.setRequestId(rpcRequest.getRequestId());
-
- try {
- // 接口名称
- String interfaceName = rpcRequest.getClassName();
- // 方法名称
- String methodName = rpcRequest.getMethodName();
- // 参数类型
- Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
- // 实际参数
- Object[] parameters = rpcRequest.getParameters();
-
- // 从容器中获取bean实例
- Object bean = SpringBeanFactory.getBean(Class.forName(interfaceName));
- // 反射获取method 对象
- Method method = bean.getClass().getMethod(methodName, parameterTypes);
- // 执行对应方法,拿到返回值
- Object result = method.invoke(bean, parameters);
-
- // 添加到响应对象
- rpcResponse.setResult(result);
- } catch (Exception e) {
- log.error("RpcRequestHandler exception,msg={}",e.getMessage());
- rpcResponse.setCause(e);
- } finally {
- // 将结果写回
- log.info("向客户端发送响应,{}",rpcResponse);
- ctx.writeAndFlush(rpcResponse);
- }
- }
- }
- import com.rpc.netty.codec.FrameDecoder;
- import com.rpc.netty.codec.FrameEncoder;
- import com.rpc.netty.codec.RpcRequestDecoder;
- import com.rpc.netty.codec.RpcResponseEncoder;
- import com.rpc.netty.handler.RpcRequestHandler;
- import com.rpc.server.boot.RpcServer;
- import com.rpc.server.config.RpcServerConfiguration;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.util.NettyRuntime;
- import io.netty.util.concurrent.DefaultThreadFactory;
- import org.springframework.beans.factory.annotation.Autowired;
-
- public class NettServer implements RpcServer {
- @Autowired
- private RpcServerConfiguration rpcServerConfiguration;
-
-
- @Override
- public void start() {
- /*
- * 首先要获取三个线程池,用于注册serverSocketChannel、socketChannel 还有业务逻辑处理(即请求调用)
- * 再构建引导类,并配置先关信息,将注册请求和处理读写的线程池配置到引导类中,然后配置好相关的 handler(第一、二次编解码,请求调用),注意请求调用使用线程池做处理
- * 最后启动引导类,绑定监听端口,设置同步
- * 监控等待关闭
- * 优雅的关闭线程池
- */
- // 构建注册serverSocketChannel 的线程池
- NioEventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
- // 构建注册socketChannel 的线程池
- NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
- // 构建业务调用的线程池
- NioEventLoopGroup rpcRequestHandler = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("reqRequestHandler"));
-
- // 业务逻辑调用
- RpcRequestHandler requestHandler = new RpcRequestHandler();
-
- try {
- // 构建引导类
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- // 配置引导类
- serverBootstrap.group(boss, worker)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG,1024)
- .childOption(ChannelOption.TCP_NODELAY,true)
- .childOption(ChannelOption.SO_KEEPALIVE,true)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- // 获取 pipeline
- ChannelPipeline pipeline = socketChannel.pipeline();
-
- /*
- * 配置 handler
- */
- // 一级编码
- pipeline.addLast("FrameEncoder", new FrameEncoder());
- // 二级编码
- pipeline.addLast("RpcResponseEncoder", new RpcResponseEncoder());
-
- // 一级解码
- pipeline.addLast("FrameDecoder", new FrameDecoder());
- // 二级解码
- pipeline.addLast("RpcRequestDecoder", new RpcRequestDecoder());
-
- // 业务线程池调用
- pipeline.addLast(rpcRequestHandler, "requestHandler", requestHandler);
- }
- });
-
- // 启动引导类,监听端口,设置同步
- ChannelFuture future = serverBootstrap.bind(rpcServerConfiguration.getRpcPort()).sync();
- // 监控等待关闭
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- // 异常
- e.printStackTrace();
- } finally {
- /*
- * 优雅的关闭各个线程池
- */
- boss.shutdownGracefully();
- worker.shutdownGracefully();
- rpcRequestHandler.shutdownGracefully();
- }
- }
- }
上述内容就已经完成了一个服务端的创建,后续文章在说客户端,本次结束。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。