当前位置:   article > 正文

gRPC 的 java 使用_grpc java

grpc java




默认情况下,gRPC使用Protocol Buffers,用于序列化结构化数据(尽管它可以与JSON等其他数据格式一起使用)

下面介绍 gRPC 在java中的使用

gRPC 官网:https://grpc.io/


1、创建 gRPC项目







1、创建 gRPC项目

新建maven项目,项目名是 grpc-learn

pom.xml 文件内容

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.wsjzzcbq</groupId>
  7. <artifactId>grpc-learn</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  11. <maven.compiler.source>1.8</maven.compiler.source>
  12. <maven.compiler.target>1.8</maven.compiler.target>
  13. </properties>
  14. <repositories>
  15. <repository>
  16. <id>naxus-aliyun</id>
  17. <name>naxus-aliyun</name>
  18. <url>https://maven.aliyun.com/repository/public</url>
  19. <releases>
  20. <enabled>true</enabled>
  21. </releases>
  22. <snapshots>
  23. <enabled>false</enabled>
  24. </snapshots>
  25. </repository>
  26. </repositories>
  27. <dependencies>
  28. <dependency>
  29. <groupId>io.grpc</groupId>
  30. <artifactId>grpc-netty-shaded</artifactId>
  31. <version>1.57.2</version>
  32. <scope>runtime</scope>
  33. </dependency>
  34. <dependency>
  35. <groupId>io.grpc</groupId>
  36. <artifactId>grpc-protobuf</artifactId>
  37. <version>1.57.2</version>
  38. </dependency>
  39. <dependency>
  40. <groupId>io.grpc</groupId>
  41. <artifactId>grpc-stub</artifactId>
  42. <version>1.57.2</version>
  43. </dependency>
  44. <dependency> <!-- necessary for Java 9+ -->
  45. <groupId>org.apache.tomcat</groupId>
  46. <artifactId>annotations-api</artifactId>
  47. <version>6.0.53</version>
  48. <scope>provided</scope>
  49. </dependency>
  50. <dependency>
  51. <groupId>org.projectlombok</groupId>
  52. <artifactId>lombok</artifactId>
  53. <version>1.18.28</version>
  54. <scope>provided</scope>
  55. </dependency>
  56. </dependencies>
  57. <build>
  58. <extensions>
  59. <extension>
  60. <groupId>kr.motd.maven</groupId>
  61. <artifactId>os-maven-plugin</artifactId>
  62. <version>1.7.1</version>
  63. </extension>
  64. </extensions>
  65. <plugins>
  66. <plugin>
  67. <groupId>org.xolstice.maven.plugins</groupId>
  68. <artifactId>protobuf-maven-plugin</artifactId>
  69. <version>0.6.1</version>
  70. <configuration>
  71. <protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact>
  72. <pluginId>grpc-java</pluginId>
  73. <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.57.2:exe:${os.detected.classifier}</pluginArtifact>
  74. </configuration>
  75. <executions>
  76. <execution>
  77. <goals>
  78. <goal>compile</goal>
  79. <goal>compile-custom</goal>
  80. </goals>
  81. </execution>
  82. </executions>
  83. </plugin>
  84. </plugins>
  85. </build>
  86. </project>

在项目 main 目录下新建 proto 目录,在 proto 目录下新建 helloworld.proto 文件



  1. // Copyright 2019 The gRPC Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. syntax = "proto3";
  15. option java_multiple_files = true;
  16. option java_package = "io.grpc.examples.helloworld";
  17. option java_outer_classname = "HelloWorldProto";
  18. option objc_class_prefix = "HLW";
  19. package helloworld;
  // The greeting service definition.
  service Greeter {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
    // Second unary RPC; takes and returns the same message types as SayHello.
    rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
  }
  // The request message containing the user's name.
  message HelloRequest {
    // Name supplied by the caller (field number 1).
    string name = 1;
  }
  // The response message containing the greetings
  message HelloReply {
    // Greeting text produced by the server (field number 1).
    string message = 1;
  }



proto文件创建完成后,使用 protobuf maven插件生成 protobuf 代码和 grpc 代码

在IDEA编辑器中打开maven project 面板,找到 protobuf:compile 和 protobuf:compile-custom

运行 protobuf:compile 生成 protobuf 代码

运行 protobuf:compile-custom 生成 grpc 代码





在上面新建的项目上新建包名 com.wsjzzcbq.grpc.basic

