赞
踩
2 基于 HTTP/2.0 的 streaming 调用方式。
3 异步非阻塞调用,基于 Reactive(Async) 的响应式编程模式,通常实现类是 xxxStub。
普通 rpc demo 如下:
- import java.util.concurrent.TimeUnit;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import com.ylifegroup.protobuf.PhoneServiceGrpc;
- import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
- import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
- import com.ylifegroup.protobuf.Phonebook.PhoneType;
-
- import io.grpc.ManagedChannel;
- import io.grpc.ManagedChannelBuilder;
- import io.grpc.StatusRuntimeException;
-
- /**
- * @describe GRpcClient Block demo
- * @author zhikai.chen
- * @date 2018年5月7日 下午4:00:58
- */
- public class GRpcClientBlock {
-
- private static final Logger logger = LoggerFactory.getLogger(GRpcClientBlock.class);
-
- private final ManagedChannel channel;
-
- private final PhoneServiceGrpc.PhoneServiceBlockingStub blockingStub;
-
- /** Construct client connecting to gRPC server at {@code host:port}. */
- public GRpcClientBlock(String host, int port) {
- ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
- channel = channelBuilder.build();
- blockingStub = PhoneServiceGrpc.newBlockingStub(channel);
- }
-
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /** add phone to user. */
- public void addPhoneToUser(int uid, PhoneType phoneType, String phoneNubmer) {
- logger.info("Will try to add phone to user " + uid);
- AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
- .setPhoneNumber(phoneNubmer).build();
- AddPhoneToUserResponse response;
- try {
- response = blockingStub.addPhoneToUser(request);
- } catch (StatusRuntimeException e) {
- logger.warn("RPC failed: {0} --> "+e.getLocalizedMessage(), e.getStatus());
- return;
- }
- logger.info("Result: " + response.getResult());
- }
-
- public static void main(String[] args) throws Exception {
- GRpcClientBlock client = new GRpcClientBlock("localhost", 50051);
- try {
- client.addPhoneToUser(1, PhoneType.WORK, "13888888888");
- } finally {
- client.shutdown();
- }
- }
-
- }

