当前位置:   article > 正文

grpc java io通信模型_io.grpc.managedchannel

io.grpc.managedchannel
1 gRPC 服务调用
gRPC 的通信协议基于标准的 HTTP/2 设计,主要提供了两种 RPC 调用方式:
1   普通 RPC 调用方式,即请求 - 响应模式。

2  基于 HTTP/2.0 的 streaming 调用方式。



1.1 普通 RPC 调用
普通的 RPC 调用提供了三种实现方式:
1  同步阻塞式服务调用,通常实现类是 xxxBlockingStub(基于 proto 定义生成)。
2  异步非阻塞调用,基于 Future-Listener 机制,通常实现类是 xxxFutureStub。

3  异步非阻塞调用,基于 Reactive(Async) 的响应式编程模式,通常实现类是 xxxStub。


普通 RPC 的同步阻塞式调用(BlockingStub)demo 如下:

  1. import java.util.concurrent.TimeUnit;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import com.ylifegroup.protobuf.PhoneServiceGrpc;
  5. import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
  6. import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
  7. import com.ylifegroup.protobuf.Phonebook.PhoneType;
  8. import io.grpc.ManagedChannel;
  9. import io.grpc.ManagedChannelBuilder;
  10. import io.grpc.StatusRuntimeException;
  11. /**
  12. * @describe GRpcClient Block demo
  13. * @author zhikai.chen
  14. * @date 2018年5月7日 下午4:00:58
  15. */
  16. public class GRpcClientBlock {
  17. private static final Logger logger = LoggerFactory.getLogger(GRpcClientBlock.class);
  18. private final ManagedChannel channel;
  19. private final PhoneServiceGrpc.PhoneServiceBlockingStub blockingStub;
  20. /** Construct client connecting to gRPC server at {@code host:port}. */
  21. public GRpcClientBlock(String host, int port) {
  22. ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
  23. channel = channelBuilder.build();
  24. blockingStub = PhoneServiceGrpc.newBlockingStub(channel);
  25. }
  26. public void shutdown() throws InterruptedException {
  27. channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  28. }
  29. /** add phone to user. */
  30. public void addPhoneToUser(int uid, PhoneType phoneType, String phoneNubmer) {
  31. logger.info("Will try to add phone to user " + uid);
  32. AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
  33. .setPhoneNumber(phoneNubmer).build();
  34. AddPhoneToUserResponse response;
  35. try {
  36. response = blockingStub.addPhoneToUser(request);
  37. } catch (StatusRuntimeException e) {
  38. logger.warn("RPC failed: {0} --> "+e.getLocalizedMessage(), e.getStatus());
  39. return;
  40. }
  41. logger.info("Result: " + response.getResult());
  42. }
  43. public static void main(String[] args) throws Exception {
  44. GRpcClientBlock client = new GRpcClientBlock("localhost", 50051);
  45. try {
  46. client.addPhoneToUser(1, PhoneType.WORK, "13888888888");
  47. } finally {
  48. client.shutdown();
  49. }
  50. }
  51. }


Future 模型demo如下:

  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.TimeUnit;
  3. import javax.annotation.Nullable;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import com.google.common.util.concurrent.FutureCallback;
  7. import com.google.common.util.concurrent.Futures;
  8. import com.ylifegroup.protobuf.PhoneServiceGrpc;
  9. import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
  10. import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
  11. import com.ylifegroup.protobuf.Phonebook.PhoneType;
  12. import io.grpc.ManagedChannel;
  13. import io.grpc.ManagedChannelBuilder;
  14. /**
  15. * @describe GRpcClient Future demo
  16. * @author zhikai.chen
  17. * @date 2018年5月7日 下午4:00:58
  18. */
  19. public class GRpcClientFuture {
  20. private static final Logger logger = LoggerFactory.getLogger(GRpcClientFuture.class);
  21. private final ManagedChannel channel;
  22. private final PhoneServiceGrpc.PhoneServiceFutureStub futureStub;
  23. /** Construct client connecting to gRPC server at {@code host:port}. */
  24. public GRpcClientFuture(String host, int port) {
  25. ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
  26. channel = channelBuilder.build();
  27. futureStub = PhoneServiceGrpc.newFutureStub(channel);
  28. }
  29. public void shutdown() throws InterruptedException {
  30. channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  31. }
  32. /** add phone to user. */
  33. public void addPhoneToUserFuture1(int uid, PhoneType phoneType, String phoneNubmer) {
  34. logger.info("Will try to add phone to user " + uid);
  35. AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
  36. .setPhoneNumber(phoneNubmer).build();
  37. try {
  38. com.google.common.util.concurrent.ListenableFuture<AddPhoneToUserResponse>
  39. listenableFuture = futureStub.addPhoneToUser(request);
  40. Futures.addCallback(listenableFuture, new FutureCallback<AddPhoneToUserResponse>() {
  41. @Override
  42. public void onSuccess(@Nullable AddPhoneToUserResponse result) {
  43. logger.info("result: " + result.getResult());
  44. }
  45. @Override
  46. public void onFailure(Throwable t) {
  47. logger.warn(t.getMessage());
  48. }
  49. });
  50. } catch (Exception e) {
  51. logger.warn("RPC failed: {0}", e);
  52. return;
  53. }
  54. }
  55. /** add phone to user. */
  56. public void addPhoneToUserFuture2(int uid, PhoneType phoneType, String phoneNubmer) {
  57. logger.info("Will try to add phone to user " + uid);
  58. AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
  59. .setPhoneNumber(phoneNubmer).build();
  60. try {
  61. com.google.common.util.concurrent.ListenableFuture<AddPhoneToUserResponse>
  62. listenableFuture = futureStub.addPhoneToUser(request);
  63. listenableFuture.addListener(()->
  64. {
  65. try {
  66. AddPhoneToUserResponse response = listenableFuture.get();
  67. logger.info("result: " + response.getResult());
  68. }
  69. catch(Exception e)
  70. {
  71. e.printStackTrace();
  72. }
  73. }, Executors.newFixedThreadPool(1));
  74. } catch (Exception e) {
  75. logger.warn("RPC failed: {0}", e);
  76. return;
  77. }
  78. }
  79. public static void main(String[] args) throws Exception {
  80. GRpcClientFuture client = new GRpcClientFuture("localhost", 50051);
  81. try {
  82. client.addPhoneToUserFuture1(1, PhoneType.WORK, "13888888888");
  83. client.addPhoneToUserFuture2(2, PhoneType.WORK, "13888888888");
  84. TimeUnit.SECONDS.sleep(3);
  85. //TODO 这个模式需要自己手动退出
  86. Runtime.getRuntime().exit(0);
  87. } finally {
  88. client.shutdown();
  89. }
  90. }
  91. }

Reactive(Async)模型demo如下:

  1. import java.util.concurrent.TimeUnit;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import com.ylifegroup.protobuf.PhoneServiceGrpc;
  5. import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
  6. import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
  7. import com.ylifegroup.protobuf.Phonebook.PhoneType;
  8. import io.grpc.ManagedChannel;
  9. import io.grpc.ManagedChannelBuilder;
  10. /**
  11. * @describe GRpcClient Async demo
  12. * @author zhikai.chen
  13. * @date 2018年5月7日 下午4:00:58
  14. */
  15. public class GRpcClientAsync {
  16. private static final Logger logger = LoggerFactory.getLogger(GRpcClientAsync.class);
  17. private final ManagedChannel channel;
  18. private final PhoneServiceGrpc.PhoneServiceStub stub;
  19. /** Construct client connecting to gRPC server at {@code host:port}. */
  20. public GRpcClientAsync(String host, int port) {
  21. ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
  22. channel = channelBuilder.build();
  23. stub = PhoneServiceGrpc.newStub(channel);
  24. }
  25. public void shutdown() throws InterruptedException {
  26. channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  27. }
  28. /** add phone to user. */
  29. public void addPhoneToUserAsync(int uid, PhoneType phoneType, String phoneNubmer) {
  30. logger.info("Will try to add phone to user " + uid);
  31. AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
  32. .setPhoneNumber(phoneNubmer).build();
  33. io.grpc.stub.StreamObserver<AddPhoneToUserResponse> responseObserver =
  34. new io.grpc.stub.StreamObserver<AddPhoneToUserResponse>()
  35. {
  36. public void onNext(AddPhoneToUserResponse response)
  37. {
  38. logger.info("Result: " + response.getResult());
  39. }
  40. public void onError(Throwable t){
  41. logger.warn(t.getMessage());
  42. }
  43. public void onCompleted(){}
  44. };
  45. stub.addPhoneToUser(request,responseObserver);
  46. }
  47. public static void main(String[] args) throws Exception {
  48. GRpcClientAsync client = new GRpcClientAsync("localhost", 50051);
  49. try {
  50. client.addPhoneToUserAsync(1, PhoneType.WORK, "13888888888");
  51. TimeUnit.SECONDS.sleep(5);
  52. } finally {
  53. client.shutdown();
  54. }
  55. }
  56. }


1.2 Streaming 模式服务调用
基于 HTTP/2.0,gRPC 提供了三种 streaming 模式:
1  服务端 streaming
2  客户端 streaming

3  服务端和客户端双向 streaming

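服务端 demo 如下(RouteGuideServer 同时实现了四种调用:普通调用 getFeature、服务端 streaming listFeatures、客户端 streaming recordRoute、双向 streaming routeChat):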

  1. import static java.lang.Math.atan2;
  2. import static java.lang.Math.cos;
  3. import static java.lang.Math.max;
  4. import static java.lang.Math.min;
  5. import static java.lang.Math.sin;
  6. import static java.lang.Math.sqrt;
  7. import static java.lang.Math.toRadians;
  8. import static java.util.concurrent.TimeUnit.NANOSECONDS;
  9. import io.grpc.Server;
  10. import io.grpc.ServerBuilder;
  11. import io.grpc.examples.routeguide.Feature;
  12. import io.grpc.examples.routeguide.Point;
  13. import io.grpc.examples.routeguide.Rectangle;
  14. import io.grpc.examples.routeguide.RouteGuideGrpc;
  15. import io.grpc.examples.routeguide.RouteNote;
  16. import io.grpc.examples.routeguide.RouteSummary;
  17. import io.grpc.stub.StreamObserver;
  18. import java.io.IOException;
  19. import java.net.URL;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.Collections;
  23. import java.util.List;
  24. import java.util.concurrent.ConcurrentHashMap;
  25. import java.util.concurrent.ConcurrentMap;
  26. import java.util.logging.Level;
  27. import java.util.logging.Logger;
  28. /**
  29. * A sample gRPC server that serve the RouteGuide (see route_guide.proto) service.
  30. */
  31. public class RouteGuideServer {
  32. private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());
  33. private final int port;
  34. private final Server server;
  35. public RouteGuideServer(int port) throws IOException {
  36. this(port, RouteGuideUtil.getDefaultFeaturesFile());
  37. }
  38. /** Create a RouteGuide server listening on {@code port} using {@code featureFile} database. */
  39. public RouteGuideServer(int port, URL featureFile) throws IOException {
  40. this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
  41. }
  42. /** Create a RouteGuide server using serverBuilder as a base and features as data. */
  43. public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  44. this.port = port;
  45. server = serverBuilder.addService(new RouteGuideService(features))
  46. .build();
  47. }
  48. /** Start serving requests. */
  49. public void start() throws IOException {
  50. server.start();
  51. logger.info("Server started, listening on " + port);
  52. Runtime.getRuntime().addShutdownHook(new Thread() {
  53. @Override
  54. public void run() {
  55. // Use stderr here since the logger may has been reset by its JVM shutdown hook.
  56. System.err.println("*** shutting down gRPC server since JVM is shutting down");
  57. RouteGuideServer.this.stop();
  58. System.err.println("*** server shut down");
  59. }
  60. });
  61. }
  62. /** Stop serving requests and shutdown resources. */
  63. public void stop() {
  64. if (server != null) {
  65. server.shutdown();
  66. }
  67. }
  68. /**
  69. * Await termination on the main thread since the grpc library uses daemon threads.
  70. */
  71. private void blockUntilShutdown() throws InterruptedException {
  72. if (server != null) {
  73. server.awaitTermination();
  74. }
  75. }
  76. /**
  77. * Main method. This comment makes the linter happy.
  78. */
  79. public static void main(String[] args) throws Exception {
  80. RouteGuideServer server = new RouteGuideServer(8980);
  81. server.start();
  82. server.blockUntilShutdown();
  83. }
  84. /**
  85. * Our implementation of RouteGuide service.
  86. *
  87. * <p>See route_guide.proto for details of the methods.
  88. */
  89. private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
  90. private final Collection<Feature> features;
  91. private final ConcurrentMap<Point, List<RouteNote>> routeNotes =
  92. new ConcurrentHashMap<Point, List<RouteNote>>();
  93. RouteGuideService(Collection<Feature> features) {
  94. this.features = features;
  95. }
  96. /**
  97. * Gets the {@link Feature} at the requested {@link Point}. If no feature at that location
  98. * exists, an unnamed feature is returned at the provided location.
  99. *
  100. * @param request the requested location for the feature.
  101. * @param responseObserver the observer that will receive the feature at the requested point.
  102. */
  103. @Override
  104. public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  105. responseObserver.onNext(checkFeature(request));
  106. responseObserver.onCompleted();
  107. }
  108. /**
  109. * Gets all features contained within the given bounding {@link Rectangle}.
  110. *
  111. * @param request the bounding rectangle for the requested features.
  112. * @param responseObserver the observer that will receive the features.
  113. */
  114. @Override
  115. public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  116. int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  117. int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  118. int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  119. int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
  120. for (Feature feature : features) {
  121. if (!RouteGuideUtil.exists(feature)) {
  122. continue;
  123. }
  124. int lat = feature.getLocation().getLatitude();
  125. int lon = feature.getLocation().getLongitude();
  126. if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
  127. responseObserver.onNext(feature);
  128. }
  129. }
  130. responseObserver.onCompleted();
  131. }
  132. /**
  133. * Gets a stream of points, and responds with statistics about the "trip": number of points,
  134. * number of known features visited, total distance traveled, and total time spent.
  135. *
  136. * @param responseObserver an observer to receive the response summary.
  137. * @return an observer to receive the requested route points.
  138. */
  139. @Override
  140. public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  141. return new StreamObserver<Point>() {
  142. int pointCount;
  143. int featureCount;
  144. int distance;
  145. Point previous;
  146. final long startTime = System.nanoTime();
  147. @Override
  148. public void onNext(Point point) {
  149. pointCount++;
  150. if (RouteGuideUtil.exists(checkFeature(point))) {
  151. featureCount++;
  152. }
  153. // For each point after the first, add the incremental distance from the previous point to
  154. // the total distance value.
  155. if (previous != null) {
  156. distance += calcDistance(previous, point);
  157. }
  158. previous = point;
  159. }
  160. @Override
  161. public void onError(Throwable t) {
  162. logger.log(Level.WARNING, "recordRoute cancelled");
  163. }
  164. @Override
  165. public void onCompleted() {
  166. long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
  167. responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
  168. .setFeatureCount(featureCount).setDistance(distance)
  169. .setElapsedTime((int) seconds).build());
  170. responseObserver.onCompleted();
  171. }
  172. };
  173. }
  174. /**
  175. * Receives a stream of message/location pairs, and responds with a stream of all previous
  176. * messages at each of those locations.
  177. *
  178. * @param responseObserver an observer to receive the stream of previous messages.
  179. * @return an observer to handle requested message/location pairs.
  180. */
  181. @Override
  182. public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  183. return new StreamObserver<RouteNote>() {
  184. @Override
  185. public void onNext(RouteNote note) {
  186. List<RouteNote> notes = getOrCreateNotes(note.getLocation());
  187. // Respond with all previous notes at this location.
  188. for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
  189. responseObserver.onNext(prevNote);
  190. }
  191. // Now add the new note to the list
  192. notes.add(note);
  193. }
  194. @Override
  195. public void onError(Throwable t) {
  196. logger.log(Level.WARNING, "routeChat cancelled");
  197. }
  198. @Override
  199. public void onCompleted() {
  200. responseObserver.onCompleted();
  201. }
  202. };
  203. }
  204. /**
  205. * Get the notes list for the given location. If missing, create it.
  206. */
  207. private List<RouteNote> getOrCreateNotes(Point location) {
  208. List<RouteNote> notes = Collections.synchronizedList(new ArrayList<RouteNote>());
  209. List<RouteNote> prevNotes = routeNotes.putIfAbsent(location, notes);
  210. return prevNotes != null ? prevNotes : notes;
  211. }
  212. /**
  213. * Gets the feature at the given point.
  214. *
  215. * @param location the location to check.
  216. * @return The feature object at the point. Note that an empty name indicates no feature.
  217. */
  218. private Feature checkFeature(Point location) {
  219. for (Feature feature : features) {
  220. if (feature.getLocation().getLatitude() == location.getLatitude()
  221. && feature.getLocation().getLongitude() == location.getLongitude()) {
  222. return feature;
  223. }
  224. }
  225. // No feature was found, return an unnamed feature.
  226. return Feature.newBuilder().setName("").setLocation(location).build();
  227. }
  228. /**
  229. * Calculate the distance between two points using the "haversine" formula.
  230. * The formula is based on http://mathforum.org/library/drmath/view/51879.html.
  231. *
  232. * @param start The starting point
  233. * @param end The end point
  234. * @return The distance between the points in meters
  235. */
  236. private static int calcDistance(Point start, Point end) {
  237. int r = 6371000; // earth radius in meters
  238. double lat1 = toRadians(RouteGuideUtil.getLatitude(start));
  239. double lat2 = toRadians(RouteGuideUtil.getLatitude(end));
  240. double lon1 = toRadians(RouteGuideUtil.getLongitude(start));
  241. double lon2 = toRadians(RouteGuideUtil.getLongitude(end));
  242. double deltaLat = lat2 - lat1;
  243. double deltaLon = lon2 - lon1;
  244. double a = sin(deltaLat / 2) * sin(deltaLat / 2)
  245. + cos(lat1) * cos(lat2) * sin(deltaLon / 2) * sin(deltaLon / 2);
  246. double c = 2 * atan2(sqrt(a), sqrt(1 - a));
  247. return (int) (r * c);
  248. }
  249. }
  250. }


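客户端 demo 如下(RouteGuideClient 依次演示四种调用:阻塞式普通调用 getFeature、服务端 streaming listFeatures、客户端 streaming recordRoute、双向 streaming routeChat):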

  1. import com.google.common.annotations.VisibleForTesting;
  2. import com.google.protobuf.Message;
  3. import io.grpc.ManagedChannel;
  4. import io.grpc.ManagedChannelBuilder;
  5. import io.grpc.Status;
  6. import io.grpc.StatusRuntimeException;
  7. import io.grpc.examples.routeguide.Feature;
  8. import io.grpc.examples.routeguide.Point;
  9. import io.grpc.examples.routeguide.Rectangle;
  10. import io.grpc.examples.routeguide.RouteGuideGrpc;
  11. import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub;
  12. import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideStub;
  13. import io.grpc.examples.routeguide.RouteNote;
  14. import io.grpc.examples.routeguide.RouteSummary;
  15. import io.grpc.stub.StreamObserver;
  16. import java.io.IOException;
  17. import java.util.Iterator;
  18. import java.util.List;
  19. import java.util.Random;
  20. import java.util.concurrent.CountDownLatch;
  21. import java.util.concurrent.TimeUnit;
  22. import java.util.logging.Level;
  23. import java.util.logging.Logger;
  24. /**
  25. * Sample client code that makes gRPC calls to the server.
  26. */
  27. public class RouteGuideClient {
  28. private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName());
  29. private final ManagedChannel channel;
  30. private final RouteGuideBlockingStub blockingStub;
  31. private final RouteGuideStub asyncStub;
  32. private Random random = new Random();
  33. private TestHelper testHelper;
  34. /** Construct client for accessing RouteGuide server at {@code host:port}. */
  35. public RouteGuideClient(String host, int port) {
  36. this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
  37. }
  38. /** Construct client for accessing RouteGuide server using the existing channel. */
  39. public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
  40. channel = channelBuilder.build();
  41. blockingStub = RouteGuideGrpc.newBlockingStub(channel);
  42. asyncStub = RouteGuideGrpc.newStub(channel);
  43. }
  44. public void shutdown() throws InterruptedException {
  45. channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
  46. }
  47. /**
  48. * Blocking unary call example. Calls getFeature and prints the response.
  49. */
  50. public void getFeature(int lat, int lon) {
  51. info("*** GetFeature: lat={0} lon={1}", lat, lon);
  52. Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
  53. Feature feature;
  54. try {
  55. feature = blockingStub.getFeature(request);
  56. if (testHelper != null) {
  57. testHelper.onMessage(feature);
  58. }
  59. } catch (StatusRuntimeException e) {
  60. warning("RPC failed: {0}", e.getStatus());
  61. if (testHelper != null) {
  62. testHelper.onRpcError(e);
  63. }
  64. return;
  65. }
  66. if (RouteGuideUtil.exists(feature)) {
  67. info("Found feature called \"{0}\" at {1}, {2}",
  68. feature.getName(),
  69. RouteGuideUtil.getLatitude(feature.getLocation()),
  70. RouteGuideUtil.getLongitude(feature.getLocation()));
  71. } else {
  72. info("Found no feature at {0}, {1}",
  73. RouteGuideUtil.getLatitude(feature.getLocation()),
  74. RouteGuideUtil.getLongitude(feature.getLocation()));
  75. }
  76. }
  77. /**
  78. * Blocking server-streaming example. Calls listFeatures with a rectangle of interest. Prints each
  79. * response feature as it arrives.
  80. */
  81. public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
  82. info("*** ListFeatures: lowLat={0} lowLon={1} hiLat={2} hiLon={3}", lowLat, lowLon, hiLat,
  83. hiLon);
  84. Rectangle request =
  85. Rectangle.newBuilder()
  86. .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
  87. .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
  88. Iterator<Feature> features;
  89. try {
  90. features = blockingStub.listFeatures(request);
  91. for (int i = 1; features.hasNext(); i++) {
  92. Feature feature = features.next();
  93. info("Result #" + i + ": {0}", feature);
  94. if (testHelper != null) {
  95. testHelper.onMessage(feature);
  96. }
  97. }
  98. } catch (StatusRuntimeException e) {
  99. warning("RPC failed: {0}", e.getStatus());
  100. if (testHelper != null) {
  101. testHelper.onRpcError(e);
  102. }
  103. }
  104. }
  105. /**
  106. * Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
  107. * features} with a variable delay in between. Prints the statistics when they are sent from the
  108. * server.
  109. */
  110. public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  111. info("*** RecordRoute");
  112. final CountDownLatch finishLatch = new CountDownLatch(1);
  113. StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
  114. @Override
  115. public void onNext(RouteSummary summary) {
  116. info("Finished trip with {0} points. Passed {1} features. "
  117. + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
  118. summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
  119. if (testHelper != null) {
  120. testHelper.onMessage(summary);
  121. }
  122. }
  123. @Override
  124. public void onError(Throwable t) {
  125. warning("RecordRoute Failed: {0}", Status.fromThrowable(t));
  126. if (testHelper != null) {
  127. testHelper.onRpcError(t);
  128. }
  129. finishLatch.countDown();
  130. }
  131. @Override
  132. public void onCompleted() {
  133. info("Finished RecordRoute");
  134. finishLatch.countDown();
  135. }
  136. };
  137. StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  138. try {
  139. // Send numPoints points randomly selected from the features list.
  140. for (int i = 0; i < numPoints; ++i) {
  141. int index = random.nextInt(features.size());
  142. Point point = features.get(index).getLocation();
  143. info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
  144. RouteGuideUtil.getLongitude(point));
  145. requestObserver.onNext(point);
  146. // Sleep for a bit before sending the next one.
  147. Thread.sleep(random.nextInt(1000) + 500);
  148. if (finishLatch.getCount() == 0) {
  149. // RPC completed or errored before we finished sending.
  150. // Sending further requests won't error, but they will just be thrown away.
  151. return;
  152. }
  153. }
  154. } catch (RuntimeException e) {
  155. // Cancel RPC
  156. requestObserver.onError(e);
  157. throw e;
  158. }
  159. // Mark the end of requests
  160. requestObserver.onCompleted();
  161. // Receiving happens asynchronously
  162. if (!finishLatch.await(1, TimeUnit.MINUTES)) {
  163. warning("recordRoute can not finish within 1 minutes");
  164. }
  165. }
  166. /**
  167. * Bi-directional example, which can only be asynchronous. Send some chat messages, and print any
  168. * chat messages that are sent from the server.
  169. */
  170. public CountDownLatch routeChat() {
  171. info("*** RouteChat");
  172. final CountDownLatch finishLatch = new CountDownLatch(1);
  173. StreamObserver<RouteNote> requestObserver =
  174. asyncStub.routeChat(new StreamObserver<RouteNote>() {
  175. @Override
  176. public void onNext(RouteNote note) {
  177. info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
  178. .getLatitude(), note.getLocation().getLongitude());
  179. if (testHelper != null) {
  180. testHelper.onMessage(note);
  181. }
  182. }
  183. @Override
  184. public void onError(Throwable t) {
  185. warning("RouteChat Failed: {0}", Status.fromThrowable(t));
  186. if (testHelper != null) {
  187. testHelper.onRpcError(t);
  188. }
  189. finishLatch.countDown();
  190. }
  191. @Override
  192. public void onCompleted() {
  193. info("Finished RouteChat");
  194. finishLatch.countDown();
  195. }
  196. });
  197. try {
  198. RouteNote[] requests =
  199. {newNote("First message", 0, 0), newNote("Second message", 0, 1),
  200. newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
  201. for (RouteNote request : requests) {
  202. info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
  203. .getLatitude(), request.getLocation().getLongitude());
  204. requestObserver.onNext(request);
  205. }
  206. } catch (RuntimeException e) {
  207. // Cancel RPC
  208. requestObserver.onError(e);
  209. throw e;
  210. }
  211. // Mark the end of requests
  212. requestObserver.onCompleted();
  213. // return the latch while receiving happens asynchronously
  214. return finishLatch;
  215. }
  216. /** Issues several different requests and then exits. */
  217. public static void main(String[] args) throws InterruptedException {
  218. List<Feature> features;
  219. try {
  220. features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
  221. } catch (IOException ex) {
  222. ex.printStackTrace();
  223. return;
  224. }
  225. RouteGuideClient client = new RouteGuideClient("localhost", 8980);
  226. try {
  227. // Looking for a valid feature
  228. client.getFeature(409146138, -746188906);
  229. // Feature missing.
  230. client.getFeature(0, 0);
  231. // Looking for features between 40, -75 and 42, -73.
  232. client.listFeatures(400000000, -750000000, 420000000, -730000000);
  233. // Record a few randomly selected points from the features file.
  234. client.recordRoute(features, 10);
  235. // Send and receive some notes.
  236. CountDownLatch finishLatch = client.routeChat();
  237. if (!finishLatch.await(1, TimeUnit.MINUTES)) {
  238. client.warning("routeChat can not finish within 1 minutes");
  239. }
  240. } finally {
  241. client.shutdown();
  242. }
  243. }
  244. private void info(String msg, Object... params) {
  245. logger.log(Level.INFO, msg, params);
  246. }
  247. private void warning(String msg, Object... params) {
  248. logger.log(Level.WARNING, msg, params);
  249. }
  250. private RouteNote newNote(String message, int lat, int lon) {
  251. return RouteNote.newBuilder().setMessage(message)
  252. .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
  253. }
  254. /**
  255. * Only used for unit test, as we do not want to introduce randomness in unit test.
  256. */
  257. @VisibleForTesting
  258. void setRandom(Random random) {
  259. this.random = random;
  260. }
  261. /**
  262. * Only used for helping unit test.
  263. */
  264. @VisibleForTesting
  265. interface TestHelper {
  266. /**
  267. * Used for verify/inspect message received from server.
  268. */
  269. void onMessage(Message message);
  270. /**
  271. * Used for verify/inspect error received from server.
  272. */
  273. void onRpcError(Throwable exception);
  274. }
  275. @VisibleForTesting
  276. void setTestHelper(TestHelper testHelper) {
  277. this.testHelper = testHelper;
  278. }
  279. }

服务端和客户端双向 streaming 就是上面两种模式的结合,对应上面示例中的 routeChat 方法:客户端和服务端各自持有一个 StreamObserver,可以相互独立地发送和接收消息。


2 总结

  gRPC 服务调用支持同步和异步方式,同时也支持普通的 RPC 和 streaming 模式,可以最大程度满足业务的需求。对于 streaming 模式,可以充分利用 HTTP/2.0 协议的多路复用功能,实现在一条 HTTP 链路上并行双向传输数据,有效地解决了 HTTP/1.X 的数据单向传输问题,在大幅减少 HTTP 连接的情况下,充分利用单条链路的性能,可以媲美传统的 RPC 私有长连接协议:更少的链路、更高的性能。

  gRPC 的网络 I/O 通信基于 Netty 构建,服务调用底层统一使用异步方式,同步调用是在异步的基础上做了上层封装。因此,gRPC 的异步化是比较彻底的,对于提升 I/O 密集型业务的吞吐量和可靠性有很大的帮助。


 下一章我会为大家讲解grpc关于SSL部分。



声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/382442
推荐阅读
相关标签
  

闽ICP备14008679号