赞
踩
目录
1、在父模块下创建一个springboot的子模块名字为eureka,并且引入依赖
2、在启动类里添加@EnableEurekaServer的注解
<1>我们使用RestTemplate发http请求时把url的ip和端口号换成服务名
<2>在启动类中的RestTemplate上加负载均衡的注解@LoadBalanced
<6>nacos与uereka的区别(nocas的非临时实例)
<2>新建配置类添加@ConfigurationProperties (常用)
<2>在启动类里加开启feign的注解@EnableFeignClients
<3>编写feign的客户端(和编写controller很相似)
<2>使用HttpClient或者OKHttp连接池代替默认的URLConnection
<1>新建模块命名为gateway,并且引入nacos依赖和gateway依赖
目前为止可以做出微服务的流程图仅为这样:编辑3、断言工厂Route Predicate Factory
<2>不适用docker-compose如何将Java项目构成镜像,容器部署。
<1>我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:
<2>在tmp目录里创建一个register-ui的文件夹,里面新建docker-compose.yml文件
SpringCloud实际是分布式的一种解决方案。
远程调用的关键就是发http请求,我们使用RestTemplate这个类来完成这件事。
这个类就是来实现发restful风格的请求的。
关键操作如图所示:
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
- </dependency>
- server:
- port: 10086
- spring:
- application:
- name: eurekaserver #微服务的名字
- #配置这个地址只为了以后多个eureka注册中心交互数据
- eureka:
- client:
- service-url:
- defaultZone : http://127.0.0.1:10086/eureka/
-
-
- <!-- eureka客户端依赖 -->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
这里的配置文件的地址是eureka配置文件的地址是一样的
- spring:
- application:
- name: xxxxx
-
-
- eureka:
- client:
- service-url:
- defaultZone: http://127.0.0.1:10086/eureka #默认访问路径
然后我们运行服务,我们的服务就注册到了eureka上了。
运行服务后,我们点击这里。
然后会跳转到我们的eureka界面:
<1>
<2>
<3>效果展示
如此一来我们便可以发现服务,通过服务名访问服务,并且完成服务的负载均衡。
IRule决定了负载均er
根据Zone来区分服务器所在位置,进而进一步选择轮询的的服务器,默认选择相同Zone的服务器进行轮询。
(1)启动负载均衡在RestTemplate上添加@LoadBalanced
(2)通过代码的方式(所有服务生效)
在启动类里重新定义一个新的IRule,需要什么策略就返回一个什么对象。
注意:这种方式配置的策略作用于全局,以后无论访问任何的其他的服务都会采用这种策略。
- @Bean
- public IRule randomRule(){
- return new RandomRule();
- }
(3)配置文件的方式(设置的服务生效)
在yml配置文件中新增配置完成策略配置。
第一条userservice是要配置负载均衡的服务的名字,以后在访问这个服务的时候,就可以采用这个策略进行访问了。
注意:这个策略只是针对我们设置的服务,其他服务走默认的策略。
- userservice: # 给某个微服务配置负载均衡规则,这里是userservice服务
- ribbon:
- NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则
Ribbon默认加载方式是懒加载模式,就是需要用到某个服务才会加载,第一次加载时间长。
Ribbon的饥饿加载是:在初始化的时候就加载。
- #饥饿加载的配置
- ribbon:
- eager-load:
- enabled: true
- clients: user-server
clients是一个list集合:
如果需要添加多个服务,格式为
- #饥饿加载的配置
- ribbon:
- eager-load:
- enabled: true
- clients:
- - user-server
- - xxxx-server
时使用前记得先打开nacos的服务。
在bin文件下开启cmd输入:startup.cmd -m standalone(windows系统)
- <!-- 管理依赖-->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-alibaba-dependencies</artifactId>
- <version>2.2.5.RELEASE</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- <!-- nacos客户端依赖包 -->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
- </dependency>
- spring:
- # nacos的服务端地址配置
- cloud:
- nacos:
- server-addr: localhost:8848
服务、集群、实例
在需要的集群的服务里加上相同的集群名字配置。
- # nacos的服务端地址配置
- cloud:
- nacos:
- server-addr: localhost:8848
- discovery:
- cluster-name: shanghai #配置集群名字,也就是机房位置
效果图:
即:默认情况下,再进行远程访问时候,优先访问相同的集群
例如:
我们的orderserver设置的是BJ集群,userserver有三个实例。实例1和实例2设置的也是BJ集群,实例3设置为HZ集群,那么orderserver访问的userserver的时候就会优先访问BJ集群的实例1和实例2,在这个两个实例里进行随机访问。
只有当BJ集群的所有实例都无法访问时候,才会去访问其他集群。
- cloud:
- nacos:
- server-addr: localhost:8848
- discovery:
- cluster-name: shanghai #配置集群名字,也就是机房位置
权重越大访问的几率越大。
在代码开发过程中,有不同的环境,生产环境,开发环境,测试环境。这几个环境的代码的服务是不互通的。
如何实现呢?
在Nacos中有环境隔离名为:namespace
每个namespace是一个环境。
设置方式:
在nacos中新建命名空间,获取命名空间id,在yum文件中配置namspace的id
相同点:
都会进行心跳检测,来判断服务是否存活,不存活就干掉。
都会接收消费者的定时拉取服务名和地址
不同点是:
对于nacos的非临时实例来说,nocas会主动询问服务是否存活,服务不健康就i等待。
除此之外,nacos还会主动推送服务的存活信息给服务者
注意:如何设置临时实例为非临时实例呢
对于非临时实例,nacos对其的存活状态更加关注,也会更快发现他是否健康。
根据下面的方式,在nacos的客户端进行服务的配置管理。
新建配置文件和服务进行关联。
微服务要拉取nacos中管理的配置,并且与本地的application.yml配置合并,才能完成项目启动。
但如果尚未读取application.yml,又如何得知nacos地址呢?
因此spring引入了一种新的配置文件:bootstrap.yaml文件,会在application.yml之前被读取,流程如下:
- <!--nacos配置管理依赖-->
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
- </dependency>
注意我们的nacos的配置管理的配置文件名为:user-server-dev.yaml
服务名-环境名.yaml
- spring:
- application:
- name: user-server # 服务名称
- profiles:
- active: dev #开发环境,这里是dev
- cloud:
- nacos:
- server-addr: localhost:8848 # Nacos地址
- config:
- file-extension: yaml # 文件后缀名
配置文件里设置的是时间格式。
我们使用@Value来读取这个配置的信息对日期进行格式化。
结果:
成功读取。
在需要热更新的类上加上@RefreshScope
@Date是lombook的方式实现set和get方法
@Component把配置类送到spring容器配置完成后,PatternPropertises的对象pp,获取的pp.getDadteformat()得到的值就会热更新了。
pattern.dateformat就是我们管理的配置文件的里的日期格式。
微服务启动时,会从nacos读取多个配置文件:
也会是说:
这种的配置文件时多环境共享全部生效的。
以user-server这个服务为例子。注意文件名的格式:服务名.yaml
单环境配置文件>多环境配置文件>本地配置文件
不想搭建。。。
- <!--feign的客户端依赖-->
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-openfeign</artifactId>
- </dependency>
记得把RestTemplate注释下
客户端是接口的形式。
第一:@FeignClient("服务名")
接下来的步骤与springMVC的请求响应形式完全类似。
第二:确定请求方式和参数例如:@GetMapping("/user/{id}")
第三:请求的方法和返回值
以修改日志级别为例:
实际开发,我们需要压测最大连接数和单个路径最大连接数
方案1:
方案二:
回忆下我们原本的feign的是实现方式:
现在的实现方式:
(1)父工程新建一个模块,命名为userFeign,然后引入Feign的starter依赖
(2)在这个子模块里是实现feign客户端和配置(将order-service中的userClient和User和feign的配置文件复制到这个新的模块)记得删除rder-service里的重复的类
(3)在order-service引入userFeign的依赖
(4)重新导入需要的包
如果无法导入,在编译一次工程
(5)在@EnableFeignClients中添加需要的客户端
方式有两种:
我个人比较懒推荐第一种(老师推荐第二种)
总结:
server: port: 10010 # 网关端口 spring: application: name: gateway #服务名 cloud: nacos: server-addr: localhost:8848 # nacos的服务地址 gateway: routes: #网关路由配置 - id: user-server # 路由id,自定义,不重复即可 uri: lb://user-server # 路由的目标地址,lb就是负载均衡后面跟服务名 predicates: #路由断言 就是布尔语句 - Path=/user/** # 这个是路径匹配,只要符合/user/开头就符合要求 - id: order-server uri: lb://order-server predicates: - Path=/order/**
此时我们在访问微服务时候,就不在是直接访问微服务的ip地址而是,网关的IP+服务请求路径
接下来的执行流程如下图:
如果不符合路由规则,会报404的错误。
以AddRequestHeader为例
这个过滤器工厂会,给符合过滤条件的请求添加自定义的请求头
在Spring里,我们可以使用@RequsetHeader("请求头名")去接这个数据
把defalult-filter代替filter
虽然我们上文已经介绍了,全局过滤器的一种配置,但是他的功能单一,有时我们需要在过滤器添加许多功能,这是上面的过滤器所不能实现的。
需求:.请求中必须带有filter参数,且filter参数必须为666才放行,否则401
- package com.wrx;
-
- import org.springframework.cloud.gateway.filter.GatewayFilterChain;
- import org.springframework.cloud.gateway.filter.GlobalFilter;
- import org.springframework.core.annotation.Order;
- import org.springframework.http.HttpStatus;
- import org.springframework.http.server.reactive.ServerHttpRequest;
- import org.springframework.stereotype.Component;
- import org.springframework.util.MultiValueMap;
- import org.springframework.web.server.ServerWebExchange;
- import reactor.core.publisher.Mono;
- @Order(-1)//控制过滤器顺序,int值越小,优先级越高
- @Component
- public class filter_1 implements GlobalFilter {
- @Override
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
- // 1、获取参数
- ServerHttpRequest request = exchange.getRequest();
- MultiValueMap<String, String> Params = request.getQueryParams();
- // 2、获取参数中的filter参数
- String filter = Params.getFirst("filter");
- // 3、如果参数值等于666就放行
- if ("666".equals(filter)){
- // 4、是就放行
- return chain.filter(exchange);
- }
- // 5、设置状态码
- exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
- // 6、拦截请求
- return exchange.getResponse().setComplete();
- }
- }