新建服务端 server 代码

  1. package com.wsjzzcbq.grpc.basic;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureServerCredentials;
  4. import io.grpc.Server;
  5. import io.grpc.examples.helloworld.GreeterGrpc;
  6. import io.grpc.examples.helloworld.HelloReply;
  7. import io.grpc.examples.helloworld.HelloRequest;
  8. import io.grpc.stub.StreamObserver;
  9. import java.io.IOException;
  10. /**
  11. * GrpcServer
  12. *
  13. * @author wsjz
  14. * @date 2023/08/22
  15. */
  16. public class GrpcServer {
  17. public static void main(String[] args) throws IOException, InterruptedException {
  18. int port = 50051;
  19. Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
  20. .addService(new GreeterImpl())
  21. .build()
  22. .start();
  23. Runtime.getRuntime().addShutdownHook(new Thread(()->{
  24. // Use stderr here since the logger may have been reset by its JVM shutdown hook.
  25. System.err.println("*** shutting down gRPC server since JVM is shutting down");
  26. stopServer(server);
  27. System.err.println("*** server shut down");
  28. }));
  29. server.awaitTermination();
  30. }
  31. private static void stopServer(Server server) {
  32. if (server != null) {
  33. server.shutdown();
  34. }
  35. }
  36. static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
  37. @Override
  38. public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
  39. HelloReply reply = HelloReply.newBuilder().setMessage("你好!山外青山楼外楼, " + req.getName()).build();
  40. responseObserver.onNext(reply);
  41. responseObserver.onCompleted();
  42. }
  43. @Override
  44. public void sayHelloAgain(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
  45. HelloReply reply = HelloReply.newBuilder().setMessage("你好,行到水穷处,坐看云起时 " + req.getName()).build();
  46. responseObserver.onNext(reply);
  47. responseObserver.onCompleted();
  48. }
  49. }
  50. }

再建客户端 client 代码

  1. package com.wsjzzcbq.grpc.basic;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureChannelCredentials;
  4. import io.grpc.ManagedChannel;
  5. import io.grpc.examples.helloworld.GreeterGrpc;
  6. import io.grpc.examples.helloworld.HelloReply;
  7. import io.grpc.examples.helloworld.HelloRequest;
  8. import java.util.concurrent.TimeUnit;
  9. /**
  10. * GrpcClient
  11. *
  12. * @author wsjz
  13. * @date 2023/08/22
  14. */
  15. public class GrpcClient {
  16. public static void main(String[] args) throws InterruptedException {
  17. String host = "localhost";
  18. int port = 50051;
  19. ManagedChannel managedChannel = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create()).build();
  20. GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(managedChannel);
  21. HelloRequest helloRequest = HelloRequest.newBuilder().setName("沧海月明珠有泪").build();
  22. HelloReply reply = blockingStub.sayHello(helloRequest);
  23. System.out.println(reply.getMessage());
  24. HelloReply response = blockingStub.sayHelloAgain(helloRequest);
  25. System.out.println(response.getMessage());
  26. managedChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
  27. }
  28. }


先启动服务端 GrpcServer,再启动客户端 GrpcClient



拦截器 GrpcInterceptor 代码

  1. package com.wsjzzcbq.grpc.basic;
  2. import io.grpc.*;
  3. import java.util.UUID;
  4. /**
  5. * GrpcInterceptor
  6. *
  7. * @author wsjz
  8. * @date 2023/08/22
  9. */
  10. public class GrpcInterceptor implements ServerInterceptor {
  11. @Override
  12. public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
  13. System.out.println("拦截");
  14. //将uudi字符串放入context中,后面处理可从context中获取
  15. Context ctx = Context.current().withValue(Constant.USER_TOKEN, UUID.randomUUID().toString());
  16. return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
  17. }
  18. }

添加常量代码 Constant

  1. package com.wsjzzcbq.grpc.basic;
  2. import io.grpc.Context;
  3. /**
  4. * Constant
  5. *
  6. * @author wsjz
  7. * @date 2023/08/22
  8. */
  9. public class Constant {
  10. public static final Context.Key<String> USER_TOKEN = Context.key("userToken");
  11. }

