赞
踩
学习 summer 的 GitHub 项目,本文主要用于记录代码,方便后续查看。
- package com.summer.simplerpc.annotation;
-
- import org.springframework.stereotype.Component;
-
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
-
- /**
- * RPC consumer注解
- *
- * @author summer
- * @version $Id: SimpleRpcProviderBean.java, v 0.1 2022年01月16日 11:53 AM summer Exp $
- */
- @Retention(RetentionPolicy.RUNTIME)
- //注解打在属性上
- @Target(ElementType.FIELD)
- @Component
- public @interface SimpleRpcConsumer {
- /**
- * 服务版本号
- * @return
- */
- String serviceVersion() default "1.0.0";
-
- /**
- * 注册中心类型-默认zk
- * @return
- */
- String registerType() default "zookeeper";
-
- /**
- * 注册中心地址
- * @return
- */
- String registerAddress() default "127.0.0.1:2181";
- }

- package com.summer.simplerpc.annotation;
-
- import org.springframework.stereotype.Component;
-
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
-
- /**
- * RPC provider注解
- *
- * @author summer
- * @version $Id: SimpleRpcProviderBean.java, v 0.1 2022年01月16日 11:53 AM summer Exp $
- */
- @Retention(RetentionPolicy.RUNTIME)
- //注解打在类上
- @Target(ElementType.TYPE)
- @Component
- public @interface SimpleRpcProvider {
- Class<?> serviceInterface() default Object.class;
- String serviceVersion() default "1.0.0";
- }

- package com.summer.simplerpc.consumer;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * rpc consumer starter
- *
- * @author summer
- * @version $Id: SimplerConsumerAutoConfiguration.java, v 0.1 2022年01月16日 6:19 PM summer Exp $
- */
- @Configuration
- @Slf4j
- public class SimplerConsumerAutoConfiguration {
-
- @Bean
- public static BeanFactoryPostProcessor initRpcConsumer() throws Exception {
- return new SimpleRpcConsumerPostProcessor();
- }
- }

- package com.summer.simplerpc.consumer;
-
- import com.summer.simplerpc.registry.ServiceRegistry;
- import com.summer.simplerpc.registry.cache.ServiceProviderCache;
- import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
- import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.FactoryBean;
-
- import java.lang.reflect.Proxy;
-
- /**
- * 生成rpc consumer代理bean的FactoryBean
- *
- * @author summer
- * @version $Id: SimpleRpcConsumerFactoryBean.java, v 0.1 2022年01月18日 8:58 AM summer Exp $
- */
- @Slf4j
- public class SimpleRpcConsumerFactoryBean implements FactoryBean {
-
- /**
- * 调用的服务接口类
- */
- private Class<?> interfaceClass;
-
- /**
- * 服务版本号
- */
- private String serviceVersion;
-
- /**
- * 注册中心类型
- */
- private String registryType;
-
- /**
- * 注册中心地址
- */
- private String registryAddress;
-
- /**
- * 实际的bean
- */
- private Object object;
-
- /**
- * init方法,通过动态代理生成bean
- *
- * @throws Exception
- */
- public void init() throws Exception {
- ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();
- ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(registryAddress, serviceProviderCache);
-
- //动态代理
- this.object = Proxy.newProxyInstance(
- interfaceClass.getClassLoader(),
- new Class<?>[] {interfaceClass},
- new SimpleRpcInvokeHandler<>(this.serviceVersion, zkServiceRegistry));
- log.info("SimpleRpcConsumerFactoryBean getObject {}", interfaceClass.getName());
- }
-
- /**
- * 返回创建的bean实例
- *
- * @return
- * @throws Exception
- */
- @Override
- public Object getObject() throws Exception {
- return this.object;
- }
-
- /**
- * 创建的bean实例的类型
- *
- * @return
- */
- @Override
- public Class<?> getObjectType() {
- return interfaceClass;
- }
-
- /**
- * 创建的bean实例的作用域
- *
- * @return
- */
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- public void setInterfaceClass(Class<?> interfaceClass) {
- this.interfaceClass = interfaceClass;
- }
-
- public void setServiceVersion(String serviceVersion) {
- this.serviceVersion = serviceVersion;
- }
-
- public void setRegistryType(String registryType) {
- this.registryType = registryType;
- }
-
- public void setRegistryAddress(String registryAddress) {
- this.registryAddress = registryAddress;
- }
- }