cors解决方案,浏览器向服务器发出请求,询问是否允许跨域
会CV会修改即可,不用刻意的记忆。
#全局的跨域处理 globalcors: # 全局的跨域处理 add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题 corsConfigurations: '[/**]': allowedOrigins: # 允许哪些网站的跨域请求 - "http://localhost:10010" #需要那个网站跨域,就写那几个网站即可 allowedMethods: # 允许的跨域ajax的请求方式 - "GET" - "POST" - "DELETE" - "PUT" - "OPTIONS" allowedHeaders: "*" # 允许在请求中携带的头信息 allowCredentials: true # 是否允许携带cookie maxAge: 360000 # 这次跨域检测的有效期
解决环境、依赖兼容的问题
解决不同系统应用的问题
解决方案:系统函数库一同打包
镜像是只用来读数据,不写数据的,数据写在本地文件夹。
基于luinx系统
systemctl start docker # 启动docker服务
systemctl stop docker # 停止docker服务
systemctl restart docker # 重启docker服务
镜像名:[repository]:[tag]
(1)构建镜像 docker build
(2)查看镜像 docker images
(3)删除镜像 docker rmi [image]
(4)镜像推送 docker push [image]
(5)镜像拉取 docker pull [image]
(6)镜像打包 docker save -o [保存的文件名] [image]
(7)镜像加载(压缩包)docker load -i [压缩包]
(8)docker --help 获取所有的docker命令
(9)docker save --help 查看docker save的使用方法
实践一:
实践二:
(1)创建容器 docker run --name [容器名] -p [主机端口]:[镜像端口] -d [image]
(2)删除容器 docker rm
(3)容器暂停 docker pause
(4)容器取消暂停 docker unpause
(5)容器停止 docker stop
(6)容器重启 docker start
(7)进入容器执行命令 docker exec
(8)查看容器运行日志 docker logs -f [容器名]
(9)查看所有运行的容器及状态 docker ps
实践1:
运行daoker容器:
docker run --name [容器命] -p [主机端口号] : 80 -d nginx
示例:
docker run --name mynginx -p 80:80 -d nginx
mynginx: 容器命
-p 80:80 :第一个80是宿主服务器的端口
查看日志:
docker logs [容器名]
docker logs mynginx :查看mynginx的容器的日志
docker logs -f mynginx : 持续跟踪mynginx
实践2:
实践三:
1:创建容器(设置持久化存储)
docker run --name myredis -p 6379:6379 -d redis redis-server --appendonly yes
2:进入redis容器,打开redis-cli
docker exec -it myredis (bash)redis-cli
数据卷其实,是一个doeker的虚拟目录,虚拟目录文件与本地的磁盘的/volume的目录关联,虚拟目录的,每创建一个虚拟文件,就会在本地磁盘的/volume创建一个真实的文件与之关联。
容器可以使用需虚拟目录里的文件,也就是与虚拟目录的文件关联,实际就是间接与本地文件关联,进而实现本地文件与容器文件的同步更新。
(1)docker volume create
(2)docker volume inspect
(3)docker volume ls
(4)docker volume prune
(5)docker volume rm
即使我们不是事先创建虚拟目录html,docker也会帮我们创建。
(1)先创建容器,创建容器时候就可以指定数据卷(虚拟目录)
(2)然后通过操作数据卷的命令操作刚创建好的数据卷,一般来说通过inspect命令来获取数据卷为所有数据信息,包括它在硬盘中关联的文件,也可以进行其他操作,比如删除。
(3)找到实际的目录位置吗,就可以对其进行修改了。
(4)删除容器,删除数据卷。
这种方式,虽然需要自己创建目录,但是也更方便我们找到我们挂载的目录。
实现方式以Mysql为例:
1:上穿mysql的镜像包
2:创建对应目录
3:在conf目录传入配置文件
4:加载镜像
5:运行镜像,并且挂在宿主机文件
docker run --name mysql \
-p 3305:3306 \
-e MYSQL_ROOT_PASSWORD=root \
-v /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf \
-v /tmp/mysql/data:/var/lib/mysql \
-d mysql:5.7.25
注意:这是一条命令 \ 是换行符,表示命令还没结束
-e 设置mysql的密码(环境变量)
-v 挂载宿主机文件到容器
(1)FROM 指定基础镜像,也就是操作系统类型顶
例如:FROM centos:6 使用Centos:6操作系统
(2)ENV 设置环境变量,设置好的环境变量就可以在镜像的构建中使用
格式:ENV key value 键值对形式
(3)COPY 拷贝本地文件到镜像的指定目录
例如:COPY ./mysql-5.7.rpm/tmp
(4)RUN 执行Linux的shell命令,一般是安装过程的命令
例如:RUN yum install gcc
(5)EXPOSE 指定容器运行时监听的端口,给镜像使用者看的
例如:EXPOSE 8080,我们运行容器时的-p 8080:8080,就是黄色部分的
(6)ENTRYPOINT 镜像中应用启动的命令,容器运行时调用
例如:EXTRYPOINT java -jar ###.jar
初始方案:
文件里导入jar包,jdk的安装包,Dockerfile文件,运行docker build 命令。
Dockerfile:
- # 指定基础镜像
- FROM ubuntu:16.04
- # 配置环境变量,JDK的安装目录
- ENV JAVA_DIR=/usr/local
-
- # 拷贝jdk
- COPY ./jdk8.tar.gz $JAVA_DIR/
-
-
- # 安装JDK
- RUN cd $JAVA_DIR \
- && tar -xf ./jdk8.tar.gz \
- && mv ./jdk1.8.0_144 ./java8
-
- # 配置环境变量
- ENV JAVA_HOME=$JAVA_DIR/java8
- ENV PATH=$PATH:$JAVA_HOME/bin
-
- #拷贝java项目的包(不通用)
- COPY ./docker-demo.jar /tmp/app.jar
-
- # 暴露端口(不通用)
- EXPOSE 8090
-
- # 入口,java项目的启动命令
- ENTRYPOINT java -jar /tmp/app.jar

