赞
踩
> anoyi中的GrpcClient
1. 查看属性
~~~
private static final Map serverMap = new HashMap<>();
private final GrpcProperties grpcProperties;
private final SerializeService serializeService;
private ClientInterceptor clientInterceptor;
~~~
这里四个属性,serverMap 的key值是远程服务端的名字列表,ServerContext是http2.0的请求context,grpcProperties是客户端的配置属性,有端口号,启动时候绑定的,serializeService是序列化的类,clientInterceptor是拦截器,因为grpc本质是http,所以有必要在发送的时候加token或者啥的令牌
2. 构造方法(自己去看)
~~~
public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService) {
this.grpcProperties = grpcProperties;
this.serializeService = serializeService;
}
public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService, ClientInterceptor clientInterceptor) {
this.grpcProperties = grpcProperties;
this.serializeService = serializeService;
this.clientInterceptor = clientInterceptor;
}
~~~
3. 初始化方法
~~~
/**
* 初始化
*/
public void init(){
List remoteServers = grpcProperties.getRemoteServers();
if (!CollectionUtils.isEmpty(remoteServers)) {
for (RemoteServer server : remoteServers) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())
.defaultLoadBalancingPolicy("round_robin")
.nameResolverFactory(new DnsNameResolverProvider())
.idleTimeout(30, TimeUnit.SECONDS)
.usePlaintext().build();
if (clientInterceptor != null){
Channel newChannel = ClientInterceptors.intercept(channel, clientInterceptor);
serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService));
} else {
Class clazz = grpcProperties.getClientInterceptor();
if (clazz == null) {
serverMap.put(server.getServer(), new ServerContext(channel, serializeService));
}else {
try {
ClientInterceptor interceptor = (ClientInterceptor) clazz.newInstance();
Channel newChannel = ClientInterceptors.intercept(channel, interceptor);
serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService));
} catch (InstantiationException | IllegalAccessException e) {
log.warn("ClientInterceptor cannot use, ignoring...");
serverMap.put(server.getServer(), new ServerContext(channel, serializeService));
}
}
}
}
}
}
~~~
4. 解析初始化方法
我们看到该方法里面有这么一段代码
~~~
ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())
.defaultLoadBalancingPolicy("round_robin")
.nameResolverFactory(new DnsNameResolverProvider())
.idleTimeout(30, TimeUnit.SECONDS)
.usePlaintext().build();
~~~
这个方法只是实例化客户端连接器,而且这个方法是在循环中,说明我们客户端可以配置多个服务端连接。所以客户端是一对多服务端。才会有上面的serverMap ,其实这个方法很就是生成ServerContext存放到内存中,要访问哪个服务端的时候就拿出来,接下来看看下面的方法
~~~
/**
* 连接远程服务
*/
public static ServerContext connect(String serverName) {
return serverMap.get(serverName);
}
~~~
看到了吧,就是一对多服务端,要的时候取出相应名字的context,接下来看看ServerContext这个类,这个类是我们封装的
5. 查看ServerContext
* [ ] 属性
~~~
private Channel channel;
private final SerializeService defaultSerializeService;
private CommonServiceGrpc.CommonServiceBlockingStub blockingStub;
~~~
这三个属性,有用的是Channel ,这个是通道,SerializeService 是序列化的方法,自己写,记得要跟服务端约定好,
blockingStub是grpc的关键,是google生成的方法,实现通信。
* [ ] 构造方法
~~~
ServerContext(Channel channel, SerializeService serializeService) {
this.channel = channel;
this.defaultSerializeService = serializeService;
blockingStub = CommonServiceGrpc.newBlockingStub(channel);
}
~~~
很简单的构造方法,传参,实例化
* [ ] 请求方法
~~~
/**
* 处理 gRPC 请求
*/
public GrpcResponse handle(SerializeType serializeType, GrpcRequest grpcRequest) {
SerializeService serializeService = SerializeUtils.getSerializeService(serializeType, this.defaultSerializeService);
ByteString bytes = serializeService.serialize(grpcRequest);
int value = (serializeType == null ? -1 : serializeType.getValue());
GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build();
GrpcService.Response response = null;
try{
response = blockingStub.handle(request);
}catch (Exception exception){
log.warn("rpc exception: {}", exception.getMessage());
if ("UNAVAILABLE: io exception".equals(exception.getMessage().trim())){
response = blockingStub.handle(request);
}
}
return serializeService.deserialize(response);
}
~~~
这个方法很简单就是序列化并发送,由于java的http2.0采用ByteString通讯,所以必须序列化成ByteString,这里我们提供三种方法,一个是FastJSONSerializeService,这个是阿里的fastjson序列化,一个是ProtoStuffSerializeService,
这个是protobuff的包,是谷歌提供的,一个是SofaHessianSerializeService,这个是probuf-java,也是谷歌的,接下来看看往下执行的方法
~~~
GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build();
~~~
这个很关键了,GrpcService是用proto生成的,也就是protobuf对http2.0的支持,接下来看看我们写的service.proto
~~~
syntax = "proto3";
option java_package = "com.anoyi.rpc";
option java_outer_classname = "GrpcService";
option java_multiple_files = false;
// 定义通用的 Grpc 服务
service CommonService {
// 处理请求
rpc handle ( Request ) returns ( Response ) {}
}
// 定义通用的 Grpc 请求体
message Request {
int32 serialize = 1;
bytes request = 2;
}
// 定义通用的 Grpc 响应体
message Response {
bytes response = 1;
}
~~~
service.proto采用proto3协议,java_package 的意思是我们生成的类放的包,我们放在com.anoyi.rpc,java_outer_classname 是我们生成的外部类名字,我们叫GrpcService,java_multiple_files 是我们是否支持分成多个java类,我们集成到一个类,叫做GrpcService,所以false了,然后CommonService是个service,我们com.anoyi.rpc包会生成CommonServiceGrpc这个类,自己去继承重写吧,就一个handle方法,参数是Request ,返回值是Response ,都是我们底下定义的结构体,这是个关键的服务端处理方法,Request 是个message ,也就是结构体,有两个字段, int32 serialize = 1;bytes request = 2;一个是serialize ,是序列化方法,request 是我们序列化后的二进制,也及时byteString,Response 也就是一个byteString而已,因为就是返回。当然service.proto,我们会找个方法生成java文件,下一章节会介绍的,然后客户端调用init就会初始化客户端连接grpc的通道,这一章节就结束了
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。