- package com.summer.simplerpc.consumer;
-
- import com.summer.simplerpc.io.RPCDecoder;
- import com.summer.simplerpc.io.RPCEncoder;
- import com.summer.simplerpc.model.SimpleRpcRequest;
- import com.summer.simplerpc.model.SimpleRpcResponse;
- import com.summer.simplerpc.registry.ServiceRegistry;
- import com.summer.simplerpc.registry.model.ServiceMetaConfig;
- import com.summer.simplerpc.util.ServiceUtils;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import lombok.extern.slf4j.Slf4j;
-
- /**
- * consumer netty handler
- *
- * @author summer
- * @version $Id: SimpleRpcConsumerNettyHandler.java, v 0.1 2022年01月19日 8:23 AM summer Exp $
- */
- @Slf4j
- public class SimpleRpcConsumerNettyHandler extends SimpleChannelInboundHandler<SimpleRpcResponse> {
-
- /**
- * 注册中心
- */
- private ServiceRegistry serviceRegistry;
-
- /**
- * netty EventLoopGroup
- */
- private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
-
- /**
- * netty channel
- */
- private Channel channel;
-
- /**
- * rpc response
- */
- private SimpleRpcResponse rpcResponse;
-
- /**
- * lock
- */
- private final Object lock = new Object();
-
- /**
- * 构造函数
- *
- * @param serviceRegistry
- */
- public SimpleRpcConsumerNettyHandler(ServiceRegistry serviceRegistry) {
- this.serviceRegistry = serviceRegistry;
- }
-
- /**
- * 发起RPC网络调用请求
- *
- * @param simpleRpcRequest 请求参数
- * @return
- */
- public SimpleRpcResponse sendRpcRequest(SimpleRpcRequest simpleRpcRequest) {
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroup)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline()
- .addLast(new RPCEncoder())
- .addLast(new RPCDecoder())
- //通过.class获取此类型的实例(https://www.cnblogs.com/penglee/p/3993033.html)
- .addLast(SimpleRpcConsumerNettyHandler.this);
- }
- });
-
- String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
- ServiceMetaConfig serviceMetaConfig = this.serviceRegistry.discovery(key);
- if (serviceMetaConfig == null) {
- log.error("sendRpcRequest fail,serviceMetaConfig not found");
- throw new Exception("serviceMetaConfig not found in registry");
- }
-
- log.info("sendRpcRequest begin,serviceMetaConfig=" + serviceMetaConfig.toString() + ",key=" + key);
- final ChannelFuture channelFuture = bootstrap.connect(serviceMetaConfig.getAddress(), serviceMetaConfig.getPort())
- .sync();
- channelFuture.addListener((ChannelFutureListener)args0 -> {
- if (channelFuture.isSuccess()) {
- log.info("rpc invoke success,");
- } else {
- log.info("rpc invoke fail," + channelFuture.cause().getStackTrace());
- eventLoopGroup.shutdownGracefully();
- }
- });
-
- this.channel = channelFuture.channel();
- this.channel.writeAndFlush(simpleRpcRequest).sync();
-
- synchronized (this.lock) {
- log.info("sendRpcRequest lock.wait");
- this.lock.wait();
- }
-
- log.info("get rpc response=" + rpcResponse.toString());
- return this.rpcResponse;
- } catch (Exception e) {
- log.error("sendRpcRequest exception,", e);
- return null;
- } finally {
- //关闭相关连接
- if (this.channel != null) {
- this.channel.close();
- }
- if (this.eventLoopGroup != null) {
- this.eventLoopGroup.shutdownGracefully();
- }
- }
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcResponse simpleRpcResponse) throws Exception {
- this.rpcResponse = simpleRpcResponse;
-
- log.info("rpc consumer netty handler,channelRead0,rpcResponse=" + rpcResponse);
-
- //收到远程网络的rpc response,通知调用端
- synchronized (lock) {
- log.info("channelRead0 simpleRpcResponse lock.notifyAll");
- lock.notifyAll();
- }
- }
- }

