当前位置:   article > 正文

SpringCloud微服务、Docker_spring cloud与docker微服务

spring cloud与docker微服务

目录

一、入门案例

1、远程调用

<1>注入RestTemplate这个类

 <2>发http请求,并且设置返回值类型​编辑

二、eureka注册中心

1、相关概念(原理)

2、搭建Eureka注册中心(引依赖,加注解,加配置)

 1、在父模块下创建一个springboot的子模块名字为eureka,并且引入依赖

2、在启动类里添加@EnableEurekaServer的注解

 3、添加配置文件

3、注册服务(引依赖and加地址)

1、把需要注册的服务里导入eureka的客户端依赖

2、配置文件里编辑服务名和eureka的地址

3、 如何多开服务?(不在注册步骤内)

4、发现服务(使用服务名,调用其他微服务)

<1>我们使用RestTemplate发http请求时把url的ip和端口号换成服务名

<2>在启动类中的RestTemplate上加负载均衡的注解@LoadBalanced

三、Ribbon实现负载均衡

1、负载均衡原理

 2、负载均衡策略及其配置方式

<1>负载均衡是一个叫做IRule的接口来定义的

<3>常见的Ribbon负载均衡规则

 <4>负载均衡策略的配置方式

3、饥饿加载

四、Nacos注册中心(替换eureka注册中心)

 1、注册服务

<1>在父工程导入管理依赖

<2>在微服务工程导入Nacos的依赖

 <3>在服务端的yml配置文件加上nacos的服务地址

2、发现服务(见上文Eureka)

3、那nacos多级存储模型

<1>分级存储

<2>\实例集群的实现方式

 <3>实现集群的负载均衡

 <4>服务实例的权重设置

<5>环境隔离(了解即可)

<6>nacos与uereka的区别(nocas的非临时实例)

五、Nacos的配置管理

1、nacos实现配置管理

2、微服务的配置拉取

<1>引入nacos-config依赖

<2>添加bootstrap.yaml

 <3>测试

 3、配置热更新

<1>方式一(了解)

<2>新建配置类添加@ConfigurationProperties (常用)

4、多环境配置共享

 <1>创建多环境共享的配置文件

 <2>配置文件的优先级

 5、nacos集群搭建

六、Feign实现远程调用

1、feign的配置和基本使用

<1>在需要远程调用的服务里引feign的依赖

 <2>在启动类里加开启feign的注解@EnableFeignClients

 <3>编写feign的客户端(和编写controller很相似)

 2、自定义feign的配置(了解)

<方式一> :基于配置文件修改日志级别

<方式二> 基于java代码的方式

 3、feign的性能优化

<1>日志级别最好为Basic

<2>使用HttpClient或者OKHttp连接池代替默认的URLConnection

 4、feign的最佳实践

1、两种实现方案

 2、对方案二的实现(重点)

七、gateway网关

1、网关的作用

 2、网关的搭建

<1>新建模块命名为gateway,并且引入nacos依赖和gateway依赖 

<2>编写路由配置和nacos的地址

目前为止可以做出微服务的流程图仅为这样:​编辑3、断言工厂Route Predicate Factory

4、路由过滤器配置

<1>GatewayFilter是做什么的

<2>常见的过滤器工厂

<3>过滤器局部单个服务生效的配置

 <4>过滤器全局生效的配置

5、全局过滤器GlobalFilter

<1>案例(请仔细推敲示例代码)

6、过滤器执行顺序

 7、网关的cors跨域问题

<1>什么是跨域问题

 <2>如何解决跨域问题

八、微服务里的负载均衡

1、微服务之间的负载均衡(ribbon实现)

2、网关与微服务之间的负载均衡

九、Docker

1、为什莫使用Docker

 <1>部署项目时遇到的问题 

<2>Docker的解决方案

<3>Docker是什么

2、Docker架构

<1>镜像和容器

<2>Docker的结构C/S

 <3>DockerHub

 <4>总结

3、操作Docker

<1>docker的开启与关闭

<2>镜像名规范

<3>镜像操作命令

 <4>Docker的容器操作命令

<5>Docker数据卷命令

<6> 容器挂载数据卷

<7>docker数据卷操作流程

<8>将宿主机的目录直接挂载到容器

4、自定义镜像

<1>镜像结构

 <3>Dockerfile及其指令

<3>自定义java项目的镜像

5、Docker-Compose

<1>了解docker-Compose

<2>不适用docker-compose如何将Java项目构成镜像,容器部署。

<3>docker-compose容器部署分布式项目

6、镜像私服

<1>我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:

<2>在tmp目录里创建一个register-ui的文件夹,里面新建docker-compose.yml文件

<3>docker-compose up -d运行

<4>镜像的推送和拉取

九、RabbitMQ

1、异步调用

<1>异步调用的解决方案和优势

<2>异步调用优缺点

 2、了解RabbitMQ

 <1>通过docker安装运行RabbitMQ

 <2>RTabbit的结构和概念​编辑

3、RabbitMQ的五种常见消息模型

 <1>基本消息队列

九(2)、SpringAMQP整合RabbitMQ

1、 入门案例

<1>使用AMQP完成基本消息队列的消息发布

<2>使用AMQP完成基本消息队列的监听消息

<2>Work Queue

2、发布和订阅

<1>Fanout广播交换机

<2>Direct路由交换机

<3>Topic话题交换机

<4>消息转换器Json序列化




SpringCloud实际是分布式的一种解决方案。

 

 

一、入门案例

1、远程调用

                 远程调用的关键就是发http请求,我们使用RestTemplate这个类来完成这件事。

这个类就是来实现发restful风格的请求的。

 关键操作如图所示:

<1>注入RestTemplate这个类

 <2>发http请求,并且设置返回值类型

 

二、eureka注册中心

1、相关概念(原理)

 

2、搭建Eureka注册中心(引依赖,加注解,加配置)

 1、在父模块下创建一个springboot的子模块名字为eureka,并且引入依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
  4. </dependency>