我们来看:我们的构建镜像是,主要是依据dockefile这个文件进行构建,我们仔细贯彻就会发现,里面的有一部分内容在构建java项目的时候是通用的。在这个文件里,我们发现安装JDK,配置环境变量基本通用。那么我们怎么来把这些重复的步骤处理下呢???
优化方案:
实际上包上述的过程包装成镜像的过程已经有人帮我们做了,我们只需要把基础镜像换为:
java:8-alpine
新的Dockerfile文件为:
- # 指定基础镜像
- FROM java:8-alpine
-
- #以下内容根据实际情况制定
-
- #拷贝java项目的包(不通用:修改jar包名)
- COPY ./docker-demo.jar /tmp/app.jar
-
- # 暴露端口(不通用:修改端口号)
- EXPOSE 8090
-
- # 入口,java项目的启动命令
- ENTRYPOINT java -jar /tmp/app.jar
新的操作步骤:
此时我们只需要在自定义的文件夹里拷如工程jar包和Dockerfile文件即可。(注意里面的jar包名)
Docker-Compose是用来做集群部署的,也就是同时构建并且运行n个容器。容器之间是互通的。
想想我们之前构建运行容器时的命令很复杂,还没法复用,启动一个写一个。使用Docker-Compose我们就可以同时构建运行多个容器。
Docker-Compose的语法格式与docker run不太一样,但是意思是一样的。
我们看上图:
service的下面第一级就是 -name 就是容器名。例如,mysql
容器名的下级的image就是 -d 镜像名和版本号。 例如,mysql:5.7.25
容器名的下级的environment就是-e 环境变量。 例如,MYSQL_ROOT_PASSWORD=root.
容器名的下级的volumes就是-v 挂载文件。例如上图。
特别的是:
容器名的下级build: . 意思是,通过当前目录的dockerfile来构建镜像,并且使用此镜像。
构建java项目的镜像十分简单:
步骤:文件夹里放一个jar包一个dockerfile,然后在当前目录运行docker build -t web:1.0(随便起的) .
解释:在当目录里,docker使用当前目录里的dockerfile将jar构建成一个镜像。
通过docker里已经存在的镜像构建运行一个容器也很容易例如上面的web:
步骤:直接运行命令docker run --name web -p 8088:8088 -d web:1.0
解释:docker通过web1.0镜像构建并运行名为web的容器,监听本机端口8088
(docker-Compose其实就是将这些命令格式化,并且成为文件运行。)
那么我们如果想通过docker-compose来部署一个分布式项目有哪些步骤呢???
(1)我们先知道,我们的分布式项目有那些微服务。
上图中有:gateway,use-service,order-service三个微服务项目。其实也就是三个的Java的项目
我们知道构建java项目的镜像需要两个东西,一个是jar包,一个是dockerfile
那么我们准备好同名文件夹里面放这两个东西如图:
特别注意因为集群部署可能会在不同的机器上部署,所以项目里的写死的地址,例如
mysql的地址,nacos的地址统统换位,docker-compose的sevices下的服务名。
(2)我们把准备好的几个微服务的同名文件夹放入cloud-demo的问价夹里。上传到虚拟机里。
然后运行docker-compose up -d 来部署项目。
当然这个文件夹里也应该放入一些,容器需要挂载的文件比如,数据库的data和conf。
(3)执行的前提是,写好我们的docker-compose的文件和dockerfile。
- # 打开要修改的文件
- vi /etc/docker/daemon.json
- # 添加内容:这里的地址记住必须和docker-compose里的仓库地址一致
- "insecure-registries":["http://192.168.46.129:8088"]
- # 重加载
- systemctl daemon-reload
- # 重启docker
- systemctl restart docker
文件内容:
- version: '3.0'
- services:
- registry:
- image: registry
- volumes:
- - ./registry-data:/var/lib/registry
- ui:
- image: joxit/docker-registry-ui:static
- ports:
- - 8080:80
- environment:
- - REGISTRY_TITLE=传智教育私有仓库
- - REGISTRY_URL=http://registry:5000
- depends_on:
- - registry
此时可以在192.168.46.129:8088访问图形化界面的私有仓库
事件驱动模式(订阅事件)
直接拉取MQ的镜像,并且运行MQ容器
- #拉取Rabbit的镜像
- docker pull rabbitmq:3-management
- docker run \
- -e RABBITMQ_DEFAULT_USER=root \ #用户名
- -e RABBITMQ_DEFAULT_PASS=root \ #密码
- --name mq \ #容器名
- --hostname mq1 \ #主机名
- -p 15672:15672 \ #访问可视化端口号
- -p 5672:5672 \
- -d \
- rabbitmq:3-management
运行之后可以访问可视化界面:
exchange:交换机
queue:通道
VirtualHost:虚拟主机
我们通过原生代码来了解RabbitMQ的工作流程:
我们以消息发者给接收者发一条信息为例。
消息发布者:
(1)创建连接,首先创建连接工厂,设置连接参数,然后利用连接工厂创建链接。
(2)创建通道channel
(3)创建消息队列
(4)发送单个消息
(5)关闭通道
- package cn.itcast.mq.helloworld;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import org.junit.Test;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class PublisherTest {
- @Test
- public void testSendMessage() throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.46.129");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("root");
- factory.setPassword("root");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
-
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
-
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
-
- // 4.发送消息
- String message = "hello, rabbitmq!";
- channel.basicPublish("", queueName, null, message.getBytes());
- System.out.println("发送消息成功:【" + message + "】");
-
- // 5.关闭通道和连接
- channel.close();
- connection.close();
-
- }
- }