修改服务端 server 代码

  1. package com.wsjzzcbq.grpc.basic;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureServerCredentials;
  4. import io.grpc.Server;
  5. import io.grpc.examples.helloworld.GreeterGrpc;
  6. import io.grpc.examples.helloworld.HelloReply;
  7. import io.grpc.examples.helloworld.HelloRequest;
  8. import io.grpc.stub.StreamObserver;
  9. import java.io.IOException;
  10. /**
  11. * GrpcServer
  12. *
  13. * @author wsjz
  14. * @date 2023/08/22
  15. */
  16. public class GrpcServer {
  17. public static void main(String[] args) throws IOException, InterruptedException {
  18. int port = 50051;
  19. Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
  20. .addService(new GreeterImpl())
  21. .intercept(new GrpcInterceptor())
  22. .build()
  23. .start();
  24. Runtime.getRuntime().addShutdownHook(new Thread(()->{
  25. // Use stderr here since the logger may have been reset by its JVM shutdown hook.
  26. System.err.println("*** shutting down gRPC server since JVM is shutting down");
  27. stopServer(server);
  28. System.err.println("*** server shut down");
  29. }));
  30. server.awaitTermination();
  31. }
  32. private static void stopServer(Server server) {
  33. if (server != null) {
  34. server.shutdown();
  35. }
  36. }
  37. static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
  38. @Override
  39. public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
  40. //context中获取
  41. String userToken = Constant.USER_TOKEN.get();
  42. System.out.println(userToken);
  43. HelloReply reply = HelloReply.newBuilder().setMessage("你好!山外青山楼外楼, " + userToken + " " + req.getName()).build();
  44. responseObserver.onNext(reply);
  45. responseObserver.onCompleted();
  46. }
  47. @Override
  48. public void sayHelloAgain(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
  49. HelloReply reply = HelloReply.newBuilder().setMessage("你好,行到水穷处,坐看云起时 " + req.getName()).build();
  50. responseObserver.onNext(reply);
  51. responseObserver.onCompleted();
  52. }
  53. }
  54. }







新建com.wsjzzcbq.grpc.loadbalance 包,用来存放负载均衡的代码

新建 ExampleNameResolver 类

  1. package com.wsjzzcbq.grpc.loadbalance;
  2. import com.google.common.collect.ImmutableMap;
  3. import io.grpc.EquivalentAddressGroup;
  4. import io.grpc.NameResolver;
  5. import io.grpc.Status;
  6. import java.net.InetSocketAddress;
  7. import java.net.SocketAddress;
  8. import java.net.URI;
  9. import java.util.Arrays;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.stream.Collectors;
  13. import java.util.stream.Stream;
  14. /**
  15. * ExampleNameResolver
  16. *
  17. * @author wsjz
  18. * @date 2023/08/22
  19. */
  20. public class ExampleNameResolver extends NameResolver{
  21. private Listener2 listener;
  22. private final URI uri;
  23. private final Map<String,List<InetSocketAddress>> addrStore;
  24. public ExampleNameResolver(URI targetUri) {
  25. this.uri = targetUri;
  26. // This is a fake name resolver, so we just hard code the address here.
  27. addrStore = ImmutableMap.<String, List<InetSocketAddress>>builder()
  28. .put(LoadBalanceClient.exampleServiceName,
  29. Stream.iterate(LoadBalanceServer.startPort, p->p+1)
  30. .limit(LoadBalanceServer.serverCount)
  31. .map(port->new InetSocketAddress("localhost",port))
  32. .collect(Collectors.toList())
  33. )
  34. .build();
  35. }
  36. @Override
  37. public String getServiceAuthority() {
  38. // Be consistent with behavior in grpc-go, authority is saved in Host field of URI.
  39. if (uri.getHost() != null) {
  40. return uri.getHost();
  41. }
  42. return "no host";
  43. }
  44. @Override
  45. public void shutdown() {
  46. }
  47. @Override
  48. public void start(Listener2 listener) {
  49. this.listener = listener;
  50. this.resolve();
  51. }
  52. @Override
  53. public void refresh() {
  54. this.resolve();
  55. }
  56. private void resolve() {
  57. List<InetSocketAddress> addresses = addrStore.get(uri.getPath().substring(1));
  58. try {
  59. List<EquivalentAddressGroup> equivalentAddressGroup = addresses.stream()
  60. // convert to socket address
  61. .map(this::toSocketAddress)
  62. // every socket address is a single EquivalentAddressGroup, so they can be accessed randomly
  63. .map(Arrays::asList)
  64. .map(this::addrToEquivalentAddressGroup)
  65. .collect(Collectors.toList());
  66. ResolutionResult resolutionResult = ResolutionResult.newBuilder()
  67. .setAddresses(equivalentAddressGroup)
  68. .build();
  69. this.listener.onResult(resolutionResult);
  70. } catch (Exception e){
  71. // when error occurs, notify listener
  72. this.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e));
  73. }
  74. }
  75. private SocketAddress toSocketAddress(InetSocketAddress address) {
  76. return new InetSocketAddress(address.getHostName(), address.getPort());
  77. }
  78. private EquivalentAddressGroup addrToEquivalentAddressGroup(List<SocketAddress> addrList) {
  79. return new EquivalentAddressGroup(addrList);
  80. }
  81. }