2、在启动类里添加@EnableEurekaServer的注解

 3、添加配置文件

  1. server:
  2. port: 10086
  3. spring:
  4. application:
  5. name: eurekaserver #微服务的名字
  6. #配置这个地址只为了以后多个eureka注册中心交互数据
  7. eureka:
  8. client:
  9. service-url:
  10. defaultZone : http://127.0.0.1:10086/eureka/

3、注册服务(引依赖and加地址)

1、把需要注册的服务里导入eureka的客户端依赖

  1. <!-- eureka客户端依赖 -->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  5. </dependency>

2、配置文件里编辑服务名和eureka的地址

这里的配置文件的地址是eureka配置文件的地址是一样的

  1. spring:
  2. application:
  3. name: xxxxx
  4. eureka:
  5. client:
  6. service-url:
  7. defaultZone: http://127.0.0.1:10086/eureka #默认访问路径

 然后我们运行服务,我们的服务就注册到了eureka上了。

运行服务后,我们点击这里。

 

 然后会跳转到我们的eureka界面:

3、 如何多开服务?(不在注册步骤内)

<1>

 <2>

 <3>效果展示

4、发现服务(使用服务名,调用其他微服务)

<1>我们使用RestTemplate发http请求时把url的ip和端口号换成服务名

<2>在启动类中的RestTemplate上加负载均衡的注解@LoadBalanced

 如此一来我们便可以发现服务,通过服务名访问服务,并且完成服务的负载均衡。

三、Ribbon实现负载均衡

1、负载均衡原理

 IRule决定了负载均er

 2、负载均衡策略及其配置方式

<1>负载均衡是一个叫做IRule的接口来定义的

         根据Zone来区分服务器所在位置,进而进一步选择轮询的的服务器,默认选择相同Zone的服务器进行轮询。

<3>常见的Ribbon负载均衡规则

 

 <4>负载均衡策略的配置方式

  (1)启动负载均衡在RestTemplate上添加@LoadBalanced

(2)通过代码的方式(所有服务生效)

在启动类里重新定义一个新的IRule,需要什么策略就返回一个什么对象。

注意:这种方式配置的策略作用于全局,以后无论访问任何的其他的服务都会采用这种策略。

  1. @Bean
  2. public IRule randomRule(){
  3. return new RandomRule();
  4. }

(3)配置文件的方式(设置的服务生效)

在yml配置文件中新增配置完成策略配置。

第一条userservice是要配置负载均衡的服务的名字,以后在访问这个服务的时候,就可以采用这个策略进行访问了。

注意:这个策略只是针对我们设置的服务,其他服务走默认的策略。

  1. userservice: # 给某个微服务配置负载均衡规则,这里是userservice服务
  2. ribbon:
  3. NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则

3、饥饿加载

Ribbon默认加载方式是懒加载模式,就是需要用到某个服务才会加载,第一次加载时间长。

Ribbon的饥饿加载是:在初始化的时候就加载。

  1. #饥饿加载的配置
  2. ribbon:
  3. eager-load:
  4. enabled: true
  5. clients: user-server

clients是一个list集合:

如果需要添加多个服务,格式为

  1. #饥饿加载的配置
  2. ribbon:
  3. eager-load:
  4. enabled: true
  5. clients:
  6. - user-server
  7. - xxxx-server

四、Nacos注册中心(替换eureka注册中心)

 时使用前记得先打开nacos的服务。

在bin文件下开启cmd输入:startup.cmd -m standalone(windows系统)

 1、注册服务

<1>在父工程导入管理依赖

  1. <!-- 管理依赖-->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-alibaba-dependencies</artifactId>
  5. <version>2.2.5.RELEASE</version>
  6. <type>pom</type>
  7. <scope>import</scope>
  8. </dependency>

 

<2>在微服务工程导入Nacos的依赖

  1. <!-- nacos客户端依赖包 -->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  5. </dependency>

 <3>在服务端的yml配置文件加上nacos的服务地址

  1. spring:
  2. # nacos的服务端地址配置
  3. cloud:
  4. nacos:
  5. server-addr: localhost:8848

 

2、发现服务(见上文Eureka)

3、那nacos多级存储模型

<1>分级存储

服务、集群、实例

<2>\实例集群的实现方式

 在需要的集群的服务里加上相同的集群名字配置。

  1. # nacos的服务端地址配置
  2. cloud:
  3. nacos:
  4. server-addr: localhost:8848
  5. discovery:
  6. cluster-name: shanghai #配置集群名字,也就是机房位置

 效果图:

 <3>实现集群的负载均衡

即:默认情况下,再进行远程访问时候,优先访问相同的集群

例如:

        我们的orderserver设置的是BJ集群userserver有三个实例。实例1实例2设置的也是BJ集群实例3设置为HZ集群,那么orderserver访问的userserver的时候就会优先访问BJ集群的实例1和实例2,在这个两个实例里进行随机访问。

        只有当BJ集群的所有实例都无法访问时候,才会去访问其他集群。

  1. cloud:
  2. nacos:
  3. server-addr: localhost:8848
  4. discovery:
  5. cluster-name: shanghai #配置集群名字,也就是机房位置

 <4>服务实例的权重设置

权重越大访问的几率越大。

<5>环境隔离(了解即可)

     在代码开发过程中,有不同的环境,生产环境,开发环境,测试环境。这几个环境的代码的服务是不互通的。

如何实现呢?

在Nacos中有环境隔离名为:namespace

每个namespace是一个环境。

 设置方式:

在nacos中新建命名空间,获取命名空间id,在yum文件中配置namspace的id

 

 

 

<6>nacos与uereka的区别(nocas的非临时实例)

相同点:

        都会进行心跳检测,来判断服务是否存活,不存活就干掉。

        都会接收消费者的定时拉取服务名和地址

不同点是:

        对于nacos的非临时实例来说,nocas会主动询问服务是否存活,服务不健康就i等待。

         除此之外,nacos还会主动推送服务的存活信息给服务者