- package com.summer.simplerpc.consumer;
-
- import com.summer.simplerpc.annotation.SimpleRpcConsumer;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.BeanClassLoaderAware;
- import org.springframework.beans.factory.config.BeanDefinition;
- import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
- import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
- import org.springframework.beans.factory.support.BeanDefinitionBuilder;
- import org.springframework.beans.factory.support.BeanDefinitionRegistry;
- import org.springframework.core.annotation.AnnotationUtils;
- import org.springframework.util.ClassUtils;
- import org.springframework.util.ReflectionUtils;
- import org.springframework.util.StringUtils;
-
- import java.lang.reflect.Field;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * rpc consumer具体处理器。负责扫描代码中含@SimpleRpcConsumer注解的属性,进行代理实现远端网络调用。
- *
- * @author summer
- * @version $Id: SimpleRpcConsumerPostProcessor.java, v 0.1 2022年01月18日 8:28 AM summer Exp $
- */
- @Slf4j
- public class SimpleRpcConsumerPostProcessor implements BeanFactoryPostProcessor, BeanClassLoaderAware {
-
- /**
- * classloader
- */
- private ClassLoader classLoader;
-
- /**
- * 保存BeanDefinition列表
- */
- private Map<String, BeanDefinition> beanDefinitions = new HashMap<>();
-
- @Override
- public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
- log.info("SimpleRpcConsumerPostProcessor postProcessBeanFactory begin");
- //遍历bean,改些打了SimpleRpcConsumer注解的属性
- for (String beanName : beanFactory.getBeanDefinitionNames()) {
- BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanName);
- if (StringUtils.isEmpty(beanDefinition.getBeanClassName())) {
- continue;
- }
- Class<?> clazz = ClassUtils.resolveClassName(beanDefinition.getBeanClassName(), this.classLoader);
- ReflectionUtils.doWithFields(clazz, this::processConsumerBeanDefinition);
- }
-
- //将BeanDefinition重新注入spring容器
- BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry)beanFactory;
- for (Map.Entry<String, BeanDefinition> entry : this.beanDefinitions.entrySet()) {
- log.info("register BeanDefinition[" + entry.getKey() + "," + entry.getValue() + "]");
- beanDefinitionRegistry.registerBeanDefinition(entry.getKey(), entry.getValue());
- }
- }
-
- /**
- * 带有rpc消费者注解的bean定义的重处理
- *
- * @param field 属性
- */
- private void processConsumerBeanDefinition(Field field) {
- SimpleRpcConsumer simpleRpcConsumer = AnnotationUtils.getAnnotation(field, SimpleRpcConsumer.class);
-
- //筛选出打了rpc consumer注解的属性
- if (simpleRpcConsumer == null) {
- return;
- }
-
- log.info("processConsumerBeanDefinition,find a simpleRpcConsumer field:" + field.toString());
- BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(SimpleRpcConsumerFactoryBean.class);
- beanDefinitionBuilder.setInitMethodName("init");
- beanDefinitionBuilder.addPropertyValue("interfaceClass", field.getType());
- beanDefinitionBuilder.addPropertyValue("serviceVersion", simpleRpcConsumer.serviceVersion());
- beanDefinitionBuilder.addPropertyValue("registryType", simpleRpcConsumer.registerType());
- beanDefinitionBuilder.addPropertyValue("registryAddress", simpleRpcConsumer.registerAddress());
-
- BeanDefinition beanDefinition = beanDefinitionBuilder.getBeanDefinition();
-
- log.info("processConsumerBeanDefinition,find a simpleRpcConsumer field,result beanDefinition:" + field.toString());
-
- beanDefinitions.put(field.getName(), beanDefinition);
- }
-
- /**
- * 获取classloader
- *
- * @param classLoader
- */
- @Override
- public void setBeanClassLoader(ClassLoader classLoader) {
- this.classLoader = classLoader;
- }
- }

- package com.summer.simplerpc.consumer;
-
- import com.summer.simplerpc.model.SimpleRpcRequest;
- import com.summer.simplerpc.model.SimpleRpcResponse;
- import com.summer.simplerpc.registry.ServiceRegistry;
- import lombok.extern.slf4j.Slf4j;
-
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.util.UUID;
-
- /**
- * RPC调用动态代理handler实现
- *
- * @author summer
- * @version $Id: SimpleRpcInvokeHandler.java, v 0.1 2022年01月18日 9:19 AM summer Exp $
- */
- @Slf4j
- public class SimpleRpcInvokeHandler<T> implements InvocationHandler {
-
- /**
- * 服务版本号
- */
- private String serviceVersion;
-
- /**
- * 注册中心
- */
- private ServiceRegistry serviceRegistry;
-
- /**
- * 默认构造函数
- */
- public SimpleRpcInvokeHandler() {
-
- }
-
- public SimpleRpcInvokeHandler(String serviceVersion, ServiceRegistry serviceRegistry) {
- this.serviceVersion = serviceVersion;
- this.serviceRegistry = serviceRegistry;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- SimpleRpcRequest simpleRpcRequest = new SimpleRpcRequest();
- simpleRpcRequest.setBizNO(UUID.randomUUID().toString());
- simpleRpcRequest.setClassName(method.getDeclaringClass().getName());
- simpleRpcRequest.setServiceVersion(this.serviceVersion);
- simpleRpcRequest.setMethodName(method.getName());
- simpleRpcRequest.setParamTypes(method.getParameterTypes());
- simpleRpcRequest.setParamValues(args);
-
- log.info("begin simpleRpcRequest=" + simpleRpcRequest.toString());
-
- SimpleRpcConsumerNettyHandler simpleRpcConsumerNettyHandler = new SimpleRpcConsumerNettyHandler(this.serviceRegistry);
- SimpleRpcResponse simpleRpcResponse = simpleRpcConsumerNettyHandler.sendRpcRequest(simpleRpcRequest);
-
- log.info("result simpleRpcResponse=" + simpleRpcResponse);
- return simpleRpcResponse.getData();
- }
- }