新建 ExampleNameResolverProvider 类

  1. package com.wsjzzcbq.grpc.loadbalance;
  2. import io.grpc.NameResolver;
  3. import io.grpc.NameResolverProvider;
  4. import java.net.URI;
  5. /**
  6. * ExampleNameResolverProvider
  7. *
  8. * @author wsjz
  9. * @date 2023/08/22
  10. */
  11. public class ExampleNameResolverProvider extends NameResolverProvider {
  12. @Override
  13. public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
  14. return new ExampleNameResolver(targetUri);
  15. }
  16. @Override
  17. protected boolean isAvailable() {
  18. return true;
  19. }
  20. @Override
  21. protected int priority() {
  22. return 5;
  23. }
  24. @Override
  25. // gRPC choose the first NameResolverProvider that supports the target URI scheme.
  26. public String getDefaultScheme() {
  27. return LoadBalanceClient.exampleScheme;
  28. }
  29. }

新建服务端 LoadBalanceServer 类

  1. package com.wsjzzcbq.grpc.loadbalance;
  2. import io.grpc.Server;
  3. import io.grpc.ServerBuilder;
  4. import io.grpc.examples.helloworld.GreeterGrpc;
  5. import io.grpc.examples.helloworld.HelloReply;
  6. import io.grpc.examples.helloworld.HelloRequest;
  7. import io.grpc.stub.StreamObserver;
  8. import java.io.IOException;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * LoadBalanceServer
  12. *
  13. * @author wsjz
  14. * @date 2023/08/22
  15. */
  16. public class LoadBalanceServer {
  17. public static final int serverCount = 3;
  18. public static final int startPort = 50051;
  19. public static void main(String[] args) throws IOException, InterruptedException {
  20. //创建3个server
  21. Server[] servers = new Server[3];
  22. for (int i=0; i<serverCount; i++) {
  23. int port = startPort + i;
  24. servers[i] = ServerBuilder.forPort(port)
  25. .addService(new GreeterImpl(port))
  26. .build()
  27. .start();
  28. }
  29. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  30. System.err.println("*** shutting down gRPC server since JVM is shutting down");
  31. try {
  32. stop(servers);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace(System.err);
  35. }
  36. System.err.println("*** server shut down");
  37. }));
  38. blockUntilShutdown(servers);
  39. }
  40. private static void blockUntilShutdown(Server[] servers) throws InterruptedException {
  41. for (int i = 0; i < serverCount; i++) {
  42. if (servers[i] != null) {
  43. servers[i].awaitTermination();
  44. }
  45. }
  46. }
  47. private static void stop(Server[] servers) throws InterruptedException {
  48. for (int i = 0; i < serverCount; i++) {
  49. if (servers[i] != null) {
  50. servers[i].shutdown().awaitTermination(30, TimeUnit.SECONDS);
  51. }
  52. }
  53. }
  54. static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
  55. int port;
  56. public GreeterImpl(int port) {
  57. this.port = port;
  58. }
  59. @Override
  60. public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
  61. HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName() + " from server<" + this.port + ">").build();
  62. responseObserver.onNext(reply);
  63. responseObserver.onCompleted();
  64. }
  65. @Override
  66. public void sayHelloAgain(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
  67. HelloReply reply = HelloReply.newBuilder()
  68. .setMessage("可上九天揽月,可下五洋捉鳖" + request.getName() + "server port " + this.port)
  69. .build();
  70. responseObserver.onNext(reply);
  71. responseObserver.onCompleted();
  72. }
  73. }
  74. }