注意:如何设置临时实例为非临时实例呢

        对于非临时实例,nacos对其的存活状态更加关注,也会更快发现他是否健康。

五、Nacos的配置管理

1、nacos实现配置管理

 根据下面的方式,在nacos的客户端进行服务的配置管理。

新建配置文件和服务进行关联。

 

 

2、微服务的配置拉取

微服务要拉取nacos中管理的配置,并且与本地的application.yml配置合并,才能完成项目启动。

但如果尚未读取application.yml,又如何得知nacos地址呢?

因此spring引入了一种新的配置文件:bootstrap.yaml文件,会在application.yml之前被读取,流程如下:

<1>引入nacos-config依赖

  1. <!--nacos配置管理依赖-->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
  5. </dependency>

<2>添加bootstrap.yaml

 注意我们的nacos的配置管理的配置文件名为:user-server-dev.yaml

服务名-环境名.yaml

  1. spring:
  2. application:
  3. name: user-server # 服务名称
  4. profiles:
  5. active: dev #开发环境,这里是dev
  6. cloud:
  7. nacos:
  8. server-addr: localhost:8848 # Nacos地址
  9. config:
  10. file-extension: yaml # 文件后缀名

 <3>测试

 

 配置文件里设置的是时间格式。

我们使用@Value来读取这个配置的信息对日期进行格式化。

 结果:

成功读取。

 3、配置热更新

<1>方式一(了解)

在需要热更新的类上加上@RefreshScope 

<2>新建配置类添加@ConfigurationProperties (常用)

@Date是lombook的方式实现set和get方法

@Component把配置类送到spring容器配置完成后,PatternPropertises的对象pp,获取的pp.getDadteformat()得到的值就会热更新了。

pattern.dateformat就是我们管理的配置文件的里的日期格式。

4、多环境配置共享

微服务启动时,会从nacos读取多个配置文件:

 也会是说:

这种的配置文件时多环境共享全部生效的。

 <1>创建多环境共享的配置文件

以user-server这个服务为例子。注意文件名的格式:服务名.yaml

 <2>配置文件的优先级

单环境配置文件>多环境配置文件>本地配置文件

 5、nacos集群搭建

不想搭建。。。

六、Feign实现远程调用

1、feign的配置和基本使用

<1>在需要远程调用的服务里引feign的依赖

  1. <!--feign的客户端依赖-->
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-openfeign</artifactId>
  5. </dependency>

 <2>在启动类里加开启feign的注解@EnableFeignClients

                         记得把RestTemplate注释下

 <3>编写feign的客户端(和编写controller很相似)

客户端是接口的形式。

第一:@FeignClient("服务名")

接下来的步骤与springMVC的请求响应形式完全类似。

第二:确定请求方式和参数例如:@GetMapping("/user/{id}")

第三:请求的方法和返回值

 

 2、自定义feign的配置(了解)

 以修改日志级别为例:

<方式一> :基于配置文件修改日志级别

 

<方式二> 基于java代码的方式

 3、feign的性能优化

<1>日志级别最好为Basic

<2>使用HttpClient或者OKHttp连接池代替默认的URLConnection

 实际开发,我们需要压测最大连接数和单个路径最大连接数

 4、feign的最佳实践

1、两种实现方案

方案1:

 方案二:

 2、对方案二的实现(重点)

 回忆下我们原本的feign的是实现方式:

现在的实现方式:

(1)父工程新建一个模块,命名为userFeign,然后引入Feign的starter依赖

 

(2)在这个子模块里是实现feign客户端和配置(将order-service中的userClient和User和feign的配置文件复制到这个新的模块)记得删除rder-service里的重复的类

 

(3)在order-service引入userFeign的依赖

(4)重新导入需要的包

如果无法导入,在编译一次工程

(5)在@EnableFeignClients中添加需要的客户端

 方式有两种:

我个人比较懒推荐第一种(老师推荐第二种)

 总结:

七、gateway网关

1、网关的作用

 

 2、网关的搭建

<1>新建模块命名为gateway,并且引入nacos依赖和gateway依赖 