消息订阅者:
(1)创建连接,首先创建连接工厂,设置连接参数,然后利用连接工厂创建链接。
(2)创建通道channel
(3)创建消息队列
(4)订阅消息处理消息
消息一旦被接受者接收查看,mq的队列里的消息会立刻清除。
- package cn.itcast.mq.helloworld;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class ConsumerTest {
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.建立连接
- ConnectionFactory factory = new ConnectionFactory();
- // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
- factory.setHost("192.168.46.129");
- factory.setPort(5672);
- factory.setVirtualHost("/");
- factory.setUsername("root");
- factory.setPassword("root");
- // 1.2.建立连接
- Connection connection = factory.newConnection();
-
- // 2.创建通道Channel
- Channel channel = connection.createChannel();
-
- // 3.创建队列
- String queueName = "simple.queue";
- channel.queueDeclare(queueName, false, false, false, null);
-
- // 4.订阅消息
- channel.basicConsume(queueName, true, new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 5.处理消息
- String message = new String(body);
- System.out.println("接收到消息:【" + message + "】");
- }
- });
- System.out.println("等待接收消息。。。。");
- }
- }

(1) AMQP的起步依赖
- <!--AMQP依赖,包含RabbitMQ-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
(2)在yml文件里配置RabbitMQ的相关信息
- Spring:
- rabbitmq:
- host: 192.168.46.129 #主机名
- port: 5672 #端口
- virtual-host: / #虚拟主机
- username: root #用户名
- password: root #密码
(3)消息发布
我们通过下面的代码发现,比起原生的代码,这个代码简直不要太简单。
- package cn.itcast.mq.helloworld;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class PublisherTest {
- @Autowired
- private AmqpTemplate amqpTemplate;
- @Test
- public void TestSendMQ(){
- String SendMsg = "hello rabbitMq";
- String queueName = "king";
- amqpTemplate.convertAndSend(queueName,SendMsg);//队列名和信息
- }
-
- }