新建客户端 LoadBalanceClient 类

  1. package com.wsjzzcbq.grpc.loadbalance;
  2. import io.grpc.ManagedChannel;
  3. import io.grpc.ManagedChannelBuilder;
  4. import io.grpc.NameResolverRegistry;
  5. import io.grpc.StatusRuntimeException;
  6. import io.grpc.examples.helloworld.GreeterGrpc;
  7. import io.grpc.examples.helloworld.HelloReply;
  8. import io.grpc.examples.helloworld.HelloRequest;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * LoadBalanceClient
  12. *
  13. * @author wsjz
  14. * @date 2023/08/22
  15. */
  16. public class LoadBalanceClient {
  17. public static final String exampleScheme = "example";
  18. public static final String exampleServiceName = "lb.example.grpc.io";
  19. public static void main(String[] args) throws InterruptedException {
  20. NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider());
  21. String target = String.format("%s:///%s", exampleScheme, exampleServiceName);
  22. first_pickPolicy(target);
  23. System.out.println("更改发送策略");
  24. roundRobinPolicy(target);
  25. }
  26. /**
  27. * 使用first_pick策略发送消息
  28. * @param target
  29. */
  30. private static void first_pickPolicy(String target) throws InterruptedException {
  31. ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
  32. .usePlaintext()
  33. .build();
  34. try {
  35. GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);
  36. //连发5条消息
  37. for (int i = 0; i < 5; i++) {
  38. greet(blockingStub, "request" + i);
  39. }
  40. } finally {
  41. channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
  42. }
  43. }
  44. /**
  45. * 使用round_robin策略发送消息
  46. * @param target
  47. * @throws InterruptedException
  48. */
  49. private static void roundRobinPolicy(String target) throws InterruptedException {
  50. //使用round_robin策略发送消息
  51. ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
  52. .defaultLoadBalancingPolicy("round_robin")
  53. .usePlaintext()
  54. .build();
  55. try {
  56. GreeterGrpc.GreeterBlockingStub blockingStub2 = GreeterGrpc.newBlockingStub(channel);
  57. for (int i = 0; i < 6; i++) {
  58. greet2(blockingStub2, "request" + i);
  59. }
  60. } finally {
  61. channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
  62. }
  63. }
  64. public static void greet(GreeterGrpc.GreeterBlockingStub blockingStub, String name) {
  65. HelloRequest request = HelloRequest.newBuilder().setName(name).build();
  66. HelloReply response;
  67. try {
  68. response = blockingStub.sayHello(request);
  69. System.out.println("Greeting返回: " + response.getMessage());
  70. } catch (StatusRuntimeException e) {
  71. System.out.println(e.getStatus());
  72. return;
  73. }
  74. }
  75. public static void greet2(GreeterGrpc.GreeterBlockingStub blockingStub, String name) {
  76. HelloRequest request = HelloRequest.newBuilder().setName(name).build();
  77. HelloReply response;
  78. try {
  79. response = blockingStub.sayHelloAgain(request);
  80. System.out.println("Greeting返回: " + response.getMessage());
  81. } catch (StatusRuntimeException e) {
  82. System.out.println(e.getStatus());
  83. return;
  84. }
  85. }
  86. }


客户端代码先使用 pick_first 策略(gRPC 的默认策略)发送5次请求,之后再用 round_robin 策略发送6次请求

看打印出的服务端端口情况


grpc自带健康检查功能,需要添加依赖 grpc-services

  1. <dependency>
  2. <groupId>io.grpc</groupId>
  3. <artifactId>grpc-services</artifactId>
  4. <version>1.57.2</version>
  5. </dependency>


新建包 com.wsjzzcbq.grpc.healthservice