<2>编写路由配置和nacos的地址

  1. server:
  2. port: 10010 # 网关端口
  3. spring:
  4. application:
  5. name: gateway #服务名
  6. cloud:
  7. nacos:
  8. server-addr: localhost:8848 # nacos的服务地址
  9. gateway:
  10. routes: #网关路由配置
  11. - id: user-server # 路由id,自定义,不重复即可
  12. uri: lb://user-server # 路由的目标地址,lb就是负载均衡后面跟服务名
  13. predicates: #路由断言 就是布尔语句
  14. - Path=/user/** # 这个是路径匹配,只要符合/user/开头就符合要求
  15. - id: order-server
  16. uri: lb://order-server
  17. predicates:
  18. - Path=/order/**

 此时我们在访问微服务时候,就不在是直接访问微服务的ip地址而是,网关的IP+服务请求路径

 

 接下来的执行流程如下图:

目前为止可以做出微服务的流程图仅为这样:3、断言工厂Route Predicate Factory

 

 如果不符合路由规则,会报404的错误。

4、路由过滤器配置

<1>GatewayFilter是做什么的

<2>常见的过滤器工厂

<3>过滤器局部单个服务生效的配置

 以AddRequestHeader为例

这个过滤器工厂会,给符合过滤条件的请求添加自定义的请求头

在Spring里,我们可以使用@RequsetHeader("请求头名")去接这个数据

 <4>过滤器全局生效的配置

把defalult-filter代替filter

5、全局过滤器GlobalFilter

     虽然我们上文已经介绍了,全局过滤器的一种配置,但是他的功能单一,有时我们需要在过滤器添加许多功能,这是上面的过滤器所不能实现的。 

<1>案例(请仔细推敲示例代码)

需求:.请求中必须带有filter参数,且filter参数必须为666才放行,否则401

  1. package com.wrx;
  2. import org.springframework.cloud.gateway.filter.GatewayFilterChain;
  3. import org.springframework.cloud.gateway.filter.GlobalFilter;
  4. import org.springframework.core.annotation.Order;
  5. import org.springframework.http.HttpStatus;
  6. import org.springframework.http.server.reactive.ServerHttpRequest;
  7. import org.springframework.stereotype.Component;
  8. import org.springframework.util.MultiValueMap;
  9. import org.springframework.web.server.ServerWebExchange;
  10. import reactor.core.publisher.Mono;
  11. @Order(-1)//控制过滤器顺序,int值越小,优先级越高
  12. @Component
  13. public class filter_1 implements GlobalFilter {
  14. @Override
  15. public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  16. // 1、获取参数
  17. ServerHttpRequest request = exchange.getRequest();
  18. MultiValueMap<String, String> Params = request.getQueryParams();
  19. // 2、获取参数中的filter参数
  20. String filter = Params.getFirst("filter");
  21. // 3、如果参数值等于666就放行
  22. if ("666".equals(filter)){
  23. // 4、是就放行
  24. return chain.filter(exchange);
  25. }
  26. // 5、设置状态码
  27. exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
  28. // 6、拦截请求
  29. return exchange.getResponse().setComplete();
  30. }
  31. }

6、过滤器执行顺序

 7、网关的cors跨域问题

<1>什么是跨域问题

 <2>如何解决跨域问题

 cors解决方案,浏览器向服务器发出请求,询问是否允许跨域

会CV会修改即可,不用刻意的记忆。

  1. #全局的跨域处理
  2. globalcors: # 全局的跨域处理
  3. add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题
  4. corsConfigurations:
  5. '[/**]':
  6. allowedOrigins: # 允许哪些网站的跨域请求
  7. - "http://localhost:10010" #需要那个网站跨域,就写那几个网站即可
  8. allowedMethods: # 允许的跨域ajax的请求方式
  9. - "GET"
  10. - "POST"
  11. - "DELETE"
  12. - "PUT"
  13. - "OPTIONS"
  14. allowedHeaders: "*" # 允许在请求中携带的头信息
  15. allowCredentials: true # 是否允许携带cookie
  16. maxAge: 360000 # 这次跨域检测的有效期

八、微服务里的负载均衡

1、微服务之间的负载均衡(ribbon实现)

2、网关与微服务之间的负载均衡

九、Docker

1、为什莫使用Docker

 <1>部署项目时遇到的问题 

<2>Docker的解决方案

 解决环境、依赖兼容的问题

 解决不同系统应用的问题

 

 

 

 解决方案:系统函数库一同打包

<3>Docker是什么

2、Docker架构

<1>镜像和容器

镜像是只用来读数据,不写数据的,数据写在本地文件夹。

<2>Docker的结构C/S

 <3>DockerHub

 <4>总结

3、操作Docker

<1>docker的开启与关闭

基于luinx系统

systemctl start docker  # 启动docker服务

systemctl stop docker  # 停止docker服务

systemctl restart docker  # 重启docker服务

<2>镜像名规范

镜像名:[repository]:[tag]

<3>镜像操作命令

(1)构建镜像        docker build

(2)查看镜像        docker images

(3)删除镜像        docker rmi [image]

(4)镜像推送        docker push [image]

(5)镜像拉取        docker pull [image]

(6)镜像打包        docker save -o [保存的文件名]  [image] 

(7)镜像加载(压缩包)docker load -i [压缩包]

(8)docker --help  获取所有的docker命令

(9)docker save --help 查看docker save的使用方法

实践一:

实践二:

 <4>Docker的容器操作命令

1)创建容器        docker run --name [容器名] -p [主机端口]:[镜像端口] -d [image]

(2)删除容器        docker rm

(3)容器暂停        docker pause

(4)容器取消暂停        docker unpause

(5)容器停止        docker stop

(6)容器重启        docker start

(7)进入容器执行命令         docker exec

(8)查看容器运行日志         docker logs -f [容器名]

(9)查看所有运行的容器及状态        docker ps       

实践1:

运行daoker容器:

docker run --name [容器命] -p [主机端口号] : 80 -d nginx

示例:

docker run --name mynginx -p 80:80 -d nginx

mynginx: 容器命

-p 80:80 :第一个80是宿主服务器的端口

查看日志:

docker logs [容器名]

docker logs mynginx :查看mynginx的容器的日志

docker logs -f mynginx : 持续跟踪mynginx

实践2:

 实践三:

 

1:创建容器(设置持久化存储)

docker run --name myredis -p 6379:6379 -d redis redis-server --appendonly yes

2:进入redis容器,打开redis-cli

docker exec -it myredis (bash)redis-cli

<5>Docker数据卷命令

         数据卷其实,是一个doeker的虚拟目录,虚拟目录文件与本地的磁盘的/volume的目录关联,虚拟目录的,每创建一个虚拟文件,就会在本地磁盘的/volume创建一个真实的文件与之关联。

容器可以使用需虚拟目录里的文件,也就是与虚拟目录的文件关联,实际就是间接与本地文件关联,进而实现本地文件与容器文件的同步更新。

 (1)docker volume create 

 (2)docker volume inspect

 (3)docker volume ls

 (4)docker volume prune

 (5)docker volume rm

<6> 容器挂载数据卷

即使我们不是事先创建虚拟目录html,docker也会帮我们创建。

<7>docker数据卷操作流程

(1)先创建容器,创建容器时候就可以指定数据卷(虚拟目录)

(2)然后通过操作数据卷的命令操作刚创建好的数据卷,一般来说通过inspect命令来获取数据卷为所有数据信息,包括它在硬盘中关联的文件,也可以进行其他操作,比如删除。

(3)找到实际的目录位置吗,就可以对其进行修改了。

(4)删除容器,删除数据卷。

<8>将宿主机的目录直接挂载到容器

这种方式,虽然需要自己创建目录,但是也更方便我们找到我们挂载的目录。

 实现方式以Mysql为例:

1:上穿mysql的镜像包

2:创建对应目录

3:在conf目录传入配置文件

4:加载镜像

5:运行镜像,并且挂在宿主机文件

 docker run --name mysql \

-p 3305:3306 \

-e  MYSQL_ROOT_PASSWORD=root \

-v /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf \

-v /tmp/mysql/data:/var/lib/mysql \

-d mysql:5.7.25
注意:这是一条命令 \ 是换行符,表示命令还没结束

-e 设置mysql的密码(环境变量)

-v 挂载宿主机文件到容器

4、自定义镜像

<1>镜像结构

 <3>Dockerfile及其指令

(1)FROM        指定基础镜像,也就是操作系统类型顶

例如:FROM centos:6         使用Centos:6操作系统

(2)ENV        设置环境变量,设置好的环境变量就可以在镜像的构建中使用

格式:ENV key value        键值对形式

(3)COPY        拷贝本地文件到镜像的指定目录

例如:COPY ./mysql-5.7.rpm/tmp

(4)RUN        执行Linux的shell命令,一般是安装过程的命令

例如:RUN yum install gcc

(5)EXPOSE        指定容器运行时监听的端口,给镜像使用者看的

例如:EXPOSE 8080,我们运行容器时的-p 8080:8080,就是黄色部分的

(6)ENTRYPOINT        镜像中应用启动的命令,容器运行时调用

例如:EXTRYPOINT java -jar ###.jar

<3>自定义java项目的镜像

初始方案:

文件里导入jar包,jdk的安装包,Dockerfile文件,运行docker build 命令。

Dockerfile:

  1. # 指定基础镜像
  2. FROM ubuntu:16.04
  3. # 配置环境变量,JDK的安装目录
  4. ENV JAVA_DIR=/usr/local
  5. # 拷贝jdk
  6. COPY ./jdk8.tar.gz $JAVA_DIR/
  7. # 安装JDK
  8. RUN cd $JAVA_DIR \
  9. && tar -xf ./jdk8.tar.gz \
  10. && mv ./jdk1.8.0_144 ./java8
  11. # 配置环境变量
  12. ENV JAVA_HOME=$JAVA_DIR/java8
  13. ENV PATH=$PATH:$JAVA_HOME/bin
  14. #拷贝java项目的包(不通用)
  15. COPY ./docker-demo.jar /tmp/app.jar
  16. # 暴露端口(不通用)
  17. EXPOSE 8090
  18. # 入口,java项目的启动命令
  19. ENTRYPOINT java -jar /tmp/app.jar

 我们来看:我们的构建镜像是,主要是依据dockefile这个文件进行构建,我们仔细贯彻就会发现,里面的有一部分内容在构建java项目的时候是通用的。在这个文件里,我们发现安装JDK,配置环境变量基本通用。那么我们怎么来把这些重复的步骤处理下呢???

优化方案:

实际上包上述的过程包装成镜像的过程已经有人帮我们做了,我们只需要把基础镜像换为:

java:8-alpine

新的Dockerfile文件为:

  1. # 指定基础镜像
  2. FROM java:8-alpine
  3. #以下内容根据实际情况制定
  4. #拷贝java项目的包(不通用:修改jar包名)
  5. COPY ./docker-demo.jar /tmp/app.jar
  6. # 暴露端口(不通用:修改端口号)
  7. EXPOSE 8090
  8. # 入口,java项目的启动命令
  9. ENTRYPOINT java -jar /tmp/app.jar

新的操作步骤:

此时我们只需要在自定义的文件夹里拷如工程jar包Dockerfile文件即可。(注意里面的jar包名)

5、Docker-Compose

<1>了解docker-Compose

 Docker-Compose是用来做集群部署的,也就是同时构建并且运行n个容器。容器之间是互通的。

                想想我们之前构建运行容器时的命令很复杂,还没法复用,启动一个写一个。使用Docker-Compose我们就可以同时构建运行多个容器。

               Docker-Compose的语法格式与docker run不太一样,但是意思是一样的。

我们看上图:

service的下面第一级就是 -name 就是容器名。例如,mysql

容器名的下级的image就是 -d 镜像名和版本号。 例如,mysql:5.7.25

容器名的下级的environment就是-e 环境变量。 例如,MYSQL_ROOT_PASSWORD=root.

容器名的下级的volumes就是-v 挂载文件。例如上图。

特别的是

容器名的下级build: .  意思是,通过当前目录的dockerfile来构建镜像,并且使用此镜像。

<2>不适用docker-compose如何将Java项目构成镜像,容器部署。

构建java项目的镜像十分简单:

步骤:文件夹里放一个jar包一个dockerfile,然后在当前目录运行docker build -t web:1.0(随便起的) . 

解释:在当目录里,docker使用当前目录里的dockerfile将jar构建成一个镜像。

通过docker里已经存在的镜像构建运行一个容器也很容易例如上面的web:

步骤:直接运行命令docker run --name web -p 8088:8088 -d web:1.0

解释:docker通过web1.0镜像构建并运行名为web的容器,监听本机端口8088

(docker-Compose其实就是将这些命令格式化,并且成为文件运行。)

<3>docker-compose容器部署分布式项目

那么我们如果想通过docker-compose来部署一个分布式项目有哪些步骤呢???

 (1)我们先知道,我们的分布式项目有那些微服务。

上图中有:gateway,use-service,order-service三个微服务项目。其实也就是三个的Java的项目

我们知道构建java项目的镜像需要两个东西,一个是jar包,一个是dockerfile

那么我们准备好同名文件夹里面放这两个东西如图:

 特别注意因为集群部署可能会在不同的机器上部署,所以项目里的写死的地址,例如

mysql的地址,nacos的地址统统换位,docker-compose的sevices下的服务名。

(2)我们把准备好的几个微服务的同名文件夹放入cloud-demo的问价夹里。上传到虚拟机里。

然后运行docker-compose up -d 来部署项目。

当然这个文件夹里也应该放入一些,容器需要挂载的文件比如,数据库的data和conf。

(3)执行的前提是,写好我们的docker-compose的文件和dockerfile。

6、镜像私服

<1>我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:

  1. # 打开要修改的文件
  2. vi /etc/docker/daemon.json
  3. # 添加内容:这里的地址记住必须和docker-compose里的仓库地址一致
  4. "insecure-registries":["http://192.168.46.129:8088"]
  5. # 重加载
  6. systemctl daemon-reload
  7. # 重启docker
  8. systemctl restart docker

<2>在tmp目录里创建一个register-ui的文件夹,里面新建docker-compose.yml文件

文件内容:

  1. version: '3.0'
  2. services:
  3. registry:
  4. image: registry
  5. volumes:
  6. - ./registry-data:/var/lib/registry
  7. ui:
  8. image: joxit/docker-registry-ui:static
  9. ports:
  10. - 8080:80
  11. environment:
  12. - REGISTRY_TITLE=传智教育私有仓库
  13. - REGISTRY_URL=http://registry:5000
  14. depends_on:
  15. - registry

<3>docker-compose up -d运行

此时可以在192.168.46.129:8088访问图形化界面的私有仓库

<4>镜像的推送和拉取

九、RabbitMQ

1、异步调用

<1>异步调用的解决方案和优势

 事件驱动模式(订阅事件)

 

 

 

<2>异步调用优缺点

 2、了解RabbitMQ

 <1>通过docker安装运行RabbitMQ

 直接拉取MQ的镜像,并且运行MQ容器

  1. #拉取Rabbit的镜像
  2. docker pull rabbitmq:3-management
  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=root \ #用户名
  3. -e RABBITMQ_DEFAULT_PASS=root \ #密码
  4. --name mq \ #容器名
  5. --hostname mq1 \ #主机名
  6. -p 15672:15672 \ #访问可视化端口号
  7. -p 5672:5672 \
  8. -d \
  9. rabbitmq:3-management

运行之后可以访问可视化界面:

 <2>RTabbit的结构和概念

 exchange:交换机

queue:通道

VirtualHost:虚拟主机

3、RabbitMQ的五种常见消息模型

 <1>基本消息队列

 我们通过原生代码来了解RabbitMQ的工作流程:

我们以消息发者给接收者发一条信息为例。

消息发布者:

(1)创建连接,首先创建连接工厂,设置连接参数,然后利用连接工厂创建链接。

 

 (2)创建通道channel

 (3)创建消息队列

 

 (4)发送单个消息

 (5)关闭通道

  1. package cn.itcast.mq.helloworld;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import org.junit.Test;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. public class PublisherTest {
  9. @Test
  10. public void testSendMessage() throws IOException, TimeoutException {
  11. // 1.建立连接
  12. ConnectionFactory factory = new ConnectionFactory();
  13. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  14. factory.setHost("192.168.46.129");
  15. factory.setPort(5672);
  16. factory.setVirtualHost("/");
  17. factory.setUsername("root");
  18. factory.setPassword("root");
  19. // 1.2.建立连接
  20. Connection connection = factory.newConnection();
  21. // 2.创建通道Channel
  22. Channel channel = connection.createChannel();
  23. // 3.创建队列
  24. String queueName = "simple.queue";
  25. channel.queueDeclare(queueName, false, false, false, null);
  26. // 4.发送消息
  27. String message = "hello, rabbitmq!";
  28. channel.basicPublish("", queueName, null, message.getBytes());
  29. System.out.println("发送消息成功:【" + message + "】");
  30. // 5.关闭通道和连接
  31. channel.close();
  32. connection.close();
  33. }
  34. }

消息订阅者:

 (1)创建连接,首先创建连接工厂,设置连接参数,然后利用连接工厂创建链接。

 

 (2)创建通道channel

 (3)创建消息队列

 

(4)订阅消息处理消息

 消息一旦被接受者接收查看,mq的队列里的消息会立刻清除。

  1. package cn.itcast.mq.helloworld;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class ConsumerTest {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. // 1.建立连接
  8. ConnectionFactory factory = new ConnectionFactory();
  9. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
  10. factory.setHost("192.168.46.129");
  11. factory.setPort(5672);
  12. factory.setVirtualHost("/");
  13. factory.setUsername("root");
  14. factory.setPassword("root");
  15. // 1.2.建立连接
  16. Connection connection = factory.newConnection();
  17. // 2.创建通道Channel
  18. Channel channel = connection.createChannel();
  19. // 3.创建队列
  20. String queueName = "simple.queue";
  21. channel.queueDeclare(queueName, false, false, false, null);
  22. // 4.订阅消息
  23. channel.basicConsume(queueName, true, new DefaultConsumer(channel){
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope,
  26. AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. // 5.处理消息
  28. String message = new String(body);
  29. System.out.println("接收到消息:【" + message + "】");
  30. }
  31. });
  32. System.out.println("等待接收消息。。。。");
  33. }
  34. }

九(2)、SpringAMQP整合RabbitMQ

1、 入门案例

<1>使用AMQP完成基本消息队列的消息发布

(1) AMQP的起步依赖

  1. <!--AMQP依赖,包含RabbitMQ-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>

(2)在yml文件里配置RabbitMQ的相关信息

  1. Spring:
  2. rabbitmq:
  3. host: 192.168.46.129 #主机名
  4. port: 5672 #端口
  5. virtual-host: / #虚拟主机
  6. username: root #用户名
  7. password: root #密码

(3)消息发布

 我们通过下面的代码发现,比起原生的代码,这个代码简直不要太简单。

  1. package cn.itcast.mq.helloworld;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import org.junit.Test;
  6. import org.junit.runner.RunWith;
  7. import org.springframework.amqp.core.AmqpTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.test.context.SpringBootTest;
  10. import org.springframework.test.context.junit4.SpringRunner;
  11. import java.io.IOException;
  12. import java.util.concurrent.TimeoutException;
  13. @RunWith(SpringRunner.class)
  14. @SpringBootTest
  15. public class PublisherTest {
  16. @Autowired
  17. private AmqpTemplate amqpTemplate;
  18. @Test
  19. public void TestSendMQ(){
  20. String SendMsg = "hello rabbitMq";
  21. String queueName = "king";
  22. amqpTemplate.convertAndSend(queueName,SendMsg);//队列名和信息
  23. }
  24. }

<2>使用AMQP完成基本消息队列的监听消息

(1)导入依赖

(2)写配置

(3)添加@Component注解

(4)添加@RabbitListener注解,方法参数就是接收到的消息

  1. package cn.itcast.mq.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.web.bind.annotation.GetMapping;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. //@Component
  9. @RequestMapping("listen")
  10. public class listen {
  11. @GetMapping
  12. @RabbitListener(queues = "king")
  13. public void listenQueue(String msg){
  14. System.out.println("接收到的消息是:"+msg);
  15. }
  16. }

<2>Work Queue

        work queue和基本队列模型相比,是可以挂多个消费者,共同处理消息。由于RabbitMQ有消息预取机制,会在未执行前先平均取消息。这里可以用配置prefetch的值为1,即执行完一个才能取一个消息,这样就可以让效率高的多处理消息,效率低的少处理消息。

如股不配置prefetch就会一个接口处理一半消息。

使用线程睡眠模拟接口效率。发送消息要消耗20*50ms,work1(20ms)的效率是work2(200ms)的10倍。

所以work1处理40条消息,work2处理10条消息。

(1)设置prefetch为1

(2)设置两个消费者处理同一个队列的消息。

发送代码:

  1. package cn.itcast.mq.Send;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. @RequestMapping("send")
  7. public class sends {
  8. @Autowired
  9. private AmqpTemplate amqp;
  10. @PostMapping
  11. public void SendMQ1( String queueName, String msg) throws InterruptedException {
  12. for (int i = 0; i < 50; i++) {
  13. amqp.convertAndSend(queueName,msg+i);
  14. System.out.println("发布消息:"+queueName+","+msg+i);
  15. Thread.sleep(20);
  16. }
  17. }
  18. }

消费代码:

  1. package cn.itcast.mq.listener;
  2. import org.omg.CORBA.Current;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.time.LocalTime;
  9. @RestController
  10. //@Component
  11. public class listen {
  12. @RabbitListener(queues = "king")
  13. public void workQueue1(String msg) throws InterruptedException {
  14. System.out.println("work1接收到的消息是:"+msg+"---"+ LocalTime.now());
  15. Thread.sleep(20);
  16. }
  17. @RabbitListener(queues = "king")
  18. public void workQueue2(String msg) throws InterruptedException {
  19. System.out.println("work2接收到的消息是:"+msg+"---"+LocalTime.now());
  20. Thread.sleep(200);
  21. }
  22. }

2、发布和订阅

<1>Fanout广播交换机

   作用:     这个广播交换机,可以绑定多个队列,消息发布者发布的消息,会发给每个被绑定的交换机的队列所有相同的消息。

在操作前我们先了解一份继承图。

(1)在消费者的服务理定义fanout exchange的配置,绑定队列和交换机

(2)在消费者服务里配置两个消费者,各自监听自己的队列

(3)在消息发布者服务里写发布方法

参数换为:exchangeName,RoutingKey,msg

此时routingkry先为空字符串即可。

 我的代码:

(1)配置类绑定交换机和队列

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutConfig {
  10. // 声明交换机
  11. @Bean
  12. public FanoutExchange fanoutExchange(){
  13. return new FanoutExchange("wrx.fanout");//交换机名
  14. }
  15. // 声明队列1
  16. @Bean
  17. public Queue fanoutQueue1(){
  18. return new Queue("wrx.queue1");
  19. }
  20. // 声明队列2
  21. @Bean
  22. public Queue fanoutQueue2(){
  23. return new Queue("wrx.queue2");
  24. }
  25. // 绑定队列1
  26. @Bean
  27. public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
  28. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  29. }
  30. // 绑定队列2
  31. @Bean
  32. public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
  33. return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  34. }
  35. }

(2)写两个消费者

  1. package cn.itcast.mq.listener;
  2. import org.omg.CORBA.Current;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.time.LocalTime;
  9. @RestController
  10. //@Component
  11. public class listen {
  12. /**
  13. * 处理wrx.queue1的消息
  14. * @param msg
  15. * @throws InterruptedException
  16. */
  17. @RabbitListener(queues = "wrx.queue1")
  18. public void fanoutQueue1(String msg) throws InterruptedException {
  19. System.out.println("fq1处理消息:"+msg+"---"+ LocalTime.now());
  20. }
  21. /**
  22. * 处理wrx.queue2的消息
  23. * @param msg
  24. * @throws InterruptedException
  25. */
  26. @RabbitListener(queues = "wrx.queue2")
  27. public void fanoutQueue2(String msg) throws InterruptedException {
  28. System.out.println("fq2处理消息:"+msg+"---"+ LocalTime.now());
  29. }
  30. }