- package com.summer.simplerpc.io;
-
- import com.caucho.hessian.io.HessianInput;
- import com.caucho.hessian.io.HessianOutput;
- import lombok.extern.slf4j.Slf4j;
-
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
-
- /**
- * 序列化工具类(Hessian序列化协议)
- *
- * @author summer
- * @version $Id: HessianUtils.java, v 0.1 2022年01月16日 5:00 PM summer Exp $
- */
- @Slf4j
- public class HessianUtils {
-
- /**
- * 序列化
- *
- * @param object
- * @param <T>
- * @return
- */
- public final static <T> byte[] serialize(T object) {
- ByteArrayOutputStream byteArrayOutputStream = null;
- HessianOutput hessianOutput = null;
-
- try {
- byteArrayOutputStream = new ByteArrayOutputStream();
- hessianOutput = new HessianOutput(byteArrayOutputStream);
- hessianOutput.writeObject(object);
- return byteArrayOutputStream.toByteArray();
- } catch (IOException ioe) {
- log.error("serialize io exception,object=" + object, ioe);
- } finally {
- if (byteArrayOutputStream != null) {
- try {
- byteArrayOutputStream.close();
- } catch (IOException ioe) {
- log.error("serialize byteArrayOutputStream close io exception,object=" + object, ioe);
- }
- }
-
- if (hessianOutput != null) {
- try {
- hessianOutput.close();
- } catch (IOException ioe) {
- log.error("serialize hessianOutput close io exception,object=" + object, ioe);
- }
- }
- }
-
- return null;
- }
-
- /**
- * 反序列化
- *
- * @param bytes
- * @param <T>
- * @return
- */
- public final static <T> T deserialize(byte[] bytes) {
- ByteArrayInputStream byteArrayInputStream = null;
- HessianInput hessianInput = null;
- try {
- byteArrayInputStream = new ByteArrayInputStream(bytes);
- hessianInput = new HessianInput(byteArrayInputStream);
- return (T)hessianInput.readObject();
- } catch (IOException ioe) {
- log.error("deserialize io exception,bytes=" + bytes, ioe);
- } finally {
- if (byteArrayInputStream != null) {
- try {
- byteArrayInputStream.close();
- } catch (IOException ioe) {
- log.error("deserialize byteArrayOutputStream close io exception,bytes=" + bytes, ioe);
- }
- }
- if (hessianInput != null) {
- try {
- hessianInput.close();
- } catch (Exception ioe) {
- log.error("deserialize hessianInput close io exception,bytes=" + bytes, ioe);
- }
- }
- }
- return null;
- }
- }

- package com.summer.simplerpc.io;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- import lombok.extern.slf4j.Slf4j;
-
- import java.util.List;
-
- /**
- * 自定义解码器,解析出RPC请求对象
- *
- * @author summer
- * @version $Id: RPCDecoder.java, v 0.1 2022年01月16日 5:22 PM summer Exp $
- */
- @Slf4j
- public class RPCDecoder extends ByteToMessageDecoder {
-
- @Override
- public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
- //入参校验
- if (in.readableBytes() < 4) {
- log.error("decode fail,input ByteBuf illegal,in.readableBytes=" + in.readableBytes());
- return;
- }
-
- in.markReaderIndex();
- //读取长度内容
- int dataLen = in.readInt();
- //剩余可读内容小于预定长度
- if (in.readableBytes() < dataLen) {
- log.error("decode fail,input ByteBuf illegal,in.readableBytes {0} less than dataLen {1}", in.readableBytes(), dataLen);
- return;
- }
-
- //读取实际内容
- byte[] actualDataBytes = new byte[dataLen];
- in.readBytes(actualDataBytes);
- //反序列化
- Object dataObj = HessianUtils.deserialize(actualDataBytes);
- if (dataObj == null) {
- log.error("decode fail,input ByteBuf illegal,dataObj null,actualDataBytes={0}", actualDataBytes);
- return;
- }
- out.add(dataObj);
- }
- }

- package com.summer.simplerpc.io;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.MessageToByteEncoder;
- import lombok.extern.slf4j.Slf4j;
-
- /**
- * 自动以编码器,将rpc返回结果编码为字节流
- *
- * @author summer
- * @version $Id: RPCEncoder.java, v 0.1 2022年01月16日 5:29 PM summer Exp $
- */
- @Slf4j
- public class RPCEncoder extends MessageToByteEncoder {
-
- @Override
- protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
- byte[] data = HessianUtils.serialize(o);
- if (data == null) {
- log.error("encode fail,result data null,result object=" + o);
- }
-
- byteBuf.writeInt(data.length);
- byteBuf.writeBytes(data);
- }
- }

- package com.summer.simplerpc.model;
-
- import lombok.Data;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.stereotype.Component;
-
- /**
- * RPC通用配置信息,提供用户自定义的功能
- *
- * @author summer
- * @version $Id: RpcCommonProperty.java, v 0.1 2022年01月16日 6:09 PM summer Exp $
- */
- @Data
- @Component
- @ConfigurationProperties(prefix = "summer.simplerpc")
- public class RpcCommonProperty {
-
- /**
- * 服务提供方地址
- */
- private String serviceAddress;
-
- /**
- * 注册中心类型
- */
- private String registryType;
-
- /**
- * 注册中心地址
- */
- private String registryAddress;
- }

- package com.summer.simplerpc.model;
-
- import lombok.Data;
-
- import java.io.Serializable;
-
- /**
- * RPC请求领域模型
- *
- * @author summer
- * @version $Id: SimpleRpcRequest.java, v 0.1 2022年01月16日 5:37 PM summer Exp $
- */
- @Data
- public class SimpleRpcRequest implements Serializable {
-
- private static final long serialVersionUID = -6523563004185159591L;
-
- /**
- * 业务流水号
- */
- private String bizNO;
-
- /**
- * 服务类名
- */
- private String className;
-
- /**
- * 服务方法名
- */
- private String methodName;
-
- /**
- * 服务版本号
- */
- private String serviceVersion;
-
- /**
- * 参数类型列表
- */
- private Class<?>[] paramTypes;
-
- /**
- * 参数值列表
- */
- private Object[] paramValues;
- }