新建服务端 HealthServiceServer

  1. package com.wsjzzcbq.grpc.healthservice;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureServerCredentials;
  4. import io.grpc.Server;
  5. import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
  6. import io.grpc.protobuf.services.HealthStatusManager;
  7. import java.io.IOException;
  8. import java.time.LocalDateTime;
  9. import java.util.concurrent.TimeUnit;
  10. /**
  11. * HealthServiceServer
  12. *
  13. * @author wsjz
  14. * @date 2023/08/22
  15. */
  16. public class HealthServiceServer {
  17. public static void main(String[] args) throws IOException, InterruptedException {
  18. int port = 50051;
  19. //健康状态管理
  20. HealthStatusManager health = new HealthStatusManager();
  21. Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
  22. .addService(health.getHealthService())
  23. .build()
  24. .start();
  25. changeHealthStatus(health);
  26. Runtime.getRuntime().addShutdownHook(new Thread(()->{
  27. System.err.println("*** shutting down gRPC server since JVM is shutting down");
  28. try {
  29. stopServer(server);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace(System.err);
  32. }
  33. System.err.println("*** server shut down");
  34. }));
  35. blockUntilShutdown(server);
  36. }
  37. private static void stopServer(Server server) throws InterruptedException {
  38. if (server != null) {
  39. server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
  40. }
  41. }
  42. private static void blockUntilShutdown(Server server) throws InterruptedException {
  43. if (server != null) {
  44. server.awaitTermination();
  45. }
  46. }
  47. private static void changeHealthStatus(HealthStatusManager health) {
  48. System.out.println(LocalDateTime.now() + "服务可用");
  49. new Thread(()->{
  50. try {
  51. TimeUnit.SECONDS.sleep(8);
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. //3秒钟后健康状态改为不能提供服务
  56. health.setStatus("", ServingStatus.NOT_SERVING);
  57. System.out.println(LocalDateTime.now() + "修改为服务不可用");
  58. try {
  59. TimeUnit.SECONDS.sleep(10);
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. //再过3秒修改健康状态为可服务
  64. health.setStatus("", ServingStatus.SERVING);
  65. System.out.println(LocalDateTime.now() + "修改为服务可用");
  66. }).start();
  67. }
  68. }

代码说明, changeHealthStatus 方法,会在服务端启动8秒钟后将健康状态改为不能服务状态,之后再过10秒再将服务状态改为可以服务状态,也就是说服务端启动8秒后,会有10秒健康状态是不能服务

新建客户端 HealthServiceClient

  1. package com.wsjzzcbq.grpc.healthservice;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureChannelCredentials;
  4. import io.grpc.ManagedChannel;
  5. import io.grpc.ManagedChannelBuilder;
  6. import io.grpc.health.v1.HealthCheckRequest;
  7. import io.grpc.health.v1.HealthCheckResponse;
  8. import io.grpc.health.v1.HealthGrpc;
  9. import io.grpc.stub.StreamObserver;
  10. import java.time.LocalDateTime;
  11. import java.util.concurrent.CountDownLatch;
  12. import java.util.concurrent.TimeUnit;
  13. /**
  14. * HealthServiceClient
  15. *
  16. * @author wsjz
  17. * @date 2023/08/22
  18. */
  19. public class HealthServiceClient {
  20. public static void main(String[] args) throws InterruptedException {
  21. String target = "localhost:50051";
  22. ManagedChannelBuilder<?> builder = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create());
  23. ManagedChannel channel = builder.build();
  24. //阻塞调用
  25. HealthGrpc.HealthBlockingStub healthBlockingStub = HealthGrpc.newBlockingStub(channel);
  26. blockingStub(healthBlockingStub);
  27. //非阻塞调用
  28. // HealthGrpc.HealthStub healthStub = HealthGrpc.newStub(channel);
  29. // CountDownLatch countDownLatch = new CountDownLatch(1);
  30. // healthStub(healthStub, countDownLatch);
  31. // countDownLatch.await();
  32. }
  33. /**
  34. * 同步阻塞调用
  35. * @param healthBlockingStub
  36. * @throws InterruptedException
  37. */
  38. private static void blockingStub(HealthGrpc.HealthBlockingStub healthBlockingStub) throws InterruptedException {
  39. HealthCheckRequest healthCheckRequest = HealthCheckRequest.getDefaultInstance();
  40. HealthCheckResponse response = healthBlockingStub.check(healthCheckRequest);
  41. System.out.println(LocalDateTime.now() + ":" + response.getStatus());
  42. TimeUnit.SECONDS.sleep(8);
  43. HealthCheckResponse response2 = healthBlockingStub.check(healthCheckRequest);
  44. System.out.println(LocalDateTime.now() + ":" + response2.getStatus());
  45. TimeUnit.SECONDS.sleep(18);
  46. HealthCheckResponse response3 = healthBlockingStub.check(healthCheckRequest);
  47. System.out.println(LocalDateTime.now() + ":" + response3.getStatus());
  48. }
  49. /**
  50. * 异步调用
  51. * @param healthStub
  52. * @param countDownLatch
  53. */
  54. private static void healthStub(HealthGrpc.HealthStub healthStub, CountDownLatch countDownLatch) {
  55. HealthCheckRequest healthCheckRequest = HealthCheckRequest.getDefaultInstance();
  56. healthStub.check(healthCheckRequest, new StreamObserver<HealthCheckResponse>() {
  57. @Override
  58. public void onNext(HealthCheckResponse value) {
  59. System.out.println("onNext");
  60. System.out.println(value);
  61. }
  62. @Override
  63. public void onError(Throwable t) {
  64. System.out.println("onError");
  65. }
  66. @Override
  67. public void onCompleted() {
  68. System.out.println("onCompleted");
  69. countDownLatch.countDown();
  70. }
  71. });
  72. }
  73. }






HealthStatusManager 说明

其实这里的健康检查本质上是一个grpc服务,和前面写的 GreeterImpl 是一样的,只是这个是官网实现的,如果不想用,也可以自己实现一个,感兴趣的读者可以看看官方代码

下面是 HealthStatusManager 的代码截图

HealthStatusManager 的 getHealthService方法返回一个 HealthServiceImpl 对象

HealthServiceImpl 的代码截图

可以看到 HealthServiceImpl 继承自 HealthGrpc.HealthImplBase



新建包 com.wsjzzcbq.grpc.retrying

服务端 RetryingHelloWorldServer 代码

  1. package com.wsjzzcbq.grpc.retrying;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureServerCredentials;
  4. import io.grpc.Server;
  5. import io.grpc.Status;
  6. import io.grpc.examples.helloworld.GreeterGrpc;
  7. import io.grpc.examples.helloworld.HelloReply;
  8. import io.grpc.examples.helloworld.HelloRequest;
  9. import io.grpc.stub.StreamObserver;
  10. import java.io.IOException;
  11. import java.util.concurrent.TimeUnit;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. /**
  14. * RetryingHelloWorldServer
  15. *
  16. * @author wsjz
  17. * @date 2023/08/23
  18. */
  19. public class RetryingHelloWorldServer {
  20. public static void main(String[] args) throws IOException, InterruptedException {
  21. int port = 50051;
  22. Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
  23. .addService(new GreeterImpl())
  24. .build()
  25. .start();
  26. Runtime.getRuntime().addShutdownHook(new Thread(()->{
  27. try {
  28. stopServer(server);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }));
  33. blockUntilShutdown(server);
  34. }
  35. private static void stopServer(Server server) throws InterruptedException {
  36. if (server != null) {
  37. server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
  38. }
  39. }
  40. private static void blockUntilShutdown(Server server) throws InterruptedException {
  41. if (server != null) {
  42. server.awaitTermination();
  43. }
  44. }
  45. static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
  46. AtomicInteger retryCounter = new AtomicInteger(0);
  47. @Override
  48. public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
  49. int count = retryCounter.incrementAndGet();
  50. System.out.println("调用次数:" + count);
  51. responseObserver.onError(Status.UNAVAILABLE
  52. .withDescription("Greeter temporarily unavailable..." + request.getName()).asRuntimeException());
  53. }
  54. }
  55. }