(1)导入依赖
(2)写配置
(3)添加@Component注解
(4)添加@RabbitListener注解,方法参数就是接收到的消息
- package cn.itcast.mq.listener;
-
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
-
- @RestController
- //@Component
- @RequestMapping("listen")
- public class listen {
- @GetMapping
- @RabbitListener(queues = "king")
- public void listenQueue(String msg){
- System.out.println("接收到的消息是:"+msg);
- }
-
-
-
- }

work queue和基本队列模型相比,是可以挂多个消费者,共同处理消息。由于RabbitMQ有消息预取机制,会在未执行前先平均取消息。这里可以用配置prefetch的值为1,即执行完一个才能取一个消息,这样就可以让效率高的多处理消息,效率低的少处理消息。
如股不配置prefetch就会一个接口处理一半消息。
使用线程睡眠模拟接口效率。发送消息要消耗20*50ms,work1(20ms)的效率是work2(200ms)的10倍。
所以work1处理40条消息,work2处理10条消息。
(1)设置prefetch为1
(2)设置两个消费者处理同一个队列的消息。
发送代码:
- package cn.itcast.mq.Send;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
-
- @RestController
- @RequestMapping("send")
- public class sends {
- @Autowired
- private AmqpTemplate amqp;
- @PostMapping
- public void SendMQ1( String queueName, String msg) throws InterruptedException {
- for (int i = 0; i < 50; i++) {
- amqp.convertAndSend(queueName,msg+i);
- System.out.println("发布消息:"+queueName+","+msg+i);
- Thread.sleep(20);
- }
- }
-
-
-
- }