- package com.summer.simplerpc.model;
-
- import lombok.Data;
-
- import java.io.Serializable;
-
- /**
- * rpc处理结果
- *
- * @author summer
- * @version $Id: SimpleRpcResponse.java, v 0.1 2022年01月16日 5:52 PM summer Exp $
- */
- @Data
- public class SimpleRpcResponse implements Serializable {
-
- private static final long serialVersionUID = 7306531831668743451L;
-
- /**
- * 业务流水号
- */
- private String bizNO;
-
- /**
- * 错误结果提示消息
- */
- private String msg;
-
- /**
- * 实际结果
- */
- private Object data;
-
- }

- package com.summer.simplerpc.provider;
-
- import com.google.common.util.concurrent.ThreadFactoryBuilder;
- import com.summer.simplerpc.annotation.SimpleRpcProvider;
- import com.summer.simplerpc.io.RPCDecoder;
- import com.summer.simplerpc.io.RPCEncoder;
- import com.summer.simplerpc.registry.ServiceRegistry;
- import com.summer.simplerpc.registry.model.ServiceMetaConfig;
- import com.summer.simplerpc.util.ServiceUtils;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.InitializingBean;
- import org.springframework.beans.factory.config.BeanPostProcessor;
-
- import java.util.Map;
- import java.util.concurrent.*;
-
- /**
- * rpc provider功能实现。
- *
- * 负责扫描服务provider注解bean,注册服务到注册中心,启动netty监听。
- * 提供RPC请求实际处理。
- *
- * @author summer
- * @version $Id: SimpleRpcProviderBean.java, v 0.1 2022年01月16日 12:19 PM summer Exp $
- */
- @Slf4j
- public class SimpleRpcProviderBean implements InitializingBean, BeanPostProcessor {
-
- /**
- * 地址
- */
- private String address;
-
- /**
- * 服务注册中心
- */
- private ServiceRegistry serviceRegistry;
-
- /**
- * 服务提供bean的缓存map
- */
- private Map<String, Object> providerBeanMap = new ConcurrentHashMap<>(64);
-
- /**
- * 处理实际rpc请求的线程池
- */
- private static ThreadPoolExecutor rpcThreadPoolExecutor;
-
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("simplerpc-provider-pool-%d").build();
-
- /**
- * netty相关
- */
- private EventLoopGroup bossGroup = null;
- private EventLoopGroup workerGroup = null;
-
- /**
- * 构造函数
- *
- * @param address 地址
- * @param serviceRegistry 服务注册中心
- */
- public SimpleRpcProviderBean(String address, ServiceRegistry serviceRegistry) {
- this.address = address;
- this.serviceRegistry = serviceRegistry;
- }
-
- @Override
- public void afterPropertiesSet() throws Exception {
- //启动netty服务监听
- new Thread(() -> {
- try {
- startNettyServer();
- } catch (InterruptedException e) {
- log.error("startNettyServer exception,", e);
- }
- }).start();
- }
-
- /**
- * 提交rpc处理任务
- *
- * @param task 任务
- */
- public static void submit(Runnable task) {
- if (rpcThreadPoolExecutor == null) {
- synchronized (SimpleRpcProviderBean.class) {
- if (rpcThreadPoolExecutor == null) {
- rpcThreadPoolExecutor = new ThreadPoolExecutor(100, 100,
- 600L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),
- threadFactory);
- }
- }
- }
- rpcThreadPoolExecutor.submit(task);
- }
-
- /**
- * 启动netty服务监听
- *
- * @throws InterruptedException
- */
- private void startNettyServer() throws InterruptedException {
- if (workerGroup != null && bossGroup != null) {
- return;
- }
-
- log.info("startNettyServer begin");
-
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
-
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) {
- socketChannel.pipeline()
- .addLast(new LengthFieldBasedFrameDecoder(65535,0,4,0,0))
- .addLast(new RPCDecoder())
- .addLast(new RPCEncoder())
- .addLast(new SimpleRpcProviderNettyHandler(providerBeanMap))
- ;
- }
- })
- .option(ChannelOption.SO_BACKLOG, 512)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
-
- String[] array = address.split(":");
- String host = array[0];
- int port = Integer.parseInt(array[1]);
-
- //启动服务
- ChannelFuture future = serverBootstrap.bind(host, port).sync();
-
- log.info(String.format("startNettyServer,host=%s,port=%s", host, port));
-
- future.channel().closeFuture().sync();
- }
-
- @Override
- public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
- return bean;
- }
-
- @Override
- public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
- //获取bean上的注解
- SimpleRpcProvider simpleRpcProvider = bean.getClass().getAnnotation(SimpleRpcProvider.class);
- if (simpleRpcProvider == null) {
- //无注解直接return原始的bean
- return bean;
- }
-
- //缓存保存
- String serviceName = simpleRpcProvider.serviceInterface().getName();
- String version = simpleRpcProvider.serviceVersion();
- providerBeanMap.put(ServiceUtils.buildServiceKey(serviceName, version), bean);
-
- log.info("postProcessAfterInitialization find a simpleRpcProvider[" + serviceName + "," + version + "]");
-
- //将服务注册到注册中心
- String[] addressArray = address.split(ServiceUtils.SPLIT_CHAR);
- String host = addressArray[0];
- String port = addressArray[1];
-
- ServiceMetaConfig serviceMetaConfig = new ServiceMetaConfig();
- serviceMetaConfig.setAddress(host);
- serviceMetaConfig.setName(serviceName);
- serviceMetaConfig.setVersion(version);
- serviceMetaConfig.setPort(Integer.parseInt(port));
-
- try {
- serviceRegistry.register(serviceMetaConfig);
- log.info("register service success,serviceMetaConfig=" + serviceMetaConfig.toString());
- } catch (Exception e) {
- log.error("register service fail,serviceMetaConfig=" + serviceMetaConfig.toString(), e);
- }
-
- return bean;
- }
- }