这里失败重试需要一些配置项,看下面 json文件配置

  1. {
  2. "methodConfig": [
  3. {
  4. "name": [
  5. {
  6. "service": "helloworld.Greeter",
  7. "method": "SayHello"
  8. }
  9. ],
  10. "retryPolicy": {
  11. "maxAttempts": 5,
  12. "initialBackoff": "0.5s",
  13. "maxBackoff": "30s",
  14. "backoffMultiplier": 2,
  15. "retryableStatusCodes": [
  16. "UNAVAILABLE"
  17. ]
  18. }
  19. }
  20. ]
  21. }

maxAttempts 最大尝试次数(包含首次调用在内,即最多重试 maxAttempts - 1 次)

retryableStatusCodes 进行重试的状态码

上面的 json 配置需要客户端创建 ManagedChannel 时配置,可以通过代码读取 json文件转成map传进去,或者改成代码配置,笔者改成了在代码中配置

添加 json 依赖

  1. <dependency>
  2. <groupId>com.alibaba.fastjson2</groupId>
  3. <artifactId>fastjson2</artifactId>
  4. <version>2.0.39</version>
  5. </dependency>


  1. package com.wsjzzcbq.grpc.retrying;
  2. import com.alibaba.fastjson2.JSONArray;
  3. import com.alibaba.fastjson2.JSONObject;
  4. import io.grpc.*;
  5. import io.grpc.examples.helloworld.GreeterGrpc;
  6. import io.grpc.examples.helloworld.HelloReply;
  7. import io.grpc.examples.helloworld.HelloRequest;
  8. /**
  9. * RetryingHelloWorldClient
  10. *
  11. * @author wsjz
  12. * @date 2023/08/23
  13. */
  14. public class RetryingHelloWorldClient {
  15. public static void main(String[] args) {
  16. String host = "localhost";
  17. int port = 50051;
  18. ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create())
  19. //添加重试配置
  20. .defaultServiceConfig(methodConfig())
  21. .enableRetry();
  22. ManagedChannel channel = channelBuilder.build();
  23. GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);
  24. HelloRequest request = HelloRequest.newBuilder().setName("平生无长物,独往有深情").build();
  25. HelloReply response = null;
  26. try {
  27. response = blockingStub.sayHello(request);
  28. System.out.println(response.getMessage());
  29. } catch (StatusRuntimeException e) {
  30. System.out.println("异常");
  31. e.printStackTrace();
  32. }
  33. }
  34. private static JSONObject methodConfig() {
  35. JSONArray name = new JSONArray();
  36. JSONObject nameItem = new JSONObject();
  37. nameItem.put("service", "helloworld.Greeter");
  38. nameItem.put("method", "SayHello");
  39. name.add(nameItem);
  40. JSONObject retryPolicy = new JSONObject();
  41. retryPolicy.put("maxAttempts", "5");
  42. retryPolicy.put("initialBackoff", "0.5s");
  43. retryPolicy.put("maxBackoff", "30s");
  44. retryPolicy.put("backoffMultiplier", "2");
  45. retryPolicy.put("retryableStatusCodes", JSONArray.of("UNAVAILABLE"));
  46. JSONObject methodConfigItem = new JSONObject();
  47. methodConfigItem.put("name", name);
  48. methodConfigItem.put("retryPolicy", retryPolicy);
  49. JSONObject methodConfig = new JSONObject();
  50. methodConfig.put("methodConfig", JSONArray.of(methodConfigItem));
  51. return methodConfig;
  52. }
  53. }