(3)写消息发布者

  1. package cn.itcast.mq.Send;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. @RequestMapping("send")
  7. public class sends {
  8. @Autowired
  9. private AmqpTemplate amqp;
  10. @PostMapping("fanout")
  11. public void fanoutSend( String exchangeName,String Routingkey ,String msg) {
  12. //发送消息
  13. amqp.convertAndSend(exchangeName,Routingkey,msg);
  14. System.out.println("发布消息:"+exchangeName+","+Routingkey+","+msg);
  15. }
  16. }

<2>Direct路由交换机

       我们需要给每个队列指定bindkey,它可以是一个也可以是多个。

       我们在发送消息时,必须指定routingkey,这个key必须与指定的bindingkey相同,相应的队列才能获取消息。

(1)在消息接收者的@RabbitListener注解里声明Exchange和Queue

  1. @RabbitListener(bindings = @QueueBinding( //绑定队列交换机
  2. value = @Queue("direct.queue1"), //queue名字
  3. exchange = @Exchange("wrx.direct"), //exchange名字
  4. key = {"red","wrx"} //BindingKey
  5. ))

 (2)消息发布者填写对应的RoutingKey

 代码示例:

(1)

  1. package cn.itcast.mq.listener;
  2. import org.springframework.amqp.rabbit.annotation.Exchange;
  3. import org.springframework.amqp.rabbit.annotation.Queue;
  4. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.time.LocalTime;
  8. @RestController
  9. public class DirectListener {
  10. /**
  11. * 处理wrx.direct1的消息
  12. * @param msg
  13. * @throws InterruptedException
  14. */
  15. @RabbitListener(bindings = @QueueBinding(
  16. value = @Queue("direct.queue1"),
  17. exchange = @Exchange("wrx.direct"),
  18. key = {"red","wrx"}//BindingKey
  19. ))
  20. public void DirectQueue1(String msg) throws InterruptedException {
  21. System.out.println("direct1处理消息:"+msg+"---"+ LocalTime.now());
  22. }
  23. /**
  24. * 处理wrx.direct2的消息
  25. * @param msg
  26. * @throws InterruptedException
  27. */
  28. @RabbitListener(bindings = @QueueBinding(
  29. value = @Queue("direct.queue2"),
  30. exchange = @Exchange("wrx.direct"),
  31. key = {"blue","wrx"}//BindingKey
  32. ))
  33. public void DirectQueue2(String msg) throws InterruptedException {
  34. System.out.println("direct2处理消息:"+msg+"---"+ LocalTime.now());
  35. }
  36. }