- package com.summer.simplerpc.provider;
-
- import com.summer.simplerpc.model.SimpleRpcRequest;
- import com.summer.simplerpc.model.SimpleRpcResponse;
- import com.summer.simplerpc.util.ServiceUtils;
- import io.netty.channel.ChannelFutureListener;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.cglib.reflect.FastClass;
-
- import java.util.Map;
-
- /**
- * RPC核心处理逻辑handler
- *
- * @author summer
- * @version $Id: SimpleRpcProviderNettyHandler.java, v 0.1 2022年01月16日 5:36 PM summer Exp $
- */
- @Slf4j
- public class SimpleRpcProviderNettyHandler extends SimpleChannelInboundHandler<SimpleRpcRequest> {
-
- /**
- * 提供rpc服务的实例缓存map
- */
- private Map<String, Object> handlerMap;
-
- /**
- * 构造函数
- *
- * @param handlerMap
- */
- public SimpleRpcProviderNettyHandler(Map<String, Object> handlerMap) {
- this.handlerMap = handlerMap;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, SimpleRpcRequest simpleRpcRequest) throws Exception {
- SimpleRpcProviderBean.submit(() -> {
- log.debug("Receive rpc request {}", simpleRpcRequest.getBizNO());
- SimpleRpcResponse simpleRpcResponse = new SimpleRpcResponse();
- simpleRpcResponse.setBizNO(simpleRpcRequest.getBizNO());
- try {
- Object result = doHandle(simpleRpcRequest);
- simpleRpcResponse.setData(result);
- } catch (Throwable throwable) {
- simpleRpcResponse.setMsg(throwable.toString());
- log.error("handle rpc request error", throwable);
- }
- channelHandlerContext.writeAndFlush(simpleRpcResponse).addListener(
- (ChannelFutureListener) channelFuture ->
- log.info("return response for request " + simpleRpcRequest.getBizNO() + ",simpleRpcResponse=" + simpleRpcResponse));
- });
- }
-
- /**
- * 通过反射,执行实际的rpc请求
- * @param simpleRpcRequest
- * @return
- */
- private Object doHandle(SimpleRpcRequest simpleRpcRequest) throws Exception {
- String key = ServiceUtils.buildServiceKey(simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
- if (handlerMap == null || handlerMap.get(key) == null) {
- log.error("doHandle,the provider {0} not exist,", simpleRpcRequest.getClassName(), simpleRpcRequest.getServiceVersion());
- throw new RuntimeException("the provider not exist");
- }
-
- log.info("doHandle,simpleRpcRequest=" + simpleRpcRequest.toString());
-
- Object provider = handlerMap.get(key);
-
- //通过动态代理执行实际的调用
- FastClass fastClass = FastClass.create(provider.getClass());
- return fastClass.invoke(fastClass.getIndex(simpleRpcRequest.getMethodName(), simpleRpcRequest.getParamTypes()),
- provider, simpleRpcRequest.getParamValues());
- }
- }

- package com.summer.simplerpc.provider;
-
- import com.summer.simplerpc.model.RpcCommonProperty;
- import com.summer.simplerpc.registry.ServiceRegistry;
- import com.summer.simplerpc.registry.cache.ServiceProviderCache;
- import com.summer.simplerpc.registry.cache.ServiceProviderLocalCache;
- import com.summer.simplerpc.registry.zk.ZkServiceRegistry;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * rpc provider starter
- *
- * @author summer
- * @version $Id: SimplerRpcProviderAutoConfiguration.java, v 0.1 2022年01月16日 6:19 PM summer Exp $
- */
- @Configuration
- @Slf4j
- public class SimplerRpcProviderAutoConfiguration {
-
- @Bean
- public SimpleRpcProviderBean initRpcProvider() throws Exception {
- RpcCommonProperty rpcCommonProperty = new RpcCommonProperty();
- rpcCommonProperty.setServiceAddress("127.0.0.1:50001");
- rpcCommonProperty.setRegistryAddress("127.0.0.1:2181");
-
- log.info("===================SimplerRpcProviderAutoConfiguration init,rpcCommonProperty=" + rpcCommonProperty.toString());
- ServiceProviderCache serviceProviderCache = new ServiceProviderLocalCache();
- ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(rpcCommonProperty.getRegistryAddress(), serviceProviderCache);
-
- return new SimpleRpcProviderBean(rpcCommonProperty.getServiceAddress(), zkServiceRegistry);
- }
- }

- package com.summer.simplerpc.registry;
-
- import com.summer.simplerpc.registry.model.ServiceMetaConfig;
-
- /**
- * 注册中心服务接口定义
- *
- * @author summer
- * @version $Id: ServiceRegistry.java, v 0.1 2022年01月16日 10:56 AM summer Exp $
- */
- public interface ServiceRegistry {
-
- /**
- * 注册服务
- *
- * @param serviceMetaConfig 服务元数据配置
- * @throws Exception
- */
- void register(ServiceMetaConfig serviceMetaConfig) throws Exception;
-
- /**
- * 取消注册服务
- *
- * @param serviceMetaConfig 服务元数据配置
- * @throws Exception
- */
- void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception;
-
- /**
- * 服务发现
- *
- * @param serviceName 服务名
- * @return
- * @throws Exception
- */
- ServiceMetaConfig discovery(String serviceName) throws Exception;
- }

- package com.summer.simplerpc.registry.model;
-
- import lombok.Data;
-
- /**
- * 服务元数据配置领域模型
- *
- * @author summer
- * @version $Id: ServiceMetaConfig.java, v 0.1 2022年01月16日 10:58 AM summer Exp $
- */
- @Data
- public class ServiceMetaConfig {
-
- /**
- * 服务名
- */
- private String name;
-
- /**
- * 服务版本
- */
- private String version;
-
- /**
- * 服务地址
- */
- private String address;
-
- /**
- * 服务端口
- */
- private Integer port;
- }

- package com.summer.simplerpc.registry.zk;
-
- import com.summer.simplerpc.registry.cache.ServiceProviderCache;
- import com.summer.simplerpc.registry.model.ServiceMetaConfig;
- import com.summer.simplerpc.registry.ServiceRegistry;
- import com.summer.simplerpc.util.ServiceUtils;
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.retry.ExponentialBackoffRetry;
- import org.apache.curator.x.discovery.*;
- import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
- import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
-
- /**
- * 服务注册中心-zk实现
- *
- * @author summer
- * @version $Id: ZkServiceRegistry.java, v 0.1 2022年01月16日 11:07 AM summer Exp $
- */
- public class ZkServiceRegistry implements ServiceRegistry {
-
- /**
- * zk base path
- */
- private final static String ZK_BASE_PATH = "/simplerpc";
-
- /**
- * serviceProvider锁
- */
- private final Object lock = new Object();
-
- /**
- * zk framework client
- */
- private CuratorFramework client;
-
- /**
- * 服务发现
- */
- private ServiceDiscovery<ServiceMetaConfig> serviceDiscovery;
-
- /**
- * serviceProvider缓存
- */
- private ServiceProviderCache serviceProviderCache;
-
- /**
- * 构造函数
- *
- * @param address 地址
- */
- public ZkServiceRegistry(String address, ServiceProviderCache serviceProviderCache) throws Exception {
- this.client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
- this.client.start();
-
- this.serviceProviderCache = serviceProviderCache;
-
- JsonInstanceSerializer<ServiceMetaConfig> serializer = new JsonInstanceSerializer<>(ServiceMetaConfig.class);
- serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaConfig.class)
- .client(client)
- .serializer(serializer)
- .basePath(ZK_BASE_PATH)
- .build();
- serviceDiscovery.start();
- }
-
- @Override
- public void register(ServiceMetaConfig serviceMetaConfig) throws Exception {
- ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();
- ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder
- .name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion()))
- .address(serviceMetaConfig.getAddress())
- .port(serviceMetaConfig.getPort())
- .payload(serviceMetaConfig)
- .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
- .build();
-
- serviceDiscovery.registerService(serviceInstance);
- }
-
- @Override
- public void unRegister(ServiceMetaConfig serviceMetaConfig) throws Exception {
- ServiceInstanceBuilder<ServiceMetaConfig> serviceInstanceBuilder = ServiceInstance.builder();
- ServiceInstance<ServiceMetaConfig> serviceInstance = serviceInstanceBuilder
- .name(ServiceUtils.buildServiceKey(serviceMetaConfig.getName(), serviceMetaConfig.getVersion()))
- .address(serviceMetaConfig.getAddress())
- .port(serviceMetaConfig.getPort())
- .payload(serviceMetaConfig)
- .uriSpec(new UriSpec("{scheme}://{address}:{port}"))
- .build();
-
- serviceDiscovery.unregisterService(serviceInstance);
- }
-
- @Override
- public ServiceMetaConfig discovery(String serviceName) throws Exception {
- //先读缓存
- ServiceProvider<ServiceMetaConfig> serviceProvider = serviceProviderCache.queryCache(serviceName);
-
- //缓存miss,需要调serviceDiscovery
- if (serviceProvider == null) {
- synchronized (lock) {
- serviceProvider = serviceDiscovery.serviceProviderBuilder()
- .serviceName(serviceName)
- .providerStrategy(new RoundRobinStrategy<>())
- .build();
- serviceProvider.start();
-
- //更新缓存
- serviceProviderCache.updateCache(serviceName, serviceProvider);
- }
- }
-
- ServiceInstance<ServiceMetaConfig> serviceInstance = serviceProvider.getInstance();
- return serviceInstance != null ? serviceInstance.getPayload() : null;
- }
- }

