赞
踩
简单的回答 : 远程服务调用.
在dubbo中有应用.
为了更好地理解dubbo框架以及其他相关的东西.
引入官网架构图:
链接: link.
简单版本结构图
https://www.processon.com/diagraming/601fd9fee0b34d208a6a2924
rpc-common : 属于公共依赖(定义公共接口, 定义规范)
rpc-consumer1: 属于消费端实现(主要通过协议与服务端通信)
rpc-provider1: 属于服务端(服务提供者)实现
<!--引入netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <!--<version>1.2.47</version>--> <version>1.2.71</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>5.2.0.RELEASE</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-expression</artifactId> <version>5.2.0.RELEASE</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.2.6.RELEASE</version> </dependency> <dependency> 
<groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <!--<version>5.1.46</version>--> <version>5.1.20</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.21</version> </dependency> <!--zkclient客户端--> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency> <!--zookeeper 案例 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.8</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
以上依赖有许多并不是专门为这个RPC服务的, 但是我这里也贴出来了,主要是这个工程还有其他的功能,这不做介绍.
rpc-common工程:
IUserService : 接口定义
/**
 * Remote "say hello" operation exposed through the RPC framework.
 *
 * @param msg text supplied by the caller
 * @return the reply string produced by the service implementation
 */
String sayHello(String msg);
JsonSerializer : 序列化接口实现
/**
 * {@link Serializer} implementation that delegates to fastjson's {@code JSON} helper.
 */
public class JsonSerializer implements Serializer {

    /**
     * Turns the given object into its JSON byte representation.
     *
     * @param o object to serialize
     * @return the JSON bytes produced by fastjson
     * @throws IOException declared by the {@link Serializer} contract
     */
    @Override
    public byte[] serialize(Object o) throws IOException {
        final byte[] jsonBytes = JSON.toJSONBytes(o);
        return jsonBytes;
    }