消费代码:
- package cn.itcast.mq.listener;
-
- import org.omg.CORBA.Current;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalTime;
-
-
- @RestController
- //@Component
- public class listen {
-
- @RabbitListener(queues = "king")
- public void workQueue1(String msg) throws InterruptedException {
- System.out.println("work1接收到的消息是:"+msg+"---"+ LocalTime.now());
-
- Thread.sleep(20);
- }
-
- @RabbitListener(queues = "king")
- public void workQueue2(String msg) throws InterruptedException {
- System.out.println("work2接收到的消息是:"+msg+"---"+LocalTime.now());
- Thread.sleep(200);
- }
-
- }

作用: 这个广播交换机,可以绑定多个队列,消息发布者发布的消息,会发给每个被绑定的交换机的队列所有相同的消息。
在操作前我们先了解一份继承图。
(1)在消费者的服务理定义fanout exchange的配置,绑定队列和交换机
(2)在消费者服务里配置两个消费者,各自监听自己的队列
(3)在消息发布者服务里写发布方法
参数换为:exchangeName,RoutingKey,msg
此时routingkry先为空字符串即可。
我的代码:
(1)配置类绑定交换机和队列
- package cn.itcast.mq.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.FanoutExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class FanoutConfig {
- // 声明交换机
- @Bean
- public FanoutExchange fanoutExchange(){
- return new FanoutExchange("wrx.fanout");//交换机名
- }
-
-
- // 声明队列1
- @Bean
- public Queue fanoutQueue1(){
- return new Queue("wrx.queue1");
- }
-
- // 声明队列2
- @Bean
- public Queue fanoutQueue2(){
- return new Queue("wrx.queue2");
- }
-
- // 绑定队列1
- @Bean
- public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
- }
-
- // 绑定队列2
- @Bean
- public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
- return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
- }
- }