(2)我们在postman发post请求就可以发布消息了

 routingkey为wrx会发给两个队列

 routingkey为red会发给direct.queue1队列

  1. package cn.itcast.mq.Send;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.autoconfigure.web.client.AutoConfigureMockRestServiceServer;
  5. import org.springframework.web.bind.annotation.PostMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.time.LocalDateTime;
  9. import java.time.LocalTime;
  10. @RestController
  11. @RequestMapping("direct")
  12. public class DirectSend {
  13. @Autowired
  14. private AmqpTemplate amqp;
  15. @PostMapping
  16. public void DirectSend(String exchangeName,String routingKey,String msg ){
  17. amqp.convertAndSend(exchangeName,routingKey,msg);
  18. System.out.println("发送消息:"+msg+ "-----"+LocalTime.now());
  19. }
  20. }

<3>Topic话题交换机

 说明:话题交换机和路由交换机最大的区别在于,他们的BindingKey用法不同。

         Topic的BindingKey允许使用通配符   #   和   *

          #:代表一个或多个单词

         *:代表一个单词

可以有的形式为:1、china.#

                             2、#.news

                             3、china.*

                             4、*.news

发送消息时,RoutingKey只要匹配通配符之外的字符就可以和Queue队列建立联系。

(1)、在消费者里声明队列和交换机的绑定关系,定义BindingKey

 (2)、根据对应的交换机名字和BindingKey来确定RoutingKey来发送消息

