赞
踩
本文把前面的代码整理一遍,不仅仅是demo层面,而是考虑到放进生产中使用,且尽可能用高版本,关于这块技术,网上的文章真是一言难尽,要么就是个概念,要么就是把官网的demo拿过来跑一遍,质量太差。
我本地有Istio,也安装了K8s和Docker,这些都可以根据官网来安装,我这里就忽略了。
我本地使用的版本情况
- jdk:17
- spring-boot-starter-parent:2.7.9,没有上3.0.0,是因为RocketMQ还不支持,上了之后启动RocketMQ报错,所以用了3.0.0之前的最后一个版本
- spring-cloud-starter-kubernetes-client-all:2.1.6
在K8s容器中安装Istio,默认也会安装Envoy代理/网关,和Kiali dashboard等
把Envoy作为南北向流量网关,负责请求转发,限流等
把Envoy作为东西向业务网关,负责服务之间负载均衡等
service-provider 和 service-consumer 分别暴露gRpc接口,相互调用
service-provider 发送RocketMQ消息,service-consumer 多个消费组同时消费
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.9</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.example</groupId>
- <artifactId>service-provider</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>pom</packaging>
- <name>service-provider</name>
- <description>service-provider</description>
- <modules>
- <module>service-provider-proto</module>
- <module>service-provider-start</module>
- <module>service-provider-dto</module>
- </modules>
- <properties>
- <java.version>17</java.version>
- </properties>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-dependencies</artifactId>
- <type>pom</type>
- <scope>import</scope>
- <version>2.7.9</version>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-consumer-proto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-proto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-dto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
- </project>
这个模块里面主要是dto类,方便把dto模块提供给其他服务pom引用,而不是引用整个服务,我这里只有一个Order类
这里是 service-provider 暴露出去的gRpc接口,我这里暴露了一个HelloService接口
同时这里需要配置下gRpc的一些依赖和插件,然后这个模块需要被本服务(接口的实现)或者其他服务(接口的调用)引用,我们看一下 service-provider-proto 模块的pom
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>service-provider</artifactId>
- <groupId>com.example</groupId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <groupId>com.example</groupId>
- <artifactId>service-provider-proto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
-
- <name>service-provider-proto</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <!-- gRpc -->
- <dependency>
- <groupId>net.devh</groupId>
- <artifactId>grpc-spring-boot-starter</artifactId>
- <version>2.14.0.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- <version>1.52.1</version>
- </dependency>
- <dependency> <!-- necessary for Java 9+ -->
- <groupId>org.apache.tomcat</groupId>
- <artifactId>annotations-api</artifactId>
- <version>6.0.53</version>
- </dependency>
- </dependencies>
-
- <build>
- <extensions>
- <extension>
- <groupId>kr.motd.maven</groupId>
- <artifactId>os-maven-plugin</artifactId>
- <version>1.7.1</version>
- </extension>
- </extensions>
- <plugins>
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <version>0.6.1</version>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
- <pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>compile-custom</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>
这里就是启动类了,我也把一些业务代码放进来了,按理说这不规范,但每个公司也有不同的分法。
我直接把代码都贴出来,不做过多的解释,方便有同学跟着写的。
- package com.example.service.provider;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
-
- @SpringBootApplication
- @EnableDiscoveryClient
- public class ServiceProviderApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(ServiceProviderApplication.class, args);
- }
- }
这个是前面暴露HelloService的接口实现
- package com.example.service.provider.facade;
-
- import com.example.service.provider.DemoConfig;
- import com.example.service.provider.api.HelloServiceGrpc;
- import com.example.service.provider.api.SayHelloData;
- import com.example.service.provider.api.SayHelloRequest;
- import com.example.service.provider.api.SayHelloResponse;
- import lombok.extern.slf4j.Slf4j;
- import net.devh.boot.grpc.server.service.GrpcService;
- import org.springframework.beans.factory.annotation.Autowired;
-
- @Slf4j
- @GrpcService
- public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
-
- @Autowired
- private DemoConfig demoConfig;
-
- @Override
- public void sayHello(SayHelloRequest request, io.grpc.stub.StreamObserver<SayHelloResponse> responseObserver) {
- log.info("接收consumer的say hello grpc 请求");
- SayHelloData helloData = SayHelloData.newBuilder()
- .setName("maple")
- .setContent(demoConfig.getMessage())
- .build();
- SayHelloResponse.Builder builder = SayHelloResponse.newBuilder()
- .setCode(0)
- .setMessage("success")
- .setSuccess(true)
- .setData(helloData);
- responseObserver.onNext(builder.build());
- responseObserver.onCompleted();
- }
- }
这个主要是测试动态配置是否生效的
- package com.example.service.provider;
-
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- @ConfigurationProperties(prefix = "bean")
- public class DemoConfig {
-
- private String message;
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
- }
这个是 service-consumer 暴露出的gRpc接口,service-provider 去调用,因为生产中不同服务之间肯定是可以相互rpc调用的。
- package com.example.service.provider;
-
- import com.example.service.consumer.api.MemberQueryRequest;
- import com.example.service.consumer.api.MemberQueryResponse;
- import com.example.service.consumer.api.MemberQueryServiceGrpc;
- import com.google.protobuf.InvalidProtocolBufferException;
- import lombok.extern.slf4j.Slf4j;
- import net.devh.boot.grpc.client.inject.GrpcClient;
- import org.springframework.stereotype.Service;
-
- @Slf4j
- @Service
- public class MemberQueryGrpcClient {
-
- @GrpcClient("service-consumer")
- private MemberQueryServiceGrpc.MemberQueryServiceBlockingStub memberQueryBlockingStub;
-
- public String queryMember() {
-
- MemberQueryRequest request = MemberQueryRequest.newBuilder().setMemberId("111").setUsername("Wang Hong Bo").build();
-
- MemberQueryResponse response = memberQueryBlockingStub.queryMember(request);
-
- log.info("MemberQueryResponse.code:{}", response.getCode());
- log.info("MemberQueryResponse.message:{}", response.getMessage());
- log.info("MemberQueryResponse.success:{}", response.getSuccess());
- log.info("MemberQueryResponse.data:{}", response.getData());
- log.info("MemberQueryResponse.address1:{}", response.getData().getAddressList().get(0).getAddress());
- log.info("MemberQueryResponse.address2:{}", response.getData().getAddressList().get(1).getAddress());
-
- return response.toString();
- }
- }
这个是MQ生产者发送消息的服务,我这里通过这个代码
String sceneStr = SceneEnum.destination(scene) 把消息做了转换,topic中间以“|”分隔,比如说我的Topic为:TP_S_1100|EC_EVENT_0001
但是在阿里云买的RocketMQ产品,它不支持topic中间以“|”分隔,认为它是特殊字符,我是本地安装的RocketMQ,它是支持的。
- package com.example.service.provider.producer;
-
- import com.alibaba.fastjson.JSON;
- import com.example.service.provider.enums.SceneEnum;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class RocketMQProducerService {
-
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
-
- /**
- * 普通发送
- *
- * @param scene
- * @param payload
- */
- public void send(SceneEnum scene, Object payload) {
- String sceneStr = SceneEnum.destination(scene);
- log.info("producer.sendMessage: 【{}】,【{}】", sceneStr, JSON.toJSONString(payload));
- rocketMQTemplate.convertAndSend(sceneStr, payload);
- }
-
- /**
- * 同步发送
- *
- * @param scene
- * @param payload
- * @return
- */
- public SendResult sendSync(SceneEnum scene, Object payload) {
-
- String sceneStr = SceneEnum.destination(scene);
-
- SendResult sendResult = rocketMQTemplate.syncSend(sceneStr, payload);
-
- log.info("producer.sendMessage: 【{}】,【{}】, sendResult:{}", sceneStr, JSON.toJSONString(payload), sendResult);
-
- return sendResult;
- }
-
- /**
- * 发送异步消息
- *
- * @param scene
- * @param payload
- */
- public void sendASync(SceneEnum scene, Object payload) {
-
- rocketMQTemplate.asyncSend(SceneEnum.destination(scene), payload, new SendCallback() {
-
- @Override
- public void onSuccess(SendResult sendResult) {
- System.out.println("异步发送成功啦" + sendResult);
- }
-
- @Override
- public void onException(Throwable throwable) {
- System.out.println("异步发送出异常啦" + throwable.getMessage());
- }
- });
-
- }
-
- /**
- * 发送延时消息<br/>
- * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- */
- public void sendDelay(SceneEnum scene, Object payload, int delayLevel) {
- rocketMQTemplate.syncSend(SceneEnum.destination(scene), MessageBuilder.withPayload(payload).build(), 2000, delayLevel);
- }
-
- /**
- * 发送单向消息(不关心发送结果,如日志)
- */
- public void sendOneWayMsg(SceneEnum scene, Object payload) {
- rocketMQTemplate.sendOneWay(SceneEnum.destination(scene), payload);
- }
- }
这个是场景枚举类,生产中肯定有各种场景定义
- package com.example.service.provider.enums;
-
- public enum SceneEnum {
- ORDER_CREATE("11000001", "ORDER_CREATE"),
- ORDER_CONFIRM("11000002", "ORDER_CONFIRM"),
- ;
-
- private String sceneCode;
-
- private String desc;
-
- SceneEnum(String sceneCode, String desc) {
- this.sceneCode = sceneCode;
- this.desc = desc;
- }
-
- public String getSceneCode() {
- return sceneCode;
- }
-
- public String getDesc() {
- return desc;
- }
-
- public static String destination(SceneEnum scene) {
-
- String topic = "TP_S_" + scene.getSceneCode().substring(0, 4) + "|" + "EC_EVENT_" + scene.getSceneCode().substring(4, 8);
-
- String tag = scene.getDesc();
-
- return topic + ":" + tag;
- }
- }
这个类是我用来测试服务功能的
- package com.example.service.provider.controller;
-
- import com.example.Order;
- import com.example.service.provider.DemoConfig;
- import com.example.service.provider.MemberQueryGrpcClient;
- import com.example.service.provider.enums.SceneEnum;
- import com.example.service.provider.producer.RocketMQProducerService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.cloud.context.config.annotation.RefreshScope;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RefreshScope
- @Slf4j
- @RestController
- @RequestMapping("/provider")
- public class ProviderController {
-
- @Value("${maple-test.message}")
- private String mapleTestMessage;
-
- @Autowired
- private DemoConfig demoConfig;
-
- @Autowired
- private RocketMQProducerService producerService;
-
- @Autowired
- private MemberQueryGrpcClient memberQueryGrpcClient;
-
- @GetMapping("/provider-hello")
- public String sayHello() {
- log.info("hello world");
- return "hello world";
- }
-
- @GetMapping("/grpc/queryMember")
- public String queryMember() {
- log.info("消费服务:service-provider grpc 调用 service-consumer 的 query member 接口");
- return memberQueryGrpcClient.queryMember();
- }
-
- @GetMapping("/sendMq")
- public String sendMq() {
- Order order = new Order();
- order.setOrderName("Apple");
-
- for (int i = 0; i < 5; i++) {
- order.setPrice(i);
- producerService.sendSync(SceneEnum.ORDER_CREATE, order);
- }
-
- return "send message complete";
- }
-
- @GetMapping("/provider-value-config")
- public String valueConfig() {
- log.info("直接@Value获取配置:{}", mapleTestMessage);
- return mapleTestMessage;
- }
-
- @GetMapping("/demo-config")
- public String demoConfig() {
- log.info("通过ConfigurationProperties注解获取配置:{}", demoConfig.getMessage());
- return demoConfig.getMessage();
- }
- }
这个是我们的启动引导配置,所以里面只做了K8s的相关配置
- spring:
- application:
- name: service-provider
- cloud:
- kubernetes:
- reload:
- enabled: true #修改K8s的ConfigMap配置之后自动刷新,有默认的刷新策略和刷新时机
- config:
- name: ${spring.application.name} #定义配置文件的名称
- namespace: service-k8s-demo
- sources:
- - name: ${spring.application.name}#真正引用配置文件的名称,根据我们的profile自动找对应的配置文件,该服务专属的配置
- namespace: service-k8s-demo
- - name: service-common-config #一个公共的配置文件,微服务架构中每个服务都可以引用的
- namespace: service-k8s-demo
- management:
- endpoint:
- restart:
- enabled: true
- health:
- enabled: true
- probes:
- enabled: true
- show-details: always
- prometheus:
- enabled: true
- info:
- enabled: true
- endpoints:
- web:
- exposure:
- include: '*'
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.example</groupId>
- <artifactId>service-provider</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>service-provider-start</artifactId>
- <packaging>jar</packaging>
- <name>service-provider-start</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-consumer-proto</artifactId>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-proto</artifactId>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-dto</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
-
- <!-- kubernetes -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
- <version>2.1.6</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- RocketMQ -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.3</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- <version>1.18.26</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <classifier>exec</classifier> <!-- 需要打可执行文件 -->
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
在 service-provider-start 模块下面,我们写了个Dockerfile文件,内容很简单。
- FROM arm64v8/openjdk:20-slim-buster
- ADD service-provider-start-0.0.1-SNAPSHOT-exec.jar service-provider.jar
- ENTRYPOINT java -jar service-provider.jar
开发环境的配置文件
- apiVersion: v1
- kind: ConfigMap
- metadata:
- name: service-provider-dev
- namespace: service-k8s-demo
- labels:
- spring.cloud.kubernetes.config: "true"
- data:
- application.yml: |-
- spring:
- profiles: dev
- server:
- port: 30000
- grpc:
- server:
- port: 9090
- rocketmq:
- producer:
- group: SERVICE_PRODUCER
- name-server: 124.222.91.116:9876
- bean:
- message: Hello World! --dev
- maple-test:
- message: maple for dev config map in k8s --dev
- redis:
- ip: k8s --dev
生产环境的配置文件
- apiVersion: v1
- kind: ConfigMap
- metadata:
- name: service-provider-prod
- namespace: service-k8s-demo
- labels:
- spring.cloud.kubernetes.config: "true"
- data:
- application.yml: |-
- spring:
- profiles: prod
- server:
- port: 30000
- grpc:
- server:
- port: 9090
- rocketmq:
- producer:
- group: SERVICE_PRODUCER
- name-server: 124.222.91.116:9876
- bean:
- message: Hello World! --prod
- maple-test:
- message: maple for dev config map in k8s --prod
- redis:
- ip: k8s --prod
公共的配置文件,我这里主要是定义了gRpc的服务地址与端口,因为生产中服务是要相互调用的,网上的很多文章把 address: 'dns:///service-consumer:9091' 都写为了 address: 'static://service-consumer:9091',但是这会导致服务上下线之后,调用方调不到服务的情况,所以采用DNS来动态的发现服务。
- apiVersion: v1
- kind: ConfigMap
- metadata:
- name: service-common-config
- namespace: service-k8s-demo
- labels:
- spring.cloud.kubernetes.config: "true"
- data:
- application.yml: |-
- grpc:
- client:
- GLOBAL:
- negotiation-type: plaintext
- enable-keep-alive: true
- keep-alive-without-calls: true
- service-consumer:
- address: 'dns:///service-consumer:9091'
- service-provider:
- address: 'dns:///service-provider:9090'
官网上也给出了下面这种写法来区分不同环境,但是把所有环境都放进一个文件太过臃肿,而且这个文件也不建议太大, 不能超过1M,如果太大就要考虑挂载到磁盘上的目录。
这个是我们在K8s容器中部署服务的yaml文件
- apiVersion: v1
- kind: Namespace
- metadata:
- name: service-k8s-demo
- labels:
- name: service-k8s-demo
-
- ---
-
- apiVersion: v1
- kind: ServiceAccount
- metadata:
- name: service-k8s-demo
- namespace: service-k8s-demo
-
- ---
-
- kind: ClusterRole
- apiVersion: rbac.authorization.k8s.io/v1
- metadata:
- namespace: service-k8s-demo
- name: service-k8s-demo
- rules:
- - apiGroups:
- - ""
- resources:
- - services
- - configmaps
- - endpoints
- - nodes
- - pods
- - secrets
- - namespaces
- verbs:
- - get
- - list
- - watch
-
- ---
-
- apiVersion: rbac.authorization.k8s.io/v1
- kind: ClusterRoleBinding
- metadata:
- name: service-k8s-demo
- namespace: service-k8s-demo
- subjects:
- - kind: ServiceAccount
- name: service-k8s-demo
- namespace: service-k8s-demo
- roleRef:
- kind: ClusterRole
- name: service-k8s-demo
- apiGroup: rbac.authorization.k8s.io
-
- ---
-
- apiVersion: apps/v1
- kind: Deployment
- metadata:
- name: service-provider
- namespace: service-k8s-demo
- labels:
- app: service-provider
- spec:
- replicas: 3
- template:
- metadata:
- name: service-provider
- labels:
- app: service-provider
- spec:
- containers:
- - name: service-provider
- image: service-provider:1.0
- imagePullPolicy: IfNotPresent
- env:
- - name: SPRING_PROFILES_ACTIVE
- value: "dev"
- ports:
- - name: http
- protocol: TCP
- containerPort: 30000
- - name: grpc
- protocol: TCP
- containerPort: 9090
- serviceAccountName: service-k8s-demo
- restartPolicy: Always
- selector:
- matchLabels:
- app: service-provider
-
- ---
-
- apiVersion: v1
- kind: Service
- metadata:
- name: service-provider
- namespace: service-k8s-demo
- spec:
- selector:
- app: service-provider
- ports:
- - port: 80
- targetPort: 30000
- name: http
- - port: 9090
- targetPort: 9090
- name: grpc
- type: NodePort
你可以发现我在这里指定了个profiles,实际生产中可以有两份yaml文件,也可以一份yaml文件,把profile给动态传进来,这个我之后再研究过来更新吧。
这个是Envoy充当南北向网关的,负责请求的转发和限流等,我这里目前是测试了转发,限流后面测试完成再过来更新。
- apiVersion: networking.istio.io/v1alpha3
- kind: Gateway
- metadata:
- name: service-k8s-demo-gateway
- namespace: service-k8s-demo
- spec:
- selector:
- istio: ingressgateway # use istio default controller
- servers:
- - port:
- number: 31400
- name: http
- protocol: HTTP
- hosts:
- - "*"
- ---
- apiVersion: networking.istio.io/v1alpha3
- kind: VirtualService
- metadata:
- name: service-k8s-demo-virtual-service
- namespace: service-k8s-demo
- spec:
- hosts:
- - "*"
- gateways:
- - service-k8s-demo-gateway
- http:
- - match:
- - uri:
- prefix: /consumer
- route:
- - destination:
- host: service-consumer
- port:
- number: 80
- - match:
- - uri:
- prefix: /provider
- route:
- - destination:
- host: service-provider
- port:
- number: 80
至此为止,我们的 service-provider 模块的代码都已经全部完成了,不要妄想本地能跑起来哈,因为我们是在yaml文件里指定的profile,所以必须在K8s中执行yaml文件才能让他自动把帮我们去找config文件的,不过可以试着启动看看有没有什么报错,我本地启动是报这俩错,这是因为我们本地没有和K8s打交道,导致没有K8s的环境,所以也找不到配置类,报了RocketMQ的错,但是没有关系,等我们使用K8s时这些问题都没有了。
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.9</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.example</groupId>
- <artifactId>service-consumer</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>pom</packaging>
- <name>service-consumer</name>
- <description>service-consumer</description>
- <modules>
- <module>service-consumer-proto</module>
- <module>service-consumer-start</module>
- </modules>
- <properties>
- <java.version>17</java.version>
- </properties>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-dependencies</artifactId>
- <type>pom</type>
- <scope>import</scope>
- <version>2.7.9</version>
- </dependency>
-
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-consumer-proto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-proto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-dto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- </project>
- syntax = "proto3";
-
- option java_multiple_files = true;
- option java_package = "com.example.service.consumer.api";
- option java_outer_classname = "MemberQueryServiceProto";
-
- service MemberQueryService {
- rpc queryMember (MemberQueryRequest) returns (MemberQueryResponse) {
- }
- }
-
- message MemberQueryRequest {
- string memberId = 1;
- string username = 2;
- }
-
- message MemberQueryResponse {
- int32 code = 1;
- string message = 2;
- bool success = 3;
- MemberQueryData data = 4;
- }
-
- message MemberQueryData {
- string memberId = 1;
- string username = 2;
- int32 age = 3;
- repeated AddressData address = 4;
- map<string, string> extMap = 5;
- }
-
- message AddressData {
- string address = 1;
- string phone = 2;
- }
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>service-consumer</artifactId>
- <groupId>com.example</groupId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <groupId>com.example</groupId>
- <artifactId>service-consumer-proto</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
-
- <name>service-consumer-proto</name>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <!-- gRpc -->
- <dependency>
- <groupId>net.devh</groupId>
- <artifactId>grpc-spring-boot-starter</artifactId>
- <version>2.14.0.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-protobuf</artifactId>
- <version>1.52.1</version>
- </dependency>
- <dependency> <!-- necessary for Java 9+ -->
- <groupId>org.apache.tomcat</groupId>
- <artifactId>annotations-api</artifactId>
- <version>6.0.53</version>
- </dependency>
- </dependencies>
-
- <build>
- <extensions>
- <extension>
- <groupId>kr.motd.maven</groupId>
- <artifactId>os-maven-plugin</artifactId>
- <version>1.7.1</version>
- </extension>
- </extensions>
- <plugins>
- <plugin>
- <groupId>org.xolstice.maven.plugins</groupId>
- <artifactId>protobuf-maven-plugin</artifactId>
- <version>0.6.1</version>
- <configuration>
- <protocArtifact>com.google.protobuf:protoc:3.21.7:exe:${os.detected.classifier}</protocArtifact>
- <pluginId>grpc-java</pluginId>
- <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.52.1:exe:${os.detected.classifier}</pluginArtifact>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>compile-custom</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.example</groupId>
- <artifactId>service-consumer</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>service-consumer-start</artifactId>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-proto</artifactId>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-consumer-proto</artifactId>
- </dependency>
- <dependency>
- <groupId>com.example</groupId>
- <artifactId>service-provider-dto</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
-
- <!-- kubernetes -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-kubernetes-client-all</artifactId>
- <version>2.1.6</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- RocketMQ -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.2.3</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- <version>1.18.26</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <classifier>exec</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- </project>
这个是 service-consumer 暴露接口的实现类
- package com.example.service.consumer.facade;
-
- import com.example.service.consumer.api.*;
- import com.google.common.collect.Maps;
- import lombok.extern.slf4j.Slf4j;
- import net.devh.boot.grpc.server.service.GrpcService;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- @Slf4j
- @GrpcService
- public class MemberQueryServiceImpl extends MemberQueryServiceGrpc.MemberQueryServiceImplBase {
-
- @Override
- public void queryMember(MemberQueryRequest request, io.grpc.stub.StreamObserver<MemberQueryResponse> responseObserver) {
- log.info("接收其他服务的 query member 的 grpc 请求, 请求参数为:{}", request);
- List<AddressData> addressList = new ArrayList<>();
- addressList.add(AddressData.newBuilder().setAddress("杭州").setPhone("110").build());
- addressList.add(AddressData.newBuilder().setAddress("马来西亚").setPhone("911").build());
-
- Map<String, String> extMap = Maps.newHashMap();
- extMap.put("secondField", "2");
- extMap.put("lastField", "last");
-
- MemberQueryData queryData = MemberQueryData.newBuilder()
- .setMemberId(request.getMemberId())
- .setUsername(request.getUsername())
- .setAge(18)
- .addAllAddress(addressList)
- .putExtMap("firstField", "test-maple")
- .putAllExtMap(extMap)
- .build();
-
- MemberQueryResponse.Builder builder = MemberQueryResponse.newBuilder()
- .setCode(0)
- .setMessage("success")
- .setSuccess(true)
- .setData(queryData);
-
- responseObserver.onNext(builder.build());
- responseObserver.onCompleted();
- }
- }
- package com.example.service.consumer;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
-
- @EnableDiscoveryClient
- @SpringBootApplication
- public class ServiceConsumerApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(ServiceConsumerApplication.class, args);
- }
- }
这个是消费订单创建的消息,然后给用户发送通知的
- package com.example.service.consumer.mq;
-
- import com.alibaba.fastjson.JSON;
- import com.example.Order;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 创建订单成功,发送gotone通知
- */
- @Slf4j
- @Component
- @RocketMQMessageListener(topic = "TP_S_1100|EC_EVENT_0001", consumerGroup = "CREATE_ORDER_GOTONE_CONSUMER")
- public class CreateOrderGotoneConsumer implements RocketMQListener<Order> {
- @Override
- public void onMessage(Order order) {
- log.info("consumer message: 【CreateOrderGotoneConsumer】,【{}】", JSON.toJSONString(order));
- }
- }
这个也是消费订单创建的消息,扣减库存的,这两个是不同的consumerGroup,所以都能消费到消息,但是这个情况一定要做好幂等处理,访问其中一个consumerGroup消费失败触发重试一直投递消息,导致处理成功的consumerGroup也一直收到消息,所以要做好幂等。
- package com.example.service.consumer.mq;
-
- import com.alibaba.fastjson.JSON;
- import com.example.Order;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 创建订单成功,扣减库存
- */
- @Slf4j
- @Component
- @RocketMQMessageListener(topic = "TP_S_1100|EC_EVENT_0001", consumerGroup = "DEDUCT_INVENTORY_CONSUMER")
- public class DeductInventoryConsumer implements RocketMQListener<Order> {
- @Override
- public void onMessage(Order order) {
- log.info("consumer message: 【DeductInventoryConsumer】,【{}】", JSON.toJSONString(order));
- }
- }
这个是 service-consumer 调用 service-provider 的gRpc 接口
- package com.example.service.consumer.grpc;
-
- import com.example.service.provider.api.HelloServiceGrpc;
- import com.example.service.provider.api.SayHelloRequest;
- import com.example.service.provider.api.SayHelloResponse;
- import net.devh.boot.grpc.client.inject.GrpcClient;
- import org.springframework.stereotype.Service;
-
- @Service
- public class ProviderServiceGrpcClient {
-
- @GrpcClient("service-provider")
- private HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub;
-
- public String sayHello() {
-
- SayHelloRequest request = SayHelloRequest.newBuilder().setName("maple123").build();
-
- SayHelloResponse sayHelloResponse = helloServiceBlockingStub.sayHello(request);
-
- return sayHelloResponse.toString();
- }
- }
这个是测试consumer服务的入口
- package com.example.service.consumer.controller;
-
- import com.example.service.consumer.grpc.ProviderServiceGrpcClient;
- import com.example.service.consumer.ServiceConsumerApplication;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.SpringApplication;
- import org.springframework.cloud.client.discovery.DiscoveryClient;
- import org.springframework.cloud.context.config.annotation.RefreshScope;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.List;
-
- @Slf4j
- @RequiredArgsConstructor
- @RefreshScope
- @RestController
- @RequestMapping("/consumer")
- public class ConsumerController {
-
- private final DiscoveryClient discoveryClient;
- private final ProviderServiceGrpcClient providerServiceGrpcClient;
-
- public static void main(String[] args) {
- SpringApplication.run(ServiceConsumerApplication.class, args);
- }
-
- @Value("${consumer.body}")
- private String consumerBody;
-
- @GetMapping("/grpc/hello")
- public String sayHello() {
- log.info("消费服务:service-consumer grpc 调用 service-provider");
- return providerServiceGrpcClient.sayHello();
- }
-
- @GetMapping("/consumerBody")
- public String consumerBody() {
- log.info("获取配置consumerBody:{}", consumerBody);
- return consumerBody;
- }
-
- @GetMapping("/consumers/services")
- public List<String> findServices() {
- log.info("当前注册中心下所有服务");
- List<String> services = discoveryClient.getServices();
- services.stream().map(discoveryClient::getInstances).forEach(v ->
- v.forEach(s -> System.out.printf("%s:%s uri:%s%n", s.getHost(), s.getPort(), s.getUri())));
-
- return services;
- }
- }
这个是 service-consumer 的引导类配置,也放了rocketmq 的配置进来,因为对于消费者它启动的时候就会去初始化consumer。
- spring:
- application:
- name: service-consumer
- cloud:
- kubernetes:
- reload:
- enabled: true
- config:
- name: ${spring.application.name}
- namespace: service-k8s-demo
- sources:
- - name: ${spring.application.name}
- namespace: service-k8s-demo
- - name: service-common-config
- namespace: service-k8s-demo
-
- management:
- endpoint:
- restart:
- enabled: true
- health:
- enabled: true
- info:
- enabled: true
- endpoints:
- web:
- exposure:
- include: '*'
- rocketmq:
- name-server: 124.222.91.116:9876
- FROM arm64v8/openjdk:20-slim-buster
- ADD service-consumer-start-0.0.1-SNAPSHOT-exec.jar service-consumer.jar
- ENTRYPOINT java -jar service-consumer.jar
- apiVersion: v1
- kind: ConfigMap
- metadata:
- name: service-consumer-dev
- namespace: service-k8s-demo
- labels:
- spring.cloud.kubernetes.config: "true"
- data:
- application.yml: |-
- spring:
- profiles: dev
- server:
- port: 30001
- grpc:
- server:
- port: 9091
- consumer:
- body: "1234567890--dev"
- apiVersion: v1
- kind: ConfigMap
- metadata:
- name: service-consumer-prod
- namespace: service-k8s-demo
- labels:
- spring.cloud.kubernetes.config: "true"
- data:
- application.yml: |-
- spring:
- profiles: prod
- server:
- port: 30001
- grpc:
- server:
- port: 9091
- consumer:
- body: "1234567890--prod"
- apiVersion: v1
- kind: Namespace
- metadata:
- name: service-k8s-demo
- labels:
- name: service-k8s-demo
-
- ---
-
- apiVersion: v1
- kind: ServiceAccount
- metadata:
- name: service-k8s-demo
- namespace: service-k8s-demo
-
- ---
-
- kind: ClusterRole
- apiVersion: rbac.authorization.k8s.io/v1
- metadata:
- namespace: service-k8s-demo
- name: service-k8s-demo
- rules:
- - apiGroups:
- - ""
- resources:
- - services
- - configmaps
- - endpoints
- - nodes
- - pods
- - secrets
- - namespaces
- verbs:
- - get
- - list
- - watch
-
- ---
-
- apiVersion: rbac.authorization.k8s.io/v1
- kind: ClusterRoleBinding
- metadata:
- name: service-k8s-demo
- namespace: service-k8s-demo
- subjects:
- - kind: ServiceAccount
- name: service-k8s-demo
- namespace: service-k8s-demo
- roleRef:
- kind: ClusterRole
- name: service-k8s-demo
- apiGroup: rbac.authorization.k8s.io
-
- ---
-
- apiVersion: apps/v1
- kind: Deployment
- metadata:
- name: service-consumer
- namespace: service-k8s-demo
- labels:
- app: service-consumer
- spec:
- replicas: 1
- template:
- metadata:
- name: service-consumer
- labels:
- app: service-consumer
- spec:
- containers:
- - name: service-consumer
- image: service-consumer:1.0
- imagePullPolicy: IfNotPresent
- env:
- - name: SPRING_PROFILES_ACTIVE
- value: "prod"
- ports:
- - name: http
- protocol: TCP
- containerPort: 30001
- - name: grpc
- protocol: TCP
- containerPort: 9091
- serviceAccountName: service-k8s-demo
- restartPolicy: Always
- selector:
- matchLabels:
- app: service-consumer
-
- ---
-
- apiVersion: v1
- kind: Service
- metadata:
- name: service-consumer
- namespace: service-k8s-demo
- spec:
- selector:
- app: service-consumer
- ports:
- - port: 80
- targetPort: 30001
- name: http
- - port: 9091
- targetPort: 9091
- name: grpc
- type: NodePort
至此为止,我们的 service-consumer 模块的代码也都已经全部完成了。
0. 准备环境:docker、K8s、Istio
service-provider 和 service-consumer 打镜像,我这里直接打到本地仓库,这里是docker的范畴(docker build -t service-provider:1.0 . 等)
把 service-provider 和 service-consumer 的 config 都在K8s容器中执行,这里是K8s的范畴(kubectl apply -f service-consumer-dev.yaml 等)
把 service-provider 和 service-consumer 部署到 K8s 容器中,这里是K8s的范畴(kubectl apply -f service-consumer-deploy.yaml 等),部署完成之后我们查看一下相关的pod,service,config等
这个时候两个服务的端口都有了,就可以使用controller里的rest接口相互进行测试了,可以测试下service-consumer调用service-provide的gRpc接口,可以发现 service-provide 虽然有3台但全打在了一个POD上,这个是因为gRpc是基于HTTP2.0多路复用,L4层基于连接级别的负载均衡,在K8s中负载均衡是失效的,需要借助L7应用层负载均衡来做,Envoy和Linkerd等可以实现,Linkerd实现起来很简单,可以点这里。
把我们的namespace 注入到 Envoy 中
kubectl label namespace service-k8s-demo istio-injection=enabled
启动Istio dashboard Kiali
istioctl dashboard kiali &
做测试,我的Gateway里的端口是31400
测试gRpc相互调用,同时也关注下负载均衡的问题,这里解决了。
测试RocketMQ 生产消费
测试动态配置
至此,测试完成。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。