(2)写两个消费者
- package cn.itcast.mq.listener;
-
- import org.omg.CORBA.Current;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalTime;
-
-
- @RestController
- //@Component
- public class listen {
-
-
- /**
- * 处理wrx.queue1的消息
- * @param msg
- * @throws InterruptedException
- */
- @RabbitListener(queues = "wrx.queue1")
- public void fanoutQueue1(String msg) throws InterruptedException {
- System.out.println("fq1处理消息:"+msg+"---"+ LocalTime.now());
-
- }
- /**
- * 处理wrx.queue2的消息
- * @param msg
- * @throws InterruptedException
- */
- @RabbitListener(queues = "wrx.queue2")
- public void fanoutQueue2(String msg) throws InterruptedException {
- System.out.println("fq2处理消息:"+msg+"---"+ LocalTime.now());
-
- }
-
-
- }

(3)写消息发布者
- package cn.itcast.mq.Send;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
-
- @RestController
- @RequestMapping("send")
- public class sends {
- @Autowired
- private AmqpTemplate amqp;
-
- @PostMapping("fanout")
- public void fanoutSend( String exchangeName,String Routingkey ,String msg) {
- //发送消息
- amqp.convertAndSend(exchangeName,Routingkey,msg);
- System.out.println("发布消息:"+exchangeName+","+Routingkey+","+msg);
-
- }
-
-
-
- }