符合通配符的消费者队列都会接收到消息。 

我的代码:

(1)绑定交换机和队列,设置交换机类型Type,规定BindingKey

  1. package cn.itcast.mq.listener;
  2. import org.springframework.amqp.core.ExchangeTypes;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.web.bind.annotation.PostMapping;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import java.time.LocalTime;
  11. @RestController
  12. public class TopicListener {
  13. /**
  14. * 处理topic.queue1的消息
  15. * @param msg
  16. * @throws InterruptedException
  17. */
  18. @RabbitListener(bindings = @QueueBinding(
  19. value = @Queue("topic.queue1"),
  20. exchange = @Exchange(value = "wrx.topic",type = "topic"),//topic类型
  21. key = {"china.#"}
  22. ))
  23. public void TopicQueue1(String msg) throws InterruptedException {
  24. System.out.println("topic1处理消息:"+msg+"---"+ LocalTime.now());
  25. }
  26. /**
  27. * 处理wrx.direct2的消息
  28. * @param msg
  29. * @throws InterruptedException
  30. */
  31. @RabbitListener(bindings = @QueueBinding(
  32. value = @Queue("topic.queue2"),
  33. exchange = @Exchange(value = "wrx.topic",type = ExchangeTypes.TOPIC),//topic类型
  34. key = {"#.news"}
  35. ))
  36. public void TopicQueue2(String msg) throws InterruptedException {
  37. System.out.println("topic2处理消息:"+msg+"---"+ LocalTime.now());
  38. }
  39. }