Future 模型demo如下:
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- import javax.annotation.Nullable;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.ylifegroup.protobuf.PhoneServiceGrpc;
- import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
- import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
- import com.ylifegroup.protobuf.Phonebook.PhoneType;
-
- import io.grpc.ManagedChannel;
- import io.grpc.ManagedChannelBuilder;
-
- /**
- * @describe GRpcClient Future demo
- * @author zhikai.chen
- * @date 2018年5月7日 下午4:00:58
- */
- public class GRpcClientFuture {
-
- private static final Logger logger = LoggerFactory.getLogger(GRpcClientFuture.class);
-
- private final ManagedChannel channel;
-
- private final PhoneServiceGrpc.PhoneServiceFutureStub futureStub;
-
- /** Construct client connecting to gRPC server at {@code host:port}. */
- public GRpcClientFuture(String host, int port) {
- ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
- channel = channelBuilder.build();
- futureStub = PhoneServiceGrpc.newFutureStub(channel);
- }
-
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /** add phone to user. */
- public void addPhoneToUserFuture1(int uid, PhoneType phoneType, String phoneNubmer) {
- logger.info("Will try to add phone to user " + uid);
- AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
- .setPhoneNumber(phoneNubmer).build();
-
- try {
- com.google.common.util.concurrent.ListenableFuture<AddPhoneToUserResponse>
- listenableFuture = futureStub.addPhoneToUser(request);
- Futures.addCallback(listenableFuture, new FutureCallback<AddPhoneToUserResponse>() {
- @Override
- public void onSuccess(@Nullable AddPhoneToUserResponse result) {
- logger.info("result: " + result.getResult());
- }
- @Override
- public void onFailure(Throwable t) {
- logger.warn(t.getMessage());
- }
- });
- } catch (Exception e) {
- logger.warn("RPC failed: {0}", e);
- return;
- }
- }
-
- /** add phone to user. */
- public void addPhoneToUserFuture2(int uid, PhoneType phoneType, String phoneNubmer) {
- logger.info("Will try to add phone to user " + uid);
- AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
- .setPhoneNumber(phoneNubmer).build();
-
- try {
- com.google.common.util.concurrent.ListenableFuture<AddPhoneToUserResponse>
- listenableFuture = futureStub.addPhoneToUser(request);
- listenableFuture.addListener(()->
- {
- try {
- AddPhoneToUserResponse response = listenableFuture.get();
- logger.info("result: " + response.getResult());
- }
- catch(Exception e)
- {
- e.printStackTrace();
- }
- }, Executors.newFixedThreadPool(1));
- } catch (Exception e) {
- logger.warn("RPC failed: {0}", e);
- return;
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- GRpcClientFuture client = new GRpcClientFuture("localhost", 50051);
- try {
- client.addPhoneToUserFuture1(1, PhoneType.WORK, "13888888888");
- client.addPhoneToUserFuture2(2, PhoneType.WORK, "13888888888");
- TimeUnit.SECONDS.sleep(3);
- //TODO 这个模式需要自己手动退出
- Runtime.getRuntime().exit(0);
- } finally {
- client.shutdown();
- }
- }
-
- }

Reactive(Async)模型demo如下:
- import java.util.concurrent.TimeUnit;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import com.ylifegroup.protobuf.PhoneServiceGrpc;
- import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserRequest;
- import com.ylifegroup.protobuf.Phonebook.AddPhoneToUserResponse;
- import com.ylifegroup.protobuf.Phonebook.PhoneType;
-
- import io.grpc.ManagedChannel;
- import io.grpc.ManagedChannelBuilder;
-
- /**
- * @describe GRpcClient Async demo
- * @author zhikai.chen
- * @date 2018年5月7日 下午4:00:58
- */
- public class GRpcClientAsync {
-
- private static final Logger logger = LoggerFactory.getLogger(GRpcClientAsync.class);
-
- private final ManagedChannel channel;
-
- private final PhoneServiceGrpc.PhoneServiceStub stub;
-
- /** Construct client connecting to gRPC server at {@code host:port}. */
- public GRpcClientAsync(String host, int port) {
- ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext();
- channel = channelBuilder.build();
- stub = PhoneServiceGrpc.newStub(channel);
- }
-
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /** add phone to user. */
- public void addPhoneToUserAsync(int uid, PhoneType phoneType, String phoneNubmer) {
- logger.info("Will try to add phone to user " + uid);
- AddPhoneToUserRequest request = AddPhoneToUserRequest.newBuilder().setUid(uid).setPhoneType(phoneType)
- .setPhoneNumber(phoneNubmer).build();
-
- io.grpc.stub.StreamObserver<AddPhoneToUserResponse> responseObserver =
- new io.grpc.stub.StreamObserver<AddPhoneToUserResponse>()
- {
- public void onNext(AddPhoneToUserResponse response)
- {
- logger.info("Result: " + response.getResult());
- }
- public void onError(Throwable t){
- logger.warn(t.getMessage());
- }
- public void onCompleted(){}
- };
- stub.addPhoneToUser(request,responseObserver);
- }
-
- public static void main(String[] args) throws Exception {
- GRpcClientAsync client = new GRpcClientAsync("localhost", 50051);
- try {
- client.addPhoneToUserAsync(1, PhoneType.WORK, "13888888888");
- TimeUnit.SECONDS.sleep(5);
- } finally {
- client.shutdown();
- }
- }
-
- }

3 服务端和客户端双向 streaming
服务端streaming demo如下:
- import static java.lang.Math.atan2;
- import static java.lang.Math.cos;
- import static java.lang.Math.max;
- import static java.lang.Math.min;
- import static java.lang.Math.sin;
- import static java.lang.Math.sqrt;
- import static java.lang.Math.toRadians;
- import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
- import io.grpc.Server;
- import io.grpc.ServerBuilder;
- import io.grpc.examples.routeguide.Feature;
- import io.grpc.examples.routeguide.Point;
- import io.grpc.examples.routeguide.Rectangle;
- import io.grpc.examples.routeguide.RouteGuideGrpc;
- import io.grpc.examples.routeguide.RouteNote;
- import io.grpc.examples.routeguide.RouteSummary;
- import io.grpc.stub.StreamObserver;
- import java.io.IOException;
- import java.net.URL;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentMap;
- import java.util.logging.Level;
- import java.util.logging.Logger;
-
- /**
- * A sample gRPC server that serve the RouteGuide (see route_guide.proto) service.
- */
- public class RouteGuideServer {
- private static final Logger logger = Logger.getLogger(RouteGuideServer.class.getName());
-
- private final int port;
- private final Server server;
-
- public RouteGuideServer(int port) throws IOException {
- this(port, RouteGuideUtil.getDefaultFeaturesFile());
- }
-
- /** Create a RouteGuide server listening on {@code port} using {@code featureFile} database. */
- public RouteGuideServer(int port, URL featureFile) throws IOException {
- this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
- }
-
- /** Create a RouteGuide server using serverBuilder as a base and features as data. */
- public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
- this.port = port;
- server = serverBuilder.addService(new RouteGuideService(features))
- .build();
- }
-
- /** Start serving requests. */
- public void start() throws IOException {
- server.start();
- logger.info("Server started, listening on " + port);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- // Use stderr here since the logger may has been reset by its JVM shutdown hook.
- System.err.println("*** shutting down gRPC server since JVM is shutting down");
- RouteGuideServer.this.stop();
- System.err.println("*** server shut down");
- }
- });
- }
-
- /** Stop serving requests and shutdown resources. */
- public void stop() {
- if (server != null) {
- server.shutdown();
- }
- }
-
- /**
- * Await termination on the main thread since the grpc library uses daemon threads.
- */
- private void blockUntilShutdown() throws InterruptedException {
- if (server != null) {
- server.awaitTermination();
- }
- }
-
- /**
- * Main method. This comment makes the linter happy.
- */
- public static void main(String[] args) throws Exception {
- RouteGuideServer server = new RouteGuideServer(8980);
- server.start();
- server.blockUntilShutdown();
- }
-
- /**
- * Our implementation of RouteGuide service.
- *
- * <p>See route_guide.proto for details of the methods.
- */
- private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
- private final Collection<Feature> features;
- private final ConcurrentMap<Point, List<RouteNote>> routeNotes =
- new ConcurrentHashMap<Point, List<RouteNote>>();
-
- RouteGuideService(Collection<Feature> features) {
- this.features = features;
- }
-
- /**
- * Gets the {@link Feature} at the requested {@link Point}. If no feature at that location
- * exists, an unnamed feature is returned at the provided location.
- *
- * @param request the requested location for the feature.
- * @param responseObserver the observer that will receive the feature at the requested point.
- */
- @Override
- public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
- responseObserver.onNext(checkFeature(request));
- responseObserver.onCompleted();
- }
-
- /**
- * Gets all features contained within the given bounding {@link Rectangle}.
- *
- * @param request the bounding rectangle for the requested features.
- * @param responseObserver the observer that will receive the features.
- */
- @Override
- public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
- int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
- int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
- int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
- int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
-
- for (Feature feature : features) {
- if (!RouteGuideUtil.exists(feature)) {
- continue;
- }
-
- int lat = feature.getLocation().getLatitude();
- int lon = feature.getLocation().getLongitude();
- if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
- responseObserver.onNext(feature);
- }
- }
- responseObserver.onCompleted();
- }
-
- /**
- * Gets a stream of points, and responds with statistics about the "trip": number of points,
- * number of known features visited, total distance traveled, and total time spent.
- *
- * @param responseObserver an observer to receive the response summary.
- * @return an observer to receive the requested route points.
- */
- @Override
- public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
- return new StreamObserver<Point>() {
- int pointCount;
- int featureCount;
- int distance;
- Point previous;
- final long startTime = System.nanoTime();
-
- @Override
- public void onNext(Point point) {
- pointCount++;
- if (RouteGuideUtil.exists(checkFeature(point))) {
- featureCount++;
- }
- // For each point after the first, add the incremental distance from the previous point to
- // the total distance value.
- if (previous != null) {
- distance += calcDistance(previous, point);
- }
- previous = point;
- }
-
- @Override
- public void onError(Throwable t) {
- logger.log(Level.WARNING, "recordRoute cancelled");
- }
-
- @Override
- public void onCompleted() {
- long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
- responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
- .setFeatureCount(featureCount).setDistance(distance)
- .setElapsedTime((int) seconds).build());
- responseObserver.onCompleted();
- }
- };
- }
-
- /**
- * Receives a stream of message/location pairs, and responds with a stream of all previous
- * messages at each of those locations.
- *
- * @param responseObserver an observer to receive the stream of previous messages.
- * @return an observer to handle requested message/location pairs.
- */
- @Override
- public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
- return new StreamObserver<RouteNote>() {
- @Override
- public void onNext(RouteNote note) {
- List<RouteNote> notes = getOrCreateNotes(note.getLocation());
-
- // Respond with all previous notes at this location.
- for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
- responseObserver.onNext(prevNote);
- }
-
- // Now add the new note to the list
- notes.add(note);
- }
-
- @Override
- public void onError(Throwable t) {
- logger.log(Level.WARNING, "routeChat cancelled");
- }
-
- @Override
- public void onCompleted() {
- responseObserver.onCompleted();
- }
- };
- }
-
- /**
- * Get the notes list for the given location. If missing, create it.
- */
- private List<RouteNote> getOrCreateNotes(Point location) {
- List<RouteNote> notes = Collections.synchronizedList(new ArrayList<RouteNote>());
- List<RouteNote> prevNotes = routeNotes.putIfAbsent(location, notes);
- return prevNotes != null ? prevNotes : notes;
- }
-
- /**
- * Gets the feature at the given point.
- *
- * @param location the location to check.
- * @return The feature object at the point. Note that an empty name indicates no feature.
- */
- private Feature checkFeature(Point location) {
- for (Feature feature : features) {
- if (feature.getLocation().getLatitude() == location.getLatitude()
- && feature.getLocation().getLongitude() == location.getLongitude()) {
- return feature;
- }
- }
-
- // No feature was found, return an unnamed feature.
- return Feature.newBuilder().setName("").setLocation(location).build();
- }
-
- /**
- * Calculate the distance between two points using the "haversine" formula.
- * The formula is based on http://mathforum.org/library/drmath/view/51879.html.
- *
- * @param start The starting point
- * @param end The end point
- * @return The distance between the points in meters
- */
- private static int calcDistance(Point start, Point end) {
- int r = 6371000; // earth radius in meters
- double lat1 = toRadians(RouteGuideUtil.getLatitude(start));
- double lat2 = toRadians(RouteGuideUtil.getLatitude(end));
- double lon1 = toRadians(RouteGuideUtil.getLongitude(start));
- double lon2 = toRadians(RouteGuideUtil.getLongitude(end));
- double deltaLat = lat2 - lat1;
- double deltaLon = lon2 - lon1;
-
- double a = sin(deltaLat / 2) * sin(deltaLat / 2)
- + cos(lat1) * cos(lat2) * sin(deltaLon / 2) * sin(deltaLon / 2);
- double c = 2 * atan2(sqrt(a), sqrt(1 - a));
-
- return (int) (r * c);
- }
- }
- }

