当前位置:   article > 正文

java配置多个grpc client_GrpcClient

java opentelemetry grpc clientinterceptor

> anoyi中的GrpcClient

1. 查看属性

~~~

private static final Map serverMap = new HashMap<>();

private final GrpcProperties grpcProperties;

private final SerializeService serializeService;

private ClientInterceptor clientInterceptor;

~~~

这里四个属性,serverMap 的key值是远程服务端的名字列表,ServerContext是http2.0的请求context,grpcProperties是客户端的配置属性,有端口号,启动时候绑定的,serializeService是序列化的类,clientInterceptor是拦截器,因为grpc本质是http,所以有必要在发送的时候加token或者啥的令牌

2. 构造方法(自己去看)

~~~

public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService) {

this.grpcProperties = grpcProperties;

this.serializeService = serializeService;

}

public GrpcClient(GrpcProperties grpcProperties, SerializeService serializeService, ClientInterceptor clientInterceptor) {

this.grpcProperties = grpcProperties;

this.serializeService = serializeService;

this.clientInterceptor = clientInterceptor;

}

~~~

3. 初始化方法

~~~

/**

* 初始化

*/

public void init(){

List remoteServers = grpcProperties.getRemoteServers();

if (!CollectionUtils.isEmpty(remoteServers)) {

for (RemoteServer server : remoteServers) {

ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())

.defaultLoadBalancingPolicy("round_robin")

.nameResolverFactory(new DnsNameResolverProvider())

.idleTimeout(30, TimeUnit.SECONDS)

.usePlaintext().build();

if (clientInterceptor != null){

Channel newChannel = ClientInterceptors.intercept(channel, clientInterceptor);

serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService));

} else {

Class clazz = grpcProperties.getClientInterceptor();

if (clazz == null) {

serverMap.put(server.getServer(), new ServerContext(channel, serializeService));

}else {

try {

ClientInterceptor interceptor = (ClientInterceptor) clazz.newInstance();

Channel newChannel = ClientInterceptors.intercept(channel, interceptor);

serverMap.put(server.getServer(), new ServerContext(newChannel, serializeService));

} catch (InstantiationException | IllegalAccessException e) {

log.warn("ClientInterceptor cannot use, ignoring...");

serverMap.put(server.getServer(), new ServerContext(channel, serializeService));

}

}

}

}

}

}

~~~

4. 解析初始化方法

我们看到该方法里面有这么一段代码

~~~

ManagedChannel channel = ManagedChannelBuilder.forAddress(server.getHost(), server.getPort())

.defaultLoadBalancingPolicy("round_robin")

.nameResolverFactory(new DnsNameResolverProvider())

.idleTimeout(30, TimeUnit.SECONDS)

.usePlaintext().build();

~~~

这个方法只是实例化客户端连接器,而且这个方法是在循环中,说明我们客户端可以配置多个服务端连接。所以客户端是一对多服务端。才会有上面的serverMap ,其实这个方法很就是生成ServerContext存放到内存中,要访问哪个服务端的时候就拿出来,接下来看看下面的方法

~~~

/**

* 连接远程服务

*/

public static ServerContext connect(String serverName) {

return serverMap.get(serverName);

}

~~~

看到了吧,就是一对多服务端,要的时候取出相应名字的context,接下来看看ServerContext这个类,这个类是我们封装的

5. 查看ServerContext

* [ ] 属性

~~~

private Channel channel;

private final SerializeService defaultSerializeService;

private CommonServiceGrpc.CommonServiceBlockingStub blockingStub;

~~~

这三个属性,有用的是Channel ,这个是通道,SerializeService 是序列化的方法,自己写,记得要跟服务端约定好,

blockingStub是grpc的关键,是google生成的方法,实现通信。

* [ ] 构造方法

~~~

ServerContext(Channel channel, SerializeService serializeService) {

this.channel = channel;

this.defaultSerializeService = serializeService;

blockingStub = CommonServiceGrpc.newBlockingStub(channel);

}

~~~

很简单的构造方法,传参,实例化

* [ ] 请求方法

~~~

/**

* 处理 gRPC 请求

*/

public GrpcResponse handle(SerializeType serializeType, GrpcRequest grpcRequest) {

SerializeService serializeService = SerializeUtils.getSerializeService(serializeType, this.defaultSerializeService);

ByteString bytes = serializeService.serialize(grpcRequest);

int value = (serializeType == null ? -1 : serializeType.getValue());

GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build();

GrpcService.Response response = null;

try{

response = blockingStub.handle(request);

}catch (Exception exception){

log.warn("rpc exception: {}", exception.getMessage());

if ("UNAVAILABLE: io exception".equals(exception.getMessage().trim())){

response = blockingStub.handle(request);

}

}

return serializeService.deserialize(response);

}

~~~

这个方法很简单就是序列化并发送,由于java的http2.0采用ByteString通讯,所以必须序列化成ByteString,这里我们提供三种方法,一个是FastJSONSerializeService,这个是阿里的fastjson序列化,一个是ProtoStuffSerializeService,

这个是protobuff的包,是谷歌提供的,一个是SofaHessianSerializeService,这个是probuf-java,也是谷歌的,接下来看看往下执行的方法

~~~

GrpcService.Request request = GrpcService.Request.newBuilder().setSerialize(value).setRequest(bytes).build();

~~~

这个很关键了,GrpcService是用proto生成的,也就是protobuf对http2.0的支持,接下来看看我们写的service.proto

~~~

syntax = "proto3";

option java_package = "com.anoyi.rpc";

option java_outer_classname = "GrpcService";

option java_multiple_files = false;

// 定义通用的 Grpc 服务

service CommonService {

// 处理请求

rpc handle ( Request ) returns ( Response ) {}

}

// 定义通用的 Grpc 请求体

message Request {

int32 serialize = 1;

bytes request = 2;

}

// 定义通用的 Grpc 响应体

message Response {

bytes response = 1;

}

~~~

service.proto采用proto3协议,java_package 的意思是我们生成的类放的包,我们放在com.anoyi.rpc,java_outer_classname 是我们生成的外部类名字,我们叫GrpcService,java_multiple_files 是我们是否支持分成多个java类,我们集成到一个类,叫做GrpcService,所以false了,然后CommonService是个service,我们com.anoyi.rpc包会生成CommonServiceGrpc这个类,自己去继承重写吧,就一个handle方法,参数是Request ,返回值是Response ,都是我们底下定义的结构体,这是个关键的服务端处理方法,Request 是个message ,也就是结构体,有两个字段, int32 serialize = 1;bytes request = 2;一个是serialize ,是序列化方法,request 是我们序列化后的二进制,也及时byteString,Response 也就是一个byteString而已,因为就是返回。当然service.proto,我们会找个方法生成java文件,下一章节会介绍的,然后客户端调用init就会初始化客户端连接grpc的通道,这一章节就结束了

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/145935
推荐阅读
相关标签
  

闽ICP备14008679号