在 methodConfig 方法中配置重试策略






  1. package com.wsjzzcbq.grpc.retrying;
  2. import io.grpc.Grpc;
  3. import io.grpc.InsecureServerCredentials;
  4. import io.grpc.Server;
  5. import io.grpc.Status;
  6. import io.grpc.examples.helloworld.GreeterGrpc;
  7. import io.grpc.examples.helloworld.HelloReply;
  8. import io.grpc.examples.helloworld.HelloRequest;
  9. import io.grpc.stub.StreamObserver;
  10. import java.io.IOException;
  11. import java.util.concurrent.TimeUnit;
  12. import java.util.concurrent.atomic.AtomicInteger;
  13. /**
  14. * RetryingHelloWorldServer
  15. *
  16. * @author wsjz
  17. * @date 2023/08/23
  18. */
  19. public class RetryingHelloWorldServer {
  20. public static void main(String[] args) throws IOException, InterruptedException {
  21. int port = 50051;
  22. Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
  23. .addService(new GreeterImpl())
  24. .build()
  25. .start();
  26. Runtime.getRuntime().addShutdownHook(new Thread(()->{
  27. try {
  28. stopServer(server);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }));
  33. blockUntilShutdown(server);
  34. }
  35. private static void stopServer(Server server) throws InterruptedException {
  36. if (server != null) {
  37. server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
  38. }
  39. }
  40. private static void blockUntilShutdown(Server server) throws InterruptedException {
  41. if (server != null) {
  42. server.awaitTermination();
  43. }
  44. }
  45. static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
  46. AtomicInteger retryCounter = new AtomicInteger(0);
  47. @Override
  48. public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
  49. int count = retryCounter.incrementAndGet();
  50. System.out.println("调用次数:" + count);
  51. if (count <=4) {
  52. responseObserver.onError(Status.UNAVAILABLE
  53. .withDescription("Greeter temporarily unavailable..." + request.getName()).asRuntimeException());
  54. } else {
  55. HelloReply reply = HelloReply.newBuilder().setMessage("瘦影自怜秋水照,卿须怜我我怜卿").build();
  56. responseObserver.onNext(reply);
  57. responseObserver.onCompleted();
  58. }
  59. }
  60. }
  61. }