我们需要给每个队列指定bindkey,它可以是一个也可以是多个。
我们在发送消息时,必须指定routingkey,这个key必须与指定的bindingkey相同,相应的队列才能获取消息。
(1)在消息接收者的@RabbitListener注解里声明Exchange和Queue
- @RabbitListener(bindings = @QueueBinding( //绑定队列交换机
- value = @Queue("direct.queue1"), //queue名字
- exchange = @Exchange("wrx.direct"), //exchange名字
- key = {"red","wrx"} //BindingKey
- ))
(2)消息发布者填写对应的RoutingKey
代码示例:
(1)
- package cn.itcast.mq.listener;
-
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalTime;
-
- @RestController
- public class DirectListener {
-
- /**
- * 处理wrx.direct1的消息
- * @param msg
- * @throws InterruptedException
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue("direct.queue1"),
- exchange = @Exchange("wrx.direct"),
- key = {"red","wrx"}//BindingKey
- ))
- public void DirectQueue1(String msg) throws InterruptedException {
- System.out.println("direct1处理消息:"+msg+"---"+ LocalTime.now());
-
- }
-
-
- /**
- * 处理wrx.direct2的消息
- * @param msg
- * @throws InterruptedException
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue("direct.queue2"),
- exchange = @Exchange("wrx.direct"),
- key = {"blue","wrx"}//BindingKey
- ))
- public void DirectQueue2(String msg) throws InterruptedException {
- System.out.println("direct2处理消息:"+msg+"---"+ LocalTime.now());
-
- }
-
-
-
- }

(2)我们在postman发post请求就可以发布消息了
routingkey为wrx会发给两个队列
routingkey为red会发给direct.queue1队列
- package cn.itcast.mq.Send;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.autoconfigure.web.client.AutoConfigureMockRestServiceServer;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalDateTime;
- import java.time.LocalTime;
-
- @RestController
- @RequestMapping("direct")
- public class DirectSend {
- @Autowired
- private AmqpTemplate amqp;
- @PostMapping
- public void DirectSend(String exchangeName,String routingKey,String msg ){
- amqp.convertAndSend(exchangeName,routingKey,msg);
- System.out.println("发送消息:"+msg+ "-----"+LocalTime.now());
- }
-
-
- }

说明:话题交换机和路由交换机最大的区别在于,他们的BindingKey用法不同。
Topic的BindingKey允许使用通配符 # 和 *
#:代表一个或多个单词
*:代表一个单词
可以有的形式为:1、china.#
2、#.news
3、china.*
4、*.news
发送消息时,RoutingKey只要匹配通配符之外的字符就可以和Queue队列建立联系。
(1)、在消费者里声明队列和交换机的绑定关系,定义BindingKey
(2)、根据对应的交换机名字和BindingKey来确定RoutingKey来发送消息
符合通配符的消费者队列都会接收到消息。
我的代码:
(1)绑定交换机和队列,设置交换机类型Type,规定BindingKey
- package cn.itcast.mq.listener;
-
- import org.springframework.amqp.core.ExchangeTypes;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalTime;
-
- @RestController
- public class TopicListener {
-
-
- /**
- * 处理topic.queue1的消息
- * @param msg
- * @throws InterruptedException
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue("topic.queue1"),
- exchange = @Exchange(value = "wrx.topic",type = "topic"),//topic类型
- key = {"china.#"}
- ))
- public void TopicQueue1(String msg) throws InterruptedException {
- System.out.println("topic1处理消息:"+msg+"---"+ LocalTime.now());
-
- }
- /**
- * 处理wrx.direct2的消息
- * @param msg
- * @throws InterruptedException
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue("topic.queue2"),
- exchange = @Exchange(value = "wrx.topic",type = ExchangeTypes.TOPIC),//topic类型
- key = {"#.news"}
- ))
- public void TopicQueue2(String msg) throws InterruptedException {
- System.out.println("topic2处理消息:"+msg+"---"+ LocalTime.now());
-
- }
-
-
-
- }

(2)发送消息
- package cn.itcast.mq.Send;
-
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.time.LocalTime;
- @RestController
- @RequestMapping("topic")
- public class TopicSend {
- @Autowired
- private AmqpTemplate amqp;
- @PostMapping
- public void DirectSend(String exchangeName,String routingKey,String msg ){
- amqp.convertAndSend(exchangeName,routingKey,msg);//routingKey= china.weather,展示通配符效果
- System.out.println("发送消息:"+msg+ "-----"+ LocalTime.now());
- }
-
- }

在发送消息时,如果数据类型不是字节类型的就会出问题,因为SpringAMQP的序列化器默认使用的是java的jdk的序列化器。我们将它改为Json的即可。详细见下图。
(1)引用以来,加配置(发布者)
引用依赖
- <!-- Json序列化依赖-->
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- </dependency>
加配置
- //使用Json的序列化方式
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
(2)引依赖,加配置 (消费者)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。