    /**
     * Parses JSON bytes back into an instance of {@code clazz}.
     *
     * @param clazz target type
     * @param bytes JSON bytes to parse
     * @param <T>   type expected by the caller
     * @return the parsed object
     * @throws IOException declared by the {@link Serializer} contract
     */
    @Override
    public <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException {
        final T parsed = JSON.parseObject(bytes, clazz);
        return parsed;
    }
}
RpcDecoder : rpc解码器
public class RpcDecoder extends MessageToMessageDecoder<ByteBuf> { private Class<?> clazz; private Serializer serializer; public RpcDecoder(Class<?> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { int readInt = byteBuf.readInt(); byte[] bytes = new byte[readInt]; byteBuf.readBytes(bytes); //反序列化 Object deserialize = serializer.deserialize(clazz, bytes); out.add(deserialize); } }
RpcEncoder : rpc编码器
/**
 * Outbound encoder that writes an object as a 4-byte length followed by its
 * serialized bytes.
 *
 * <p>Only messages that are instances of the configured class are encoded. Anything
 * else is passed on to the next outbound handler unchanged instead of being silently
 * replaced by an empty buffer.
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> clazz;

    private final Serializer serializer;

    /**
     * @param clazz      the only message type this encoder handles
     * @param serializer serializer used to turn messages into bytes
     */
    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    /**
     * Accepts a message only when it is an instance of the configured class.
     *
     * @param msg outbound message
     * @return {@code true} if this encoder should encode {@code msg}
     */
    @Override
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return clazz != null && clazz.isInstance(msg);
    }

    /**
     * Writes {@code int length} + {@code byte[] payload} into the outbound buffer.
     *
     * @param ctx handler context (unused)
     * @param msg message to serialize; already checked by {@link #acceptOutboundMessage(Object)}
     * @param out buffer receiving the frame
     * @throws Exception if serialization fails
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        byte[] bytes = serializer.serialize(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}
RpcRequest : rpc请求协议对象
/**
 * RPC request protocol object sent from the consumer to the provider.
 */
@Data
public class RpcRequest implements Serializable {
private static final long serialVersionUID = -4172554571135234360L;
/** Service id; the provider uses it as the Spring bean name to look up. */
private String requestId;
/** Class name of the interface that declares the invoked method. */
private String className;
/** Name of the method to invoke. */
private String methodName;
/** Formal parameter types of the method, used to locate it by reflection. */
private Class<?>[] parameterTypes;
/** Actual arguments passed to the method. */
private Object[] parameters;
}
RpcResult : rpc返回结果(服务端 -> 客户端)
/**
 * RPC result object returned from the provider to the consumer.
 */
@Data
public class RpcResult implements Serializable {
private static final long serialVersionUID = 1561074744495374547L;
/** Whether the provider handled the request successfully. */
private boolean success;
/** Human-readable status or failure description set by the provider. */
private String message;
/** Return value of the invoked method; only set on success. */
private Object data;
}
Serializer : 序列化接口
/**
 * Strategy for converting objects to bytes and back.
 */
public interface Serializer {

    /**
     * Serializes an object.
     *
     * @param o object to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    byte[] serialize(Object o) throws IOException;

    /**
     * Deserializes bytes into an object of the requested class.
     *
     * @param clazz target class
     * @param bytes serialized bytes
     * @param <T>   type expected by the caller
     * @return the deserialized object
     * @throws IOException if deserialization fails
     */
    <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException;
}
ZookeeperUtils:
public class ZookeeperUtils { /** * */ private static CuratorFramework client; /** * * @return */ public static CuratorFramework getClient() { if(client == null) { synchronized (ZookeeperUtils.class) { if(null == client) { client = createClient(); } } } return client; } /** * * @return */ private static CuratorFramework createClient() { RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3); //使用fluent变成风格 client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(exponentialBackoffRetry) .namespace("lagou-servers") //独立的命名空间 .build(); client.start(); System.out.println("zookeeper会话创建了."); return client; } /** * 测试创建会话 * @param args */ public static void main(String[] args) { createClient(); } }
以上涉及知识点:
1: netty的编解码器, 拆包,粘包,自定义协议.
2: netty的服务处理器的入站,出站顺序及配置.
3: netty的ByteBuf.
4: java的反射.
5: netty的事件触发点.
6: alibaba的fastjson序列化反序列化.
7: zookeeper的基础知识,zookeeper节点的特性.
zookeeper节点的数据结构:
zookeeper节点的分类:
rpc-provider1工程:
ServerBoot
@Service public class ServerBoot implements ApplicationRunner { private static String IP = "127.0.0.1"; private static Integer PORT = 9010; /** * @param applicationArguments * @throws Exception */ @Override public void run(ApplicationArguments applicationArguments) throws Exception { startNettyServer(IP, PORT); } //*************启动netty /** * @param ip * @param port * @throws Exception */ private static void startNettyServer(String ip, int port) throws Exception { /* 1: 引导器 2: 线程池 3: 处理器 4: 监听端口 */ //1.创建两个线程池对象 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); //2.创建服务端的启动引导对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //3.配置启动引导对象 serverBootstrap.group(bossGroup, workGroup) //设置通道为NIO .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //这个配置要注意: RpcResult + RpcRequest; //我们根据Encoder和Decoder来分入站和出站 //根据RpcRequest : 知道这个是要Encoder的 //知道RpcResult是要Decoder的. pipeline.addLast(new RpcEncoder(RpcResult.class, new JsonSerializer())); pipeline.addLast(new RpcDecoder(RpcRequest.class, new JsonSerializer())); //业务处理类 pipeline.addLast(new UserServiceHandler(SpringContextHolder.getApplicationContext())); } }); //这里的sync() serverBootstrap.bind(port).sync(); //链接zk : 创建临时有序节点 CuratorFramework client = ZookeeperUtils.getClient(); String path = "/serverNodes/node"; //在这里创建 lagou-servers client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(path, (ip + ":" + port).getBytes()); //赋值为 当前服务器的 ip和端口 --》 可以和远程客户端进行通信 } }
UserServiceHandler
public class UserServiceHandler extends ChannelInboundHandlerAdapter { private ApplicationContext applicationContext; public UserServiceHandler(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest rpcRequest = null; if(null == msg) { returnFailRst(ctx, "netty服务端: 协议对象不能为空!"); return; } rpcRequest = (RpcRequest) msg; System.out.println("netty服务端: 服务端接收到的协议对象为: " + rpcRequest); if(StringUtils.isEmpty(rpcRequest.getRequestId())) { returnFailRst(ctx, "netty服务端: 请求的协议对象ID不能为空!!!"); } // Object bean = applicationContext.getBean(rpcRequest.getRequestId()); Object bean = SpringContextHolder.getApplicationContext().getBean(rpcRequest.getRequestId()); if(bean == null) { returnFailRst(ctx, "netty服务端: 服务ID" + rpcRequest.getRequestId() + "不存在!"); } //构造反射 Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getParameters()); returnSuccRst(ctx, result); System.out.println("netty服务端: 返回结果 " + result); } /** * 构造成功结果返回 * @param ctx * @param result */ private void returnSuccRst(ChannelHandlerContext ctx, Object result) { RpcResult rpcResult = new RpcResult(); rpcResult.setSuccess(true); rpcResult.setData(result); rpcResult.setMessage("netty服务端: 处理成功!" + result); ctx.writeAndFlush(rpcResult); } /** * 返回 失败的结果 * @param ctx * @param s */ private void returnFailRst(ChannelHandlerContext ctx,String s) { RpcResult rpcResult = new RpcResult(); rpcResult.setSuccess(false); rpcResult.setMessage("netty服务端: 处理失败!" + s); ctx.writeAndFlush(rpcResult); } }
SpringContextHolder
@Component public class SpringContextHolder implements ApplicationContextAware,DisposableBean { private static ApplicationContext applicationContext = null; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { SpringContextHolder.applicationContext = context; } /** * 提供外部访问 * @return */ public static ApplicationContext getApplicationContext() { if (applicationContext == null) { throw new IllegalStateException("applicaitonContext属性未注入, 请在SpringBoot启动类中注册SpringContextHolder."); }else { return applicationContext; } } @Override public void destroy() throws Exception { applicationContext = null; // SpringContextHolder.applicationContext = null; } //***********提供一系列 获取 bean的方法 /** * 通过name获取 Bean * @param name 类名称 * @return 实例对象 */ public static Object getBean(String name){ return getApplicationContext().getBean(name); } /** * 通过class获取Bean */ public static <T> T getBean(Class<T> clazz){ return getApplicationContext().getBean(clazz); } /** * 通过name,以及Clazz返回指定的Bean */ public static <T> T getBean(String name,Class<T> clazz){ return getApplicationContext().getBean(name, clazz); } }
UserServiceImpl
/**
 * Provider-side implementation of {@code IUserService}, registered as the Spring
 * bean {@code "IUserService"}.
 */
@Service(value = "IUserService")
public class UserServiceImpl implements IUserService {

    private static Integer PORT = 9010;

    /**
     * Logs the call, sleeps for a random 0-2 seconds and echoes the message back.
     *
     * @param msg text received from the caller
     * @return reply string containing the port and {@code msg}
     */
    @Override
    public String sayHello(String msg) {
        System.out.println(portLabel() + "服务端的第一个方法:sayHello---》 " + msg);
        final int delaySeconds = RandomUtil.randomInt(3);
        ThreadUtil.sleep(delaySeconds * 1000);
        return portLabel() + "服务器返回数据 : " + msg;
    }

    /**
     * Logs the call and echoes the message back immediately.
     *
     * @param msg text received from the caller
     * @return reply string containing the port and {@code msg}
     */
    @Override
    public String sayHelloWorld(String msg) {
        System.out.println(portLabel() + "服务端的第二个方法sayHelloWorld---》 " + msg);
        return portLabel() + "服务端的第二个方法返回数据 : " + msg;
    }

    /** Builds the common "port" prefix shared by every log line and reply. */
    private static String portLabel() {
        return "这是端口(" + PORT + ")";
    }
}
LaGouV1Application
以上涉及知识点:
1: springboot
2: springboot的自动装配
3: 其他
rpc-consumer1工程:
RpcConsumer
public class RpcConsumer { /** * 线程池 */ private static ExecutorService executorService = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcessors() ); /** * 记录 服务端 处理器 */ private volatile static Map<String, UserClientHandler> userClientHandlerMap; /** * 记录最近一次服务处理 时间 */ private volatile static Map<String, Long> userClientHandlerTime = new ConcurrentHashMap<>(); /** * */ private static volatile long count = 0; /* 1: 启动所有客户端 2: 注册监听 */ private static void init() throws Exception { CuratorFramework client = ZookeeperUtils.getClient(); String path = "/serverNodes"; //初始化所有的客户端连接 initClients(client, path); //注册zk 父路径监听 registerZkPathListener(client, path); } /** * 注册路径监听 * @param client * @param path */ private static void registerZkPathListener(CuratorFramework client, String path) throws Exception { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true); pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { ChildData childData = event.getData(); PathChildrenCacheEvent.Type eventType = event.getType(); System.out.println("zk节点监控: childData: " + childData); System.out.println("zk节点监控: eventType: " + eventType); switch (eventType) { case CONNECTION_RECONNECTED: pathChildrenCache.rebuild(); break; case CONNECTION_SUSPENDED: break; case CONNECTION_LOST: System.out.println("Connection lost."); break; case CHILD_ADDED: addClient(client, event.getData()); break; case CHILD_UPDATED: System.out.println("Child updated"); break; case CHILD_REMOVED: System.out.println("节点移除!"); delClient(client, event.getData()); break; default: break; } } }); } /** * 移除 服务 端连接 * @param client * @param data */ private static void delClient(CuratorFramework client, ChildData data) { System.out.println("zk节点监控: 移除childData : " + data); byte[] 
dataData = data.getData(); UserClientHandler userClientHandler = userClientHandlerMap.get(client); if(userClientHandler == null) { System.out.println("zk节点监控: 服务节点: " + client + "不存在!"); } String serverKey = new String(dataData); if(userClientHandler.isState()) { System.out.println("zk节点监控: 服务节点:" + serverKey + "是激活的不能被删除!!!"); } else { userClientHandlerMap.remove(serverKey); System.out.println("zk节点监控: delete server:" + serverKey); } } /** * 新增 服务 端连接 * @param client * @param data */ private static void addClient(CuratorFramework client, ChildData data) throws InterruptedException { System.out.println("zk节点监控: 新增ChildData: " + data); byte[] dataData = data.getData(); String serverKey = new String(dataData); System.out.println("zk节点监控: add server :" + serverKey); String[] split = serverKey.split(":"); initClient(split[0], split[1]); } /** * 初始化 和所有 服务端的链接 : netty 链接 * @param client * @param path */ private synchronized static void initClients(CuratorFramework client, String path) { if(userClientHandlerMap == null) { userClientHandlerMap = new HashMap<>(64); } try { List<String> list = client.getChildren().forPath(path); // for (String link : list) { String allPath = path + "/" + link; System.out.println("zk节点监控: 已注册服务节点路径: " + allPath); byte[] bytes = client.getData().forPath(allPath); String s = new String(bytes); System.out.println("zk节点监控: 已注册服务节点路径的值: " + s); String[] split = s.split(":"); //初始化netty客户 端连接 initClient(split[0], split[1]); } } catch (Exception e) { e.printStackTrace(); } } /** * * @param ip * @param port */ private static void initClient(String ip, String port) throws InterruptedException { //这儿的 UserClientHandler 将会被设置一个值 存放到容器当中, 然后会被取出发送客户端请求 UserClientHandler userClientHandler = new UserClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<NioSocketChannel>() { 
@Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new RpcEncoder(RpcRequest.class, new JsonSerializer())); pipeline.addLast(new RpcDecoder(RpcResult.class, new JsonSerializer())); pipeline.addLast(userClientHandler); } }); bootstrap.connect(ip, Integer.parseInt(port)).sync(); //打印这个存储 userClientHandlerMap.put(ip + ":" + port, userClientHandler); System.out.println("初始化存储: 容器存储: " + (ip + ":" + port) + " = " + userClientHandler); } /** * * @return */ private static Integer clearClients() { for (Map.Entry<String, UserClientHandler> key : userClientHandlerMap.entrySet()) { UserClientHandler userClientHandler = userClientHandlerMap.get(key); if(userClientHandler != null) { if(!userClientHandler.isState()) { System.out.println("zk节点监控: 从服务列表中移除服务节点:" + key); userClientHandlerMap.remove(key); userClientHandlerTime.remove(key); } } } return userClientHandlerMap.size(); } //现在来发请求 --> 获取远程服务响应 (netty + zookeeper) //只有接口, 通过代理 + 协议完成 /** * * @param proxyClass * @return */ public Object createProxy(Class<?> proxyClass) { Object proxyInstance = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{proxyClass}, new RpcInvokeInvocationHandler()); return proxyInstance; } /** * rpc 远程服务 反射调用类 */ private class RpcInvokeInvocationHandler implements InvocationHandler { /** * 代理类生成后, 在调用原方法的时候会执行这儿的逻辑 todo * @param proxy * @param method * @param args * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { /* 1: 初始化所有的连接 2: 初始化处理器容器 */ if(userClientHandlerMap == null) { init(); } //判断当前节点是否可用 : 判断是否有可用的节点 Integer availableCount = clearClients(); if(availableCount == 0) { throw new RuntimeException("代理: 无服务节点可用!!!"); } //构造协议对象 RpcRequest rpcRequest = new RpcRequest(); //获取服务类名 //根据方法可以推理出来 String className = method.getDeclaringClass().getName(); String packageName = 
method.getDeclaringClass().getPackage().getName(); //获取类 String serviceName = StrUtil.removeAll(className, packageName + '.'); System.out.println("代理: className: " + className + "\tpackageName: " + packageName + "\tserviceName: " + serviceName); rpcRequest.setRequestId(serviceName); rpcRequest.setClassName(className); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); rpcRequest.setParameterTypes(method.getParameterTypes()); System.out.println("代理: rpcRequest: " + rpcRequest); LoadBalanceService loadBalanceService; loadBalanceService = new ServiceLBRandom(); //获取服务key : String serverKey = loadBalanceService.getService(); System.out.println("代理: serverkey: " + serverKey); //这个key 是从服务处理容器中随机获取的 UserClientHandler userClientHandler = userClientHandlerMap.get(serverKey); //发送消息 userClientHandler.setToSendParam(rpcRequest); //使用线程池异步接收 long begin = DateUtil.current(); Object result = executorService.submit(userClientHandler).get(); long spendMs = DateUtil.spendMs(begin); //记录耗时 getUserClientHandlerTime().put(serverKey, spendMs); System.out.println("代理: call server:" + serverKey+",耗时:" + spendMs + "ms"); //返回结果 return result; } } /** * * @return */ public static Map<String, Long> getUserClientHandlerTime() { return userClientHandlerTime; } public static Map<String, UserClientHandler> getUserClientHandlerMap() { return userClientHandlerMap; } }
UserClientHandler
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable { /* 1: 异步发送 2: 线程池接收结果 3: 发送数据 4: 记录接收结果 5: 加入等待唤醒机制 wait() notify() 6: 上下文对象 Ctx ChannelHandlerContext */ private ChannelHandlerContext ctx; /** * 发送参数 */ private Object toSendParam; /** * 结束结果 */ private Object toRecvResult; /** * 标识当前服务器的状态 */ private boolean state = false; /** * 设置发送 请求信息 * @param toSendParam */ public void setToSendParam(Object toSendParam) { this.toSendParam = toSendParam; } /** * 获取服务器状态 state * @return */ public boolean isState() { return state; } /** * 阻塞等待结果 * @return * @throws Exception */ @Override public synchronized Object call() throws Exception { System.out.println("发送对象: " + this.toSendParam.toString()); this.ctx.writeAndFlush(this.toSendParam); System.out.println("netty客户端: 睡眠线程(线程名) : " + Thread.currentThread().getName()); wait(); return toRecvResult; } /** * 激活 与服务段连接后发生事件 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; this.state = true; } /** * 与服务端 断开连接事件 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("netty客户端: 断开连接."); this.state = false; } /** * 读取 服务端发送的数据 * @param ctx * @param msg * @throws Exception */ @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(null != msg) { RpcResult rpcResult = (RpcResult) msg; //成功失败判断 if(rpcResult.isSuccess()) { this.toRecvResult = rpcResult.getData(); } else { new RuntimeException(rpcResult.getMessage()); } } System.out.println("netty客户端: 唤醒线程(唤醒线程名) : " + Thread.currentThread().getName()); notify(); } }
LoadBalanceService
/**
 * Load-balancing strategy used by the consumer to pick a provider.
 */
public interface LoadBalanceService {
/**
 * Selects the provider that should handle the next call.
 *
 * @return key of the selected provider, as used in the consumer's handler map
 */
String getService();
}
ServiceLBLeastTime
public class ServiceLBLeastTime implements LoadBalanceService { @Override public String getService() { Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap(); Map<String, Long> userClientHandlerTime = RpcConsumer.getUserClientHandlerTime(); Set<String> services = userClientHandlerMap.keySet(); long dr=-1; String serverKey=""; //循环得到耗时最短的服务节点 for (String service : services) { boolean b = userClientHandlerTime.containsKey(service); if(b) { Long aLong = userClientHandlerTime.get(service); if(aLong < dr || dr < 0) { dr = aLong; serverKey = service; } } else { //一次都没执行过 说明 没有承担负载 serverKey = service; break; } } return serverKey; } }
ServiceLBRandom
/**
 * Load balancer that picks one of the connected providers at random.
 */
public class ServiceLBRandom implements LoadBalanceService {

    /**
     * @return key of a randomly chosen provider
     * @throws IllegalStateException if no provider is currently known
     */
    @Override
    public String getService() {
        Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
        if (userClientHandlerMap == null || userClientHandlerMap.isEmpty()) {
            // Fail with a clear message instead of an NPE or an opaque random-bound error.
            throw new IllegalStateException("no provider available for load balancing");
        }
        String[] keys = userClientHandlerMap.keySet().toArray(new String[0]);
        // Print the contents; concatenating the array itself only prints its identity.
        System.out.println("keys: " + java.util.Arrays.toString(keys));
        for (String x : keys) {
            System.out.println("keys : " + x);
        }
        int i = RandomUtil.randomInt(keys.length);
        String key = keys[i];
        System.out.println("随机获得的服务key : " + key);
        return key;
    }

    /**
     * Small demo of converting a map's key set to an array.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("1", "1");
        map.put("3", "3");
        map.put("2", "2");
        map.put("5", "5");
        String[] strings = map.keySet().toArray(new String[0]);
        System.out.println("strings: " + java.util.Arrays.toString(strings));
        for (String x : strings) {
            System.out.println("keys : " + x);
        }
        /*
         Observed output of the loop:
         keys : 1
         keys : 2
         keys : 3
         keys : 5
         */
    }
}
1: 如何定义一个订单服务呢?
首先我们在rpc-common工程中定义一个订单接口IOrderService, 然后我们在
rpc-provider中写好OrderServiceImpl实现类, 实现订单接口.
最后我们在消费端创建代理对象, 然后传输对应的协议对象进行请求订单服务即可.
2: 待扩展.
3: 关于负载均衡那一块可以使用策略模式进行优化,加入其它的算法,加入权重等值进行判断, 待优化.
4: 关于ip和端口的配置可以引入动态配置
1: netty在连接服务端后,写了如下代码, 出现了发送请求没有收到任何响应的情况
客户端这么写之后发送请求直接没有任何响应,没有任何报错,也没有任何结果.
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture();
2: notify()和wait()方法的使用必须结合synchronized关键字(调用线程必须持有该对象的监视器), 否则线程无法唤醒, 或者是报非法的监视器状态异常: IllegalMonitorStateException.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。