(2)发送消息

  1. package cn.itcast.mq.Send;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.PostMapping;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.time.LocalTime;
  8. @RestController
  9. @RequestMapping("topic")
  10. public class TopicSend {
  11. @Autowired
  12. private AmqpTemplate amqp;
  13. @PostMapping
  14. public void DirectSend(String exchangeName,String routingKey,String msg ){
  15. amqp.convertAndSend(exchangeName,routingKey,msg);//routingKey= china.weather,展示通配符效果
  16. System.out.println("发送消息:"+msg+ "-----"+ LocalTime.now());
  17. }
  18. }

<4>消息转换器Json序列化

在发送消息时,如果数据类型不是字节类型的就会出问题,因为SpringAMQP的序列化器默认使用的是java的jdk的序列化器。我们将它改为Json的即可。详细见下图。

(1)引用以来,加配置(发布者)

引用依赖

  1. <!-- Json序列化依赖-->
  2. <dependency>
  3. <groupId>com.fasterxml.jackson.dataformat</groupId>
  4. <artifactId>jackson-dataformat-xml</artifactId>
  5. </dependency>

 加配置

  1. //使用Json的序列化方式
  2. @Bean
  3. public MessageConverter messageConverter(){
  4. return new Jackson2JsonMessageConverter();
  5. }

 (2)引依赖,加配置 (消费者)

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/502372
推荐阅读
相关标签
  

闽ICP备14008679号