- package com.summer.simplerpc.registry.cache;
-
- import com.summer.simplerpc.registry.model.ServiceMetaConfig;
- import org.apache.curator.x.discovery.ServiceProvider;
-
- /**
- *
- * @author summer
- * @version $Id: ServiceProviderCache.java, v 0.1 2022年01月16日 11:41 AM summer Exp $
- */
- public interface ServiceProviderCache {
-
- /**
- * 查询缓存
- * @param serviceName
- * @return
- */
- ServiceProvider<ServiceMetaConfig> queryCache(String serviceName);
-
- /**
- * 更新缓存
- *
- * @param serviceName 服务名
- * @param serviceProvider 服务provider
- * @return
- */
- void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider);
- }

- package com.summer.simplerpc.registry.cache;
-
- import com.summer.simplerpc.registry.model.ServiceMetaConfig;
- import org.apache.curator.x.discovery.ServiceProvider;
-
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
-
- /**
- * 本地缓存实现
- *
- * @author summer
- * @version $Id: ServiceProviderLocalCache.java, v 0.1 2022年01月16日 11:43 AM summer Exp $
- */
- public class ServiceProviderLocalCache implements ServiceProviderCache {
-
- /**
- * 本地缓存map
- */
- private Map<String, ServiceProvider<ServiceMetaConfig>> serviceProviderMap = new ConcurrentHashMap<>();
- @Override
- public ServiceProvider<ServiceMetaConfig> queryCache(String serviceName) {
- return serviceProviderMap.get(serviceName);
- }
-
- @Override
- public void updateCache(String serviceName, ServiceProvider<ServiceMetaConfig> serviceProvider) {
- serviceProviderMap.put(serviceName, serviceProvider);
- }
- }