客户端streaming demo如下:
- import com.google.common.annotations.VisibleForTesting;
- import com.google.protobuf.Message;
- import io.grpc.ManagedChannel;
- import io.grpc.ManagedChannelBuilder;
- import io.grpc.Status;
- import io.grpc.StatusRuntimeException;
- import io.grpc.examples.routeguide.Feature;
- import io.grpc.examples.routeguide.Point;
- import io.grpc.examples.routeguide.Rectangle;
- import io.grpc.examples.routeguide.RouteGuideGrpc;
- import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub;
- import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideStub;
- import io.grpc.examples.routeguide.RouteNote;
- import io.grpc.examples.routeguide.RouteSummary;
- import io.grpc.stub.StreamObserver;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Random;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.logging.Level;
- import java.util.logging.Logger;
-
- /**
- * Sample client code that makes gRPC calls to the server.
- */
- public class RouteGuideClient {
- private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName());
-
- private final ManagedChannel channel;
- private final RouteGuideBlockingStub blockingStub;
- private final RouteGuideStub asyncStub;
-
- private Random random = new Random();
- private TestHelper testHelper;
-
- /** Construct client for accessing RouteGuide server at {@code host:port}. */
- public RouteGuideClient(String host, int port) {
- this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
- }
-
- /** Construct client for accessing RouteGuide server using the existing channel. */
- public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
- channel = channelBuilder.build();
- blockingStub = RouteGuideGrpc.newBlockingStub(channel);
- asyncStub = RouteGuideGrpc.newStub(channel);
- }
-
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /**
- * Blocking unary call example. Calls getFeature and prints the response.
- */
- public void getFeature(int lat, int lon) {
- info("*** GetFeature: lat={0} lon={1}", lat, lon);
-
- Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
-
- Feature feature;
- try {
- feature = blockingStub.getFeature(request);
- if (testHelper != null) {
- testHelper.onMessage(feature);
- }
- } catch (StatusRuntimeException e) {
- warning("RPC failed: {0}", e.getStatus());
- if (testHelper != null) {
- testHelper.onRpcError(e);
- }
- return;
- }
- if (RouteGuideUtil.exists(feature)) {
- info("Found feature called \"{0}\" at {1}, {2}",
- feature.getName(),
- RouteGuideUtil.getLatitude(feature.getLocation()),
- RouteGuideUtil.getLongitude(feature.getLocation()));
- } else {
- info("Found no feature at {0}, {1}",
- RouteGuideUtil.getLatitude(feature.getLocation()),
- RouteGuideUtil.getLongitude(feature.getLocation()));
- }
- }
-
- /**
- * Blocking server-streaming example. Calls listFeatures with a rectangle of interest. Prints each
- * response feature as it arrives.
- */
- public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
- info("*** ListFeatures: lowLat={0} lowLon={1} hiLat={2} hiLon={3}", lowLat, lowLon, hiLat,
- hiLon);
-
- Rectangle request =
- Rectangle.newBuilder()
- .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
- .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
- Iterator<Feature> features;
- try {
- features = blockingStub.listFeatures(request);
- for (int i = 1; features.hasNext(); i++) {
- Feature feature = features.next();
- info("Result #" + i + ": {0}", feature);
- if (testHelper != null) {
- testHelper.onMessage(feature);
- }
- }
- } catch (StatusRuntimeException e) {
- warning("RPC failed: {0}", e.getStatus());
- if (testHelper != null) {
- testHelper.onRpcError(e);
- }
- }
- }
-
- /**
- * Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
- * features} with a variable delay in between. Prints the statistics when they are sent from the
- * server.
- */
- public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
- info("*** RecordRoute");
- final CountDownLatch finishLatch = new CountDownLatch(1);
- StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
- @Override
- public void onNext(RouteSummary summary) {
- info("Finished trip with {0} points. Passed {1} features. "
- + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
- summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
- if (testHelper != null) {
- testHelper.onMessage(summary);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- warning("RecordRoute Failed: {0}", Status.fromThrowable(t));
- if (testHelper != null) {
- testHelper.onRpcError(t);
- }
- finishLatch.countDown();
- }
-
- @Override
- public void onCompleted() {
- info("Finished RecordRoute");
- finishLatch.countDown();
- }
- };
-
- StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
- try {
- // Send numPoints points randomly selected from the features list.
- for (int i = 0; i < numPoints; ++i) {
- int index = random.nextInt(features.size());
- Point point = features.get(index).getLocation();
- info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
- RouteGuideUtil.getLongitude(point));
- requestObserver.onNext(point);
- // Sleep for a bit before sending the next one.
- Thread.sleep(random.nextInt(1000) + 500);
- if (finishLatch.getCount() == 0) {
- // RPC completed or errored before we finished sending.
- // Sending further requests won't error, but they will just be thrown away.
- return;
- }
- }
- } catch (RuntimeException e) {
- // Cancel RPC
- requestObserver.onError(e);
- throw e;
- }
- // Mark the end of requests
- requestObserver.onCompleted();
-
- // Receiving happens asynchronously
- if (!finishLatch.await(1, TimeUnit.MINUTES)) {
- warning("recordRoute can not finish within 1 minutes");
- }
- }
-
- /**
- * Bi-directional example, which can only be asynchronous. Send some chat messages, and print any
- * chat messages that are sent from the server.
- */
- public CountDownLatch routeChat() {
- info("*** RouteChat");
- final CountDownLatch finishLatch = new CountDownLatch(1);
- StreamObserver<RouteNote> requestObserver =
- asyncStub.routeChat(new StreamObserver<RouteNote>() {
- @Override
- public void onNext(RouteNote note) {
- info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
- .getLatitude(), note.getLocation().getLongitude());
- if (testHelper != null) {
- testHelper.onMessage(note);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- warning("RouteChat Failed: {0}", Status.fromThrowable(t));
- if (testHelper != null) {
- testHelper.onRpcError(t);
- }
- finishLatch.countDown();
- }
-
- @Override
- public void onCompleted() {
- info("Finished RouteChat");
- finishLatch.countDown();
- }
- });
-
- try {
- RouteNote[] requests =
- {newNote("First message", 0, 0), newNote("Second message", 0, 1),
- newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
-
- for (RouteNote request : requests) {
- info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
- .getLatitude(), request.getLocation().getLongitude());
- requestObserver.onNext(request);
- }
- } catch (RuntimeException e) {
- // Cancel RPC
- requestObserver.onError(e);
- throw e;
- }
- // Mark the end of requests
- requestObserver.onCompleted();
-
- // return the latch while receiving happens asynchronously
- return finishLatch;
- }
-
- /** Issues several different requests and then exits. */
- public static void main(String[] args) throws InterruptedException {
- List<Feature> features;
- try {
- features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
- } catch (IOException ex) {
- ex.printStackTrace();
- return;
- }
-
- RouteGuideClient client = new RouteGuideClient("localhost", 8980);
- try {
- // Looking for a valid feature
- client.getFeature(409146138, -746188906);
-
- // Feature missing.
- client.getFeature(0, 0);
-
- // Looking for features between 40, -75 and 42, -73.
- client.listFeatures(400000000, -750000000, 420000000, -730000000);
-
- // Record a few randomly selected points from the features file.
- client.recordRoute(features, 10);
-
- // Send and receive some notes.
- CountDownLatch finishLatch = client.routeChat();
-
- if (!finishLatch.await(1, TimeUnit.MINUTES)) {
- client.warning("routeChat can not finish within 1 minutes");
- }
- } finally {
- client.shutdown();
- }
- }
-
- private void info(String msg, Object... params) {
- logger.log(Level.INFO, msg, params);
- }
-
- private void warning(String msg, Object... params) {
- logger.log(Level.WARNING, msg, params);
- }
-
- private RouteNote newNote(String message, int lat, int lon) {
- return RouteNote.newBuilder().setMessage(message)
- .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
- }
-
- /**
- * Only used for unit test, as we do not want to introduce randomness in unit test.
- */
- @VisibleForTesting
- void setRandom(Random random) {
- this.random = random;
- }
-
- /**
- * Only used for helping unit test.
- */
- @VisibleForTesting
- interface TestHelper {
- /**
- * Used for verify/inspect message received from server.
- */
- void onMessage(Message message);
-
- /**
- * Used for verify/inspect error received from server.
- */
- void onRpcError(Throwable exception);
- }
-
- @VisibleForTesting
- void setTestHelper(TestHelper testHelper) {
- this.testHelper = testHelper;
- }
- }

服务端和客户端双向 streaming,就是上面两个结合了
gRPC 服务调用支持同步和异步方式,同时也支持普通的 RPC 和 streaming 模式,可以最大程度满足业务的需求。 对于 streaming 模式,可以充分利用 HTTP/2.0 协议的多路复用功能,实现在一条 HTTP 链路上并行双向传输数据,有效的解决了 HTTP/1.X 的数据单向传输问题,在大幅减少 HTTP 连接的情况下,充分利用单条链路的性能,可以媲美传统的 RPC 私有长连接协议:更少的链路、更高的性能。
gRPC 的网络 I/O 通信基于 Netty 构建,服务调用底层统一使用异步方式,同步调用是在异步的基础上做了上层封装。因此,gRPC 的异步化是比较彻底的,对于提升 I/O 密集型业务的吞吐量和可靠性有很大的帮助。
下一章我会为大家讲解grpc关于SSL部分。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。