赞
踩
偶然间看到一个RPC的框架项目,感觉与其他的项目与众不同,所以准备跟着实现一下。由于教程比较少,因此写下这边记录,慢慢踩坑。既然是重复造轮子,自然是以学习框架与组件的运用为主。
Remote Procedure Call(RPC):远程过程调用。
RPC采用Client-Server结构,通过Request-Response消息模式实现。
RPC调用过程中采用的消息协议称为RPC协议
RPC协议规定请求、响应消息的格式在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互。我们可以选用通用的标准协议(如:http、https),也可以根据自身的需要定义自己的消息协议。
封装好参数编组、消息解组、底层网络通信的RPC程序开发框架,带来的便捷是可以直接在其基础上只需要专注于过程代码编写。
Java领域:
我们将会写一个简易的RPC框架,暂且叫它zarlic-rpc-spring-boot-starter,通过在项目中引入该starter,并简单的配置一下,项目即拥有提供远程服务的能力。
编写自定义注解@Service,被它注解的类将会提供远程服务。
编写自定义注解@InjectService,使用它可注入远程服务。
客户端想要调用远程服务,必须具备服务发现的能力;在知道有哪些服务过后,还必须有服务代理来执行服务调用;客户端想要与服务端通信,必须要有相同的消息协议;客户端想要调用远程服务,那么必须具备网络请求的能力,即网络层功能。
当然,这是客户端所需的最基本的能力,其实还可以扩展的能力,例如负载均衡。
基于面向接口编程的理念,不同角色都实现了定义了相应规范的接口。这里面我们没有发现消息协议相关内容,那是因为服务端也需要消息协议,因此抽离了出来,放在公共层。
客户端的代码结构:
/** * 服务发现抽象类,定义服务发现规范 */ public interface ServiceDiscoverer { List<Service> getService(String name); } /** Zookeeper服务发现者,定义以Zookeeper为注册中心的服务发现细则 * @author zarlic * @date 2021.12.15 15:28 */ public class ZookeeperServiceDiscoverer implements ServiceDiscoverer{ private ZkClient zkClient; public ZookeeperServiceDiscoverer(String zkAddress) { zkClient = new ZkClient(zkAddress); //配置zk中心地址 zkClient.setZkSerializer(new ZookeeperSerializer()); //自定义序列化,在common中定义ZookeeperSerializer } /** * 使用Zookeeper客户端,通过服务名获取服务列表 * 服务名格式:接口全路径 * * @param name 服务名 * @return 服务列表 */ @Override public List<Service> getService(String name) { String servicePath = ZarlicConstant.ZK_SERVICE_PATH + ZarlicConstant.PATH_DELIMITER + name + "/service"; List<String> children = zkClient.getChildren(servicePath); //ofNullable 如果children不为空就将其赋值ArrayList,为空创建一个空对象集合赋值给newList,也就避免了空指针异常。 //使用decode解码出服务名字,再根据服务名字,转换成对应的服务 return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> { String deCh = null; try { deCh = URLDecoder.decode(str,ZarlicConstant.UTF_8); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return JSON.parseObject(deCh,Service.class); }).collect(Collectors.toList()); } }
服务发现者使用Zookeeper来实现,通过ZkClient我们很容易发现已经注册在ZK上的服务。当然我们也可以使用其他组件作为注册中心,例如Redis。
/**网络请求客户端,定义网络请求规范 * @author zarlic * @date 2021.12.15 15:29 */ public interface NetClient { /** * 客户端发送请求,即为Request,传送为序列化之后的数据流byte[] * @param data * @param service * @return * @throws InterruptedException */ byte[] sendRequest(byte[] data, Service service) throws InterruptedException; } /**Netty网络请求客户端,定义通过Netty实现网络请求的细则。 * @author zarlic * @date 2021.12.15 15:29 */ public class NettyNetClient implements NetClient{ /** * 日志调用 */ private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class); @Override public byte[] sendRequest(byte[] data, Service service) throws InterruptedException { String[] addInfoArray = service.getAddress().split(":"); String serverAddress = addInfoArray[0]; String serverPort = addInfoArray[1]; SendHandler sendHandler = new SendHandler(data); byte[] respData; // 配置客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); //group设置线程池,channel设置nio类型的channel,option设置通道选项,handler装配流水线 bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { //有连接到达时会创建一个channel @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); // pipeline管理channel中的Handler // 在channel队列中添加一个handler来处理业务 channelPipeline.addLast(sendHandler); } }); // 启动客户端连接 // 开始绑定server // 通过调用sync同步方法阻塞直到绑定成功 bootstrap.connect(serverAddress,Integer.parseInt(serverPort)).sync(); respData = (byte[]) sendHandler.rspData(); logger.info("SendRequest get reply:{}",respData); } finally { // 优雅关闭EventLoopGroup, // 释放掉所有资源包括创建的线程 group.shutdownGracefully(); } return respData; } } /** 发送处理类,定义Netty入站处理细则 * @author zarlic * @date 2021.12.15 15:28 */ public class SendHandler extends ChannelInboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(SendHandler.class); private CountDownLatch countDownLatch; private Object readMsg = null; private byte[] data; public SendHandler(byte[] data) { countDownLatch = new CountDownLatch(1); this.data = data; } /** * 当连接服务端成功后,发送请求数据 * * @param ctx 通道上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Successful connection to server:{}",ctx); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); logger.info("Client send message:{}",reqBuf); ctx.writeAndFlush(reqBuf); } /** * 读取数据,数据读取完毕释放CD锁 * * @param ctx 上下文 * @param msg ByteBuf */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("Client reads message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。