赞
踩
目录
安装Docker(先克隆出来一个里面只有docker 的虚拟机进入执行)
Rabbitmq就像是一个邮局,投递人(生产者)将消息发送给Rabbitmq,Rabbitmq邮递员(线程)根据设定好的道路送到指定的收信人(消费者),收信人拿到信之后,阅读理解(做处理),最后写个回信(响应)给投递人,还得通过Rabbitmq,如此循环反复
服务解耦这个情况在单体项目的时候,可以不考虑这个问题,服务的解耦是基于两个服务之间进行调用的时候,出现的问题,A产生数据而服务BCD都需要,那么我可以直接在A中调用BCD,从而实现数据的传递,但当在微服务的时候,服务不会像这个例子这么少,成千上百的服务,这样服务间的耦合性太高,维护成本过高,引入Rabbitmq中间商后,A把数据交给Rabbitmq,然后BCD等服务,需要就去rabbitmq拿就可以了
举个栗子,当我们的服务器只有一台的时候,瞬间qps达到了3000那么这个服务器的压力就会剧增,也就是瞬时压力爆棚,那么通过Rabbitmq的消息队列,拉长战线(处理时长),通过rabbitmq的请求慢慢发给服务器进行处理,例如qps从每秒3000通过rabbitmq后变成qps300而其他的在rabbitmq队列中排队等待处理,这样虽说把时间拉长,但可以减轻瞬时压力
链路 A(接收请求的)--- rabbitmq ----- B(处理请求的),就是A的线程只负责接收并发给rabbitmq,之后A的线程就会被释放,继续做接收请求,rabbitmq负责生成一个消息队列,B服务只负责进行业务处理,B服务没处理完的可以在消息队列中等待,之后按顺序进行处理。
消息服务
消息队列
消息中间件
常见的服务器:
1.Activemq 2.Rockermq(阿里) 3.Kafka(大数据) 4.tubemq(腾讯万亿级别) 5.Rabbitmq(spring集成)
VM网段:192.168.64.0
知识点:咋改网段 编辑 虚拟网络编辑器选择VMnet8 左下角子网IP 修改岂可
虚拟机:centos-8-2105 centos-7-1908随便选一个
课前资料设置好的东西
2.安装了三个工具:python pip ansible
3.两个脚本文件,用来设置ip地址
ip-dhcp:自动获取
ip-static:手动获取
4.用vm打开对应的.vmx文件,加载虚拟机镜像
5.启动 按提示“已复制虚拟机”
6.默认用户密码root
7.设置ip
- ./ip-dhcp #执行脚本
-
- ifconfig 看ip
- ifconfig ens33
没有网卡咋办(执行下面的两行代码)
- nmcli n on
- systemctl restart NetworkManager
如果上面的两条不好使就重置虚拟网络设置,之后先还原,在设置网段为64
1.可以从网上下载Docker离线包
https://download.docker.com/linux/static/stable/x86_64/docker-20.10.6.tgz
2.离线安装工具(简化安装,要下载)
https://github.com/Jrohy/docker-install/
3.安装,通过MBX软件,连接,并把下载好的以下文件一块放到/root/里1.阿里的yum安装源
- - docker-20.10.6.tgz
- - install.sh
- - docker.bash
4.执行安装
- # 进入 docker-install 文件夹
- cd docker-install
-
- # 为 docker-install 添加执行权限
- chmod +x install.sh
-
- # 安装
- ./install.sh -f docker-20.10.6.tgz
5.由于国内网络的问题,需要配置加速器来加速
cat下面命令直接生成文件daemon.json
cat <<EOF > /etc/docker/daemon.json
{
"registry-mirrors": [
"https://docker.mirrors.ustc.edu.cn",
"http://hub-mirror.c.163.com"
],
"max-concurrent-downloads": 10,
"log-driver": "json-file",
"log-level": "warn",
"log-opts": {
"max-size": "10m",
"max-file": "3"
},
"data-root": "/var/lib/docker"
}
EOF
6.重新加载docker配置 重启docker
7.测试 docker info
导入镜像: docker load -i rabbit-image.gz
配置管理员用户名与密码
mkdir /etc/rabbitmq vim /etc/rabbitmq/rabbitmq.conf
# 在文件中添加两行配置: default_user = admin default_pass = admin
- docker run -d --name rabbit \
- -p 5672:5672 \
- -p 15672:15672 \
- -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
- -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
- --restart=always \
- rabbitmq:management
-
消息服务器不会长久的储存,只要消费者处理完,消息服务器中的消息会被删除
第一步:新建一个maven 什么依赖都不用添加
第二步:添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tedu</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.8.0-alpha2</version>
</dependency>
</dependencies><build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.创建生产者类发送消息
第一步:连接
第二步:创建通信的通道(channel对象)
第三步:在服务器上创建队列
第四步:发送消息
第五步:关闭资源
- package m1;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f= new ConnectionFactory();
- f.setHost("192.168.64.140");
- f.setPort(5672); /*5672是收发消息 15672是一个管理控制台*/
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection(); /*连接*/
- Channel channel = con.createChannel(); /*通信的通道*/
-
- //在服务器上创建队列 hello world
- /*队列已经存在不会去重复创建
- * 使用通信的通道channel去在服务器上操作
- * */
- channel.queueDeclare("helloworld", false, false, false, null);
- /* 第一个boolean 是否是一个持久队列
- * 第二个boolean 是否是一个排他队列或独占队列 多个消费者能否共享这一个队列
- * 第三个boolean 是否自动删除 若没有消费者的情况下队列自动删除
- * null值 其他的参数属性 例如要带一个 map 键值对
- *
- *
- * */
- //发送消息
- channel.basicPublish("", "helloworld", null, "HelloWorld824".getBytes());
-
- /**
- * 空串参数:是一个默认的交换机(exchange)
- * null 消息的其他参数属性
- *
- *
- * */
- //断开连接
- channel.close();
- con.close();
-
-
-
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
4.创建消费者接收处理数据
- package m1;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection();
- Channel channel = con.createChannel();
- //创建队列
- channel.queueDeclare("helloworld", false, false, false, null);
- //创建回调对象
- // DeliverCallback deliverCallback1 = (consumerTag,message)->{};
- DeliverCallback deliverCallback = new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery/*这个就是massage*/) throws IOException {
- byte[] body = delivery.getBody();
- String s1 = new String(body);
- System.out.println("收到"+s1);
- }
- };
-
- CancelCallback cancelCallback = consumerTag -> {}; //取消接收消息时执行
- //从hello world队列接收消息,收到的消息会传递到回调对象进行处理
- channel.basicConsume("helloworld", true,deliverCallback,cancelCallback);//helloworld 回传到 message
- /**
- * autoAck -acknowledgment
- * false
- * true
- * true自动确认*/
-
-
-
-
-
-
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
多个消费者从同一个队列进行获取消息,可以并行的处理多条消息,处理速度翻倍
合理分发:
1.设置autoAck = false
2.如何设置qos: qos -pre fetch -预抓取 c.basicQos(1);必须在手动确认模式下才会生效)
不一定所有的数据都要持久化,例如日志是可以丢失的,而订单一定不能丢失
1.队列持久化
当队列被创建后是无法被更改的,要么删除,要么创建一个名字不相同的队列
2.消息持久化
- package m2;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- ;
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f= new ConnectionFactory();
- f.setHost("192.168.64.140");
- f.setPort(5672); /*5672是收发消息 15672是一个管理控制台*/
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection(); /*连接*/
- Channel channel = con.createChannel(); /*通信的通道*/
-
- //创建队列
- channel.queueDeclare("task_queue", true, false, false, null);
- //循环在控制台发送消息
- while (true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_BASIC, s.getBytes());
- /*持久化信息 改成这个,消息会存在磁盘上*/
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package m2;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection();
- Channel channel = con.createChannel();
- //创建队列
- channel.queueDeclare("task_queue", true, false, false, null);
-
-
- //创建回调对象
- DeliverCallback deliverCallback = new DeliverCallback() {
- @Override
- public void handle(String s, Delivery delivery) throws IOException {
- String str = new String(delivery.getBody());
- System.out.println("收到"+str);
- for (int i = 0; i < str.length(); i++) {
- if (str.charAt(i) == '.'){
-
- //模拟耗时消息,遍历字符串,每遇到一个‘.’字符暂停1秒
- try {
- Thread.sleep(1000);
-
- }catch (Exception e){
-
- }
- }
- }
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);/*(回执[其实就是Tag,需要通过消息对象进行获取,Tag是一段long型的数字],是否确认收到过的所有消息[一般为false])*/
- System.out.println("消息处理完成=================================================================消息处理完成");
-
- }
- };
- CancelCallback cancelCallback = consumerTag -> {};
-
- //预抓取,只收一条,处理完之前不收下一条
- //只在手动确认模式才有效
- channel.basicQos(1);
- //接收消息
- channel.basicConsume("task_queue", false, deliverCallback, cancelCallback); /*修改为手动确认模式一开始第二个参数为true*/
-
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
所有消费者得到同一个消息,每个消费者都要有一个自己的队列,队列要求与交换机进行绑定,交换机只会发给与自己绑定的队列
使用的Fanout交换机(扇形交换机)
每个消费者都有自己的队列,这个队列是随机命名,direct交换机,通过关键词来进行匹配队列发送
使用的Taotal交换机,其次使用的关键词变为xxxx.xxxx.xxx()
Spring cloud config + Bus组件(写好的代码)
要在对应的服务中添加该组件
1.添加bus依赖
1.rabbitmq
2.bus
3.bus去操作rabbitmq时用到binder-rabbit
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-bus</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
- </dependency>
amqp是消息服务的协议(数据格式)集成了rabbitmq的依赖
2.配置中心还有去添加一个actuator依赖
2.添加rabbit-mq连接配置
09修改yml
2.3.4修改config目录的三个文件,再上传到远程仓库
4.配置中心暴露bus-refresh刷新路径 m.e.w.e.i = bus-refresh
1.BUS配置刷新
2.Bus发送刷新指令,其他模块接收指令执行配置刷新操作,Rabbitmq主题模式
sleuth +zipkin
sleuth 用来产生链路跟踪日志 会产生日志数据
执行添加依赖,0配置就可以产生依赖
zipkin 对链路跟踪日志进行分析处理 最后用图形进行展示
a-b-c-d 默认日志只有10%会发给zipkin
a,asuidhiasd,asuidhiasd,true
b,asuidhiasd,,true
c,asuidhiasd,asuidhiasd,true
d,asuidhiasd,asuidhiasd,true
sleuth 通过 RABBITMQ发送到zipkin
修改2,3,4,6
1.添加zipkin client 客户端 依赖
2.在06添加rabbitmq依赖
3.修改06的application
4.修改config目录的是哪个文件添加zipkin发送方式
zipkin需要自己下载,之后用cmd
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。