当前位置:   article > 正文

Rabbit mq 消息服务器(分布式中非常重要的服务器)

消息服务器

目录

Rabbitmq概述

RabbitMQ使用场景

服务解耦

流量削峰

异步调用

使用步骤

VM版本:16+(Rabbitmq要在容器里使用)

安装Docker(先克隆出来一个里面只有docker 的虚拟机进入执行)

搭建Rabbitmq服务

六种工作模式

简单模式(对应只有一个消费者)

工作模式(对应对个消费者)

如何使消息持久化(防止服务器端的消息丢失)

群发模式(发消息就需要交换机)

路由模式

主题模式

如何实现SCC的配置更新


Rabbitmq概述

Rabbitmq就像是一个邮局,投递人(生产者)将消息发送给Rabbitmq,Rabbitmq邮递员(线程)根据设定好的道路送到指定的收信人(消费者),收信人拿到信之后,阅读理解(做处理),最后写个回信(响应)给投递人,还得通过Rabbitmq,如此循环反复

RabbitMQ使用场景

服务解耦

服务解耦这个情况在单体项目的时候,可以不考虑这个问题,服务的解耦是基于两个服务之间进行调用的时候,出现的问题,A产生数据而服务BCD都需要,那么我可以直接在A中调用BCD,从而实现数据的传递,但当在微服务的时候,服务不会像这个例子这么少,成千上百的服务,这样服务间的耦合性太高,维护成本过高,引入Rabbitmq中间商后,A把数据交给Rabbitmq,然后BCD等服务,需要就去rabbitmq拿就可以了

流量削峰

举个栗子,当我们的服务器只有一台的时候,瞬间qps达到了3000那么这个服务器的压力就会剧增,也就是瞬时压力爆棚,那么通过Rabbitmq的消息队列,拉长战线(处理时长),通过rabbitmq的请求慢慢发给服务器进行处理,例如qps从每秒3000通过rabbitmq后变成qps300而其他的在rabbitmq队列中排队等待处理,这样虽说把时间拉长,但可以减轻瞬时压力

异步调用

链路 A(接收请求的)--- rabbitmq ----- B(处理请求的),就是A的线程只负责接收并发给rabbitmq,之后A的线程就会被释放,继续做接收请求,rabbitmq负责生成一个消息队列,B服务只负责进行业务处理,B服务没处理完的可以在消息队列中等待,之后按顺序进行处理。

消息服务

消息队列

消息中间件

常见的服务器:

1.Activemq  2.Rockermq(阿里)  3.Kafka(大数据) 4.tubemq(腾讯万亿级别) 5.Rabbitmq(spring集成)

使用步骤

VM版本:16+(Rabbitmq要在容器里使用)

VM网段:192.168.64.0

知识点:咋改网段 编辑 虚拟网络编辑器选择VMnet8   左下角子网IP 修改岂可

虚拟机:centos-8-2105 centos-7-1908随便选一个

课前资料设置好的东西

2.安装了三个工具:python pip ansible

3.两个脚本文件,用来设置ip地址

           ip-dhcp:自动获取

           ip-static:手动获取

4.用vm打开对应的.vmx文件,加载虚拟机镜像

5.启动 按提示“已复制虚拟机”

6.默认用户密码root

7.设置ip

  1. ./ip-dhcp #执行脚本
  2. ifconfig 看ip
  3. ifconfig ens33

没有网卡咋办(执行下面的两行代码)

  1. nmcli n on
  2. systemctl restart NetworkManager

如果上面的两条不好使就重置虚拟网络设置,之后先还原,在设置网段为64

安装Docker(先克隆出来一个里面只有docker 的虚拟机进入执行)

1.可以从网上下载Docker离线包

https://download.docker.com/linux/static/stable/x86_64/docker-20.10.6.tgz

2.离线安装工具(简化安装,要下载)

https://github.com/Jrohy/docker-install/

3.安装,通过MBX软件,连接,并把下载好的以下文件一块放到/root/里1.阿里的yum安装源

  1. - docker-20.10.6.tgz
  2. - install.sh
  3. - docker.bash

4.执行安装

  1. # 进入 docker-install 文件夹
  2. cd docker-install
  3. # 为 docker-install 添加执行权限
  4. chmod +x install.sh
  5. # 安装
  6. ./install.sh -f docker-20.10.6.tgz

5.由于国内网络的问题,需要配置加速器来加速

cat下面命令直接生成文件daemon.json

cat <<EOF > /etc/docker/daemon.json
{
  "registry-mirrors": [
    "https://docker.mirrors.ustc.edu.cn",
    "http://hub-mirror.c.163.com"
  ],
  "max-concurrent-downloads": 10,
  "log-driver": "json-file",
  "log-level": "warn",
  "log-opts": {
    "max-size": "10m",
    "max-file": "3"
    },
  "data-root": "/var/lib/docker"
}
EOF

6.重新加载docker配置 重启docker

7.测试 docker info

搭建Rabbitmq服务

  • 从docker-base再克隆一个虚拟机: rabbitmq
  • 设置ip:   运行            ./ip-static
  • 将镜像文件复制到/root 下(若本地没有则要去下载镜像)
    • docker pull rabbitmq:management
  • 关闭防火墙
  1.         systemctl stop firewalld
  2.         systemctl disable firewall
  3.         # 重启 docker 系统服务   systemctl restart docker
  • 导入镜像: docker load -i rabbit-image.gz

        

  • 配置管理员用户名与密码

    • mkdir /etc/rabbitmq vim /etc/rabbitmq/rabbitmq.conf

    • # 在文件中添加两行配置: default_user = admin default_pass = admin

  •    通过docker启动镜像  
    1. docker run -d --name rabbit \
    2. -p 5672:5672 \
    3. -p 15672:15672 \
    4. -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
    5. -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
    6. --restart=always \
    7. rabbitmq:management

        消息服务器不会长久的储存,只要消费者处理完,消息服务器中的消息会被删除

六种工作模式

简单模式(对应只有一个消费者)

第一步:新建一个maven 什么依赖都不用添加

第二步:添加依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tedu</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>


        <dependency>  
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.4.3</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>

      

      <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.8.0-alpha2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3.创建生产者类发送消息

第一步:连接

第二步:创建通信的通道(channel对象)

第三步:在服务器上创建队列

第四步:发送消息

第五步:关闭资源

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //连接
  10. ConnectionFactory f= new ConnectionFactory();
  11. f.setHost("192.168.64.140");
  12. f.setPort(5672); /*5672是收发消息 15672是一个管理控制台*/
  13. f.setUsername("admin");
  14. f.setPassword("admin");
  15. Connection con = f.newConnection(); /*连接*/
  16. Channel channel = con.createChannel(); /*通信的通道*/
  17. //在服务器上创建队列 hello world
  18. /*队列已经存在不会去重复创建
  19. * 使用通信的通道channel去在服务器上操作
  20. * */
  21. channel.queueDeclare("helloworld", false, false, false, null);
  22. /* 第一个boolean 是否是一个持久队列
  23. * 第二个boolean 是否是一个排他队列或独占队列 多个消费者能否共享这一个队列
  24. * 第三个boolean 是否自动删除 若没有消费者的情况下队列自动删除
  25. * null值 其他的参数属性 例如要带一个 map 键值对
  26. *
  27. *
  28. * */
  29. //发送消息
  30. channel.basicPublish("", "helloworld", null, "HelloWorld824".getBytes());
  31. /**
  32. * 空串参数:是一个默认的交换机(exchange)
  33. * null 消息的其他参数属性
  34. *
  35. *
  36. * */
  37. //断开连接
  38. channel.close();
  39. con.close();
  40. }
  41. }

4.创建消费者接收处理数据

  1. package m1;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //连接
  8. ConnectionFactory f = new ConnectionFactory();
  9. f.setHost("192.168.64.140");
  10. f.setPort(5672);
  11. f.setUsername("admin");
  12. f.setPassword("admin");
  13. Connection con = f.newConnection();
  14. Channel channel = con.createChannel();
  15. //创建队列
  16. channel.queueDeclare("helloworld", false, false, false, null);
  17. //创建回调对象
  18. // DeliverCallback deliverCallback1 = (consumerTag,message)->{};
  19. DeliverCallback deliverCallback = new DeliverCallback() {
  20. @Override
  21. public void handle(String s, Delivery delivery/*这个就是massage*/) throws IOException {
  22. byte[] body = delivery.getBody();
  23. String s1 = new String(body);
  24. System.out.println("收到"+s1);
  25. }
  26. };
  27. CancelCallback cancelCallback = consumerTag -> {}; //取消接收消息时执行
  28. //从hello world队列接收消息,收到的消息会传递到回调对象进行处理
  29. channel.basicConsume("helloworld", true,deliverCallback,cancelCallback);//helloworld 回传到 message
  30. /**
  31. * autoAck -acknowledgment
  32. * false
  33. * true
  34. * true自动确认*/
  35. }
  36. }

工作模式(对应对个消费者)

多个消费者从同一个队列进行获取消息,可以并行的处理多条消息,处理速度翻倍

合理分发:

1.设置autoAck = false  

2.如何设置qos:       qos -pre fetch -预抓取   c.basicQos(1);必须在手动确认模式下才会生效)

如何使消息持久化(防止服务器端的消息丢失)

不一定所有的数据都要持久化,例如日志是可以丢失的,而订单一定不能丢失

1.队列持久化

当队列被创建后是无法被更改的,要么删除,要么创建一个名字不相同的队列

2.消息持久化

  1. package m2;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. ;
  7. import java.io.IOException;
  8. import java.util.Scanner;
  9. import java.util.concurrent.TimeoutException;
  10. public class Producer {
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. //连接
  13. ConnectionFactory f= new ConnectionFactory();
  14. f.setHost("192.168.64.140");
  15. f.setPort(5672); /*5672是收发消息 15672是一个管理控制台*/
  16. f.setUsername("admin");
  17. f.setPassword("admin");
  18. Connection con = f.newConnection(); /*连接*/
  19. Channel channel = con.createChannel(); /*通信的通道*/
  20. //创建队列
  21. channel.queueDeclare("task_queue", true, false, false, null);
  22. //循环在控制台发送消息
  23. while (true){
  24. System.out.println("输入消息:");
  25. String s = new Scanner(System.in).nextLine();
  26. channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_BASIC, s.getBytes());
  27. /*持久化信息 改成这个,消息会存在磁盘上*/
  28. }
  29. }
  30. }
  1. package m2;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. ConnectionFactory f = new ConnectionFactory();
  8. f.setHost("192.168.64.140");
  9. f.setPort(5672);
  10. f.setUsername("admin");
  11. f.setPassword("admin");
  12. Connection con = f.newConnection();
  13. Channel channel = con.createChannel();
  14. //创建队列
  15. channel.queueDeclare("task_queue", true, false, false, null);
  16. //创建回调对象
  17. DeliverCallback deliverCallback = new DeliverCallback() {
  18. @Override
  19. public void handle(String s, Delivery delivery) throws IOException {
  20. String str = new String(delivery.getBody());
  21. System.out.println("收到"+str);
  22. for (int i = 0; i < str.length(); i++) {
  23. if (str.charAt(i) == '.'){
  24. //模拟耗时消息,遍历字符串,每遇到一个‘.’字符暂停1秒
  25. try {
  26. Thread.sleep(1000);
  27. }catch (Exception e){
  28. }
  29. }
  30. }
  31. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);/*(回执[其实就是Tag,需要通过消息对象进行获取,Tag是一段long型的数字],是否确认收到过的所有消息[一般为false])*/
  32. System.out.println("消息处理完成=================================================================消息处理完成");
  33. }
  34. };
  35. CancelCallback cancelCallback = consumerTag -> {};
  36. //预抓取,只收一条,处理完之前不收下一条
  37. //只在手动确认模式才有效
  38. channel.basicQos(1);
  39. //接收消息
  40. channel.basicConsume("task_queue", false, deliverCallback, cancelCallback); /*修改为手动确认模式一开始第二个参数为true*/
  41. }
  42. }

群发模式(发消息就需要交换机)

所有消费者得到同一个消息,每个消费者都要有一个自己的队列,队列要求与交换机进行绑定,交换机只会发给与自己绑定的队列

使用的Fanout交换机(扇形交换机)

路由模式

每个消费者都有自己的队列,这个队列是随机命名,direct交换机,通过关键词来进行匹配队列发送

主题模式

使用的Taotal交换机,其次使用的关键词变为xxxx.xxxx.xxx()

如何实现SCC的配置更新

Spring cloud config + Bus组件(写好的代码)

要在对应的服务中添加该组件

1.添加bus依赖

        1.rabbitmq

        2.bus

        3.bus去操作rabbitmq时用到binder-rabbit

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.cloud</groupId>
  7. <artifactId>spring-cloud-bus</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.cloud</groupId>
  11. <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
  12. </dependency>

  amqp是消息服务的协议(数据格式)集成了rabbitmq的依赖

2.配置中心还有去添加一个actuator依赖

2.添加rabbit-mq连接配置

        09修改yml

        2.3.4修改config目录的三个文件,再上传到远程仓库

4.配置中心暴露bus-refresh刷新路径   m.e.w.e.i = bus-refresh 

     

1.BUS配置刷新

2.Bus发送刷新指令,其他模块接收指令执行配置刷新操作,Rabbitmq主题模式

链路跟踪

sleuth +zipkin

sleuth 用来产生链路跟踪日志 会产生日志数据

执行添加依赖,0配置就可以产生依赖

zipkin 对链路跟踪日志进行分析处理 最后用图形进行展示

a-b-c-d  默认日志只有10%会发给zipkin

a,asuidhiasd,asuidhiasd,true

b,asuidhiasd,,true

c,asuidhiasd,asuidhiasd,true

d,asuidhiasd,asuidhiasd,true

 

sleuth 通过 RABBITMQ发送到zipkin

修改2,3,4,6

1.添加zipkin client 客户端 依赖

2.在06添加rabbitmq依赖

3.修改06的application

4.修改config目录的是哪个文件添加zipkin发送方式

zipkin需要自己下载,之后用cmd

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/921236
推荐阅读
相关标签
  

闽ICP备14008679号