- package com.summer.simplerpc.util;
-
/**
 * Shared helpers for working with service identifiers.
 *
 * @author summer
 * @version $Id: ServiceUtils.java, v 0.1 2022-01-16 11:28 AM summer Exp $
 */
public class ServiceUtils {

    /**
     * Separator placed between the parts of a service key.
     */
    public static final String SPLIT_CHAR = ":";

    /**
     * Builds the unique key of a service as {@code serviceName + ":" + serviceVersion}.
     *
     * <p>A {@code null} argument is rendered as the text {@code "null"}.
     *
     * @param serviceName    service name
     * @param serviceVersion service version
     * @return the two values joined by {@link #SPLIT_CHAR}
     */
    public static final String buildServiceKey(String serviceName, String serviceVersion) {
        return serviceName + SPLIT_CHAR + serviceVersion;
    }
}

- package com.summer.simplerpc;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class SimplerpcApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(SimplerpcApplication.class, args);
- }
-
- }
- summer.simplerpc.service.address=127.0.0.1:50001
- summer.simplerpc.service.registry.type=zookeeper
- summer.simplerpc.service.registry.address=127.0.0.1:2181
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--
        The parent uses the same Spring Boot release as the spring-boot.version property below.
        The previous parent (spring-boot-starters 1.5.2.RELEASE) managed Spring Boot 1.5 era
        versions while every explicit Spring Boot dependency here is 2.2.1.RELEASE, which mixes
        incompatible versions on the classpath. Keep the two values in sync when upgrading.
    -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.summer</groupId>
    <artifactId>simplerpc-starter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>simplerpc-starter</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-boot.version>2.2.1.RELEASE</spring-boot.version>
    </properties>

    <!--
        The import scope is only honoured for pom-type dependencies declared inside
        dependencyManagement, so the Spring Boot BOM is declared here instead of in the
        regular dependencies section.
    -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>

        <!-- ZooKeeper based service discovery; log4j is excluded from its transitive dependencies. -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-x-discovery-server</artifactId>
            <version>2.9.1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.45.Final</version>
        </dependency>

        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.ow2.asm</groupId>
                    <artifactId>asm</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.caucho</groupId>
            <artifactId>hessian</artifactId>
            <version>4.0.38</version>
        </dependency>

        <!-- Version is managed by the parent / imported BOM. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>net.minidev</groupId>
                    <artifactId>accessors-smart</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。