当前位置:   article > 正文

rabbitmq教程_mq使用教程

mq使用教程

1.Rabbitmq简介

注意本次教程的系统是centos7
开发软件是idea2022
rabbitmq版本是3.10.11
springboot版本是 springboot3

1.1 简介

RabbitMQ 是一个广泛使用的消息服务器,采用 Erlang 语言编写,是一种开源的实现
实现了AMQP(高级消息队列协议)的消息中间件;
RabbitMQ 最初起源于金融系统,它的性能及稳定性都非常出色;
AMQP 协议(http://www.amqp.org),即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

1.2 官方网址

官网:https://www.rabbitmq.com
Github:https://github.com/rabbitmq

1.3消息中间件

简单来说,消息中间件就是指保存数据的一个容器(服务器),可以用于两个系统之间
的数据传递。
消息中间件一般有三个主要角色:生产者、消费者、消息代理(消息队列、消息服务器);
生产者发送消息到消息服务器,然后消费者从消息代理(消息队列)中获取数据
并进行处理
常见的消息中间件:
 RabbitMQ
 kafka(大数据领域)
 RocketMQ(阿里巴巴开源)献给 Apache 组织
 pulsar(最近一两年流行起来的)

2 RabbitMQ 运行环境搭建

RabbitMQ 是使用 Erlang 语言开发的,
所以要先下载安装 Erlang
下载时一定要注意版本兼容性
版本兼容说明地址:https://www.rabbitmq.com/which-erlang.html

图片: ![Alt](https://img-home.csdnimg.cn/images/20220524100510.png这里插入图片描述
去官网查看对应版本

2.1 下载 Erlang

Erlang 官网

https://www.erlang.org/

Linux 下载:

wget
https://github.com/erlang/otp/releases/download/OTP-25.1.1/otp_src_25.1.1
.tar.gz

说明:wget 是 linux 命令,可以用来下载软件

2.2安装 Erlang

2.2.1 安装 erlang 前先安装 Linux 依赖库

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

说明:yum -y install 安装 linux 的一些依赖库的命令 ,-y 表示自动确认;

2.2.2 解压 erlang 压缩包文件
tar -zxvf otp_src_25.1.1.tar.gz
  • 1
2.2.3 配置

切换到解压的目录下,运行相应命令

cd otp_src_25.1.1

./configure
  • 1
  • 2
  • 3
2.2.4 编译
make
  • 1
2.2.5 安装
make install
  • 1

安装好了 erlang 后可以将解压的文件夹删除:

rm -rf otp_src_25.
  • 1

验证 erlang 是否安装成功:

在命令行输入: erl 
  • 1

如果进入了编程命令行则表示安装成功,然后按 ctrl + z 退出编 程命令行;

2.3 下载 RabbitMQ

从 RabbitMQ 官网 https://www.rabbitmq.com

找到下载链接 Linux:

下载 3.10.11

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.11/ra bbitmq-server-generic-unix-3.10.11.tar.xz
  • 1

generic 是通用的意思,这个版本也就是通用的 unix 版本

2.4 安装 RabbitMQ

解压 RabbitMQ 的压缩包,即安装完成,无需再编译

tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C /usr/local/
  • 1

-C 选项是指定解压目录,如果不指定会解压到当前目录

2.5 启动及停止 RabbitMQ

2.5.1 配置 path 环境变量

控制台输入

vi /etc/profile
  • 1
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbin
export RABBIT_HOME PATH
  • 1
  • 2
  • 3

刷新环境变量

source /etc/profile
  • 1

配置完环境变量就可以在任意地方启动mq啦

rabbitmq-server -detached
  • 1

-detached 代表后台启动

查看redis状态

rabbitmqctl -n rabbit status
  • 1

2.6 用户管理命令

用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。 这些操作都是通过 rabbitmqctl

查看帮助: rabbitmqctl add_user --help 相应的命令

(1) 查看当前用户列表

rabbitmqctl list_users 
  • 1

(2) 新增一个用户 语法:

rabbitmqctl add_user Username Password 
  • 1

​ 示例:

 rabbitmqctl add_user admin 12345
  • 1

(3)设置用户角色

rabbitmqctl set_user_tags User Tag 

示例:rabbitmqctl set_user_tags admin administrato
此处设置用户的角色为管理员角色
  • 1
  • 2
  • 3
  • 4

(4)设置用户权限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
此操作是设置 admin 用户拥有操作虚拟主机/下的所有权限
查看用户权限
  • 1
  • 2
  • 3

(5)查看用户权限

rabbitmqctl list_permissions
  • 1

2.7 web管理后台

Rabbitmq 有一个 web 管理后台,这个管理后台是以插件的方式提供的,启动后台 web 管理功能,

切换到 sbin 目录下执行

 查看 rabbitmq 的插件列表
./rabbitmq-plugins list
#启用
./rabbitmq-plugins enable rabbitmq_management
#禁用
./rabbitmq-plugins disable rabbitmq_management
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

防火墙操作

把防火墙关闭啦,否则外部无法打开 web管理后台

systemctl status firewalld --检查防火墙状态
systemctl stop firewalld --关闭防火墙,Linux 重启之后会失效
systemctl disable firewalld --防火墙置为不可用,Linux 重启后,防火墙服务不自动启动,
依然是不可用
  • 1
  • 2
  • 3
  • 4

访问web管理后台

http://192.168.131.131:15672
用户名/密码为我们上面创建的 admin/123456
注意上面改成你的虚拟主机的 ip 地址
  • 1
  • 2
  • 3

3 RabbitMQ工作模型

在这里插入图片描述

消息队列有三个核心要素: 消息生产者、消息队列、消息消费者;

生产者(Producer):发送消息的应用;(java 程序,也可能是别的语言写的程序)

消费者(Consumer):接收消息的应用;(java 程序,也可能是别的语言写的程序)

代理(Broker):就是消息服务器,RabbitMQ Server 就是 Message Broker;

连接(Connection):连接 RabbitMQ 服务器的 TCP 长连接;

信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道 进行的

虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用 户使用同一个 RabbitMQ 服务时,可以划分出多个 Virtual host,每个用户在自己的 Virtual host 创建 exchange/queue 等;(分类比较清晰、相互隔离)

交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消 息队列中,起到一个路由的作用;

路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息 的目的地址;

绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);

队列(Queue):存储消息的缓存;

消息(Message):由生产者通过 RabbitMQ 发送给消费者的信息;(消息可以任何数据, 字符串、user 对象,json 串等等)

4 RabbitMQ的交换机

4.1 类型

1、Fanout Exchange(扇形)

2、Direct Exchange(直连)

3、Topic Exchange(主题)

4、Headers Exchange(头部)

4.1.1 Fanout Exchange(扇形)

Fanout 扇形的,散开的; 扇形交换机 投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

在这里插入图片描述
依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

配置类


@Configuration
public class RabbitConfig {
    //三部曲
    //定义交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("exchange.fanout");
    }
    //定义队列A
    @Bean
    public Queue queueA(){
        return new Queue("queue.fanout.a");
    }

    //定义队列B
    @Bean
    public Queue queueB(){
        return new Queue("queue.fanout.b");
    }

    //绑定交换机和队列
    @Bean
    public Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){
        //将队列a绑定到扇形交换机
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }
    //绑定交换机和队列
    @Bean
    public Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){
        //将队列b绑定到扇形交换机
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

生产者

@Component
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMess(){

        String msg="hello world";
        Message message = new Message(msg.getBytes());
        rabbitTemplate.convertAndSend("exchange.fanout","",message);
        log.info("消息发送完毕,发送时间为:{}",new Date());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

消费者

@Component
@Slf4j
public class ReceiveMessage {
    @RabbitListener(queues = {"queue.fanout.a","queue.fanout.b"})
    public void receiveMessage(Message message){
        byte[] body = message.getBody();
        String s = new String(body);
        log.info("接收到的消息为:{}",s);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
4.2.2 Direct Exchange(直连)

根据路由键精确匹配(一模一样)进行路由消息队列

在这里插入图片描述

创建springboot项目

导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

修改配置文件:加入host port username passwrod等配置信息

server:
  port: 8080
spring:
  application:
    name: direct-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
my:
  exchangeName: exchange.direct #交换机名字
  queueAName: queue.direct.a	#队列a名字
  queueBName: queue.direct.b	#队列b名字
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

Rabbitmq配置文件

@Configuration
//@ConfigurationProperties(prefix = "my")
public class RabbitMqConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueAName}")
    private String queueAName;
    @Value("${my.queueBName}")
    private String queueBName;
    //创建交换机
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    //创建队列A
    @Bean
    public Queue queueA(){
        return new Queue(queueAName);
    }

    //创建队列B
    @Bean
    public Queue queueB(){
        return new Queue(queueBName);
    }

    //链接A队列
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue queueA){

        return BindingBuilder.bind(queueA).to(directExchange).with("error");
    }
    //链接B队列
    @Bean
    public Binding bindingB1(DirectExchange directExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("error");
    }
    @Bean
    public Binding bindingB2(DirectExchange directExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("info");
    }
    @Bean
    public Binding bindingB3(DirectExchange directExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("warning");
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

编写生产者发送消息

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void SendMsg(){

        rabbitTemplate.send("exchange.direct","info", MessageBuilder.withBody("hello word".getBytes()).build());
        log.info("消息发送完毕!");
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

启动测试

@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMsg();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
4.3.3 Topic Exchange(主题)

通配符匹配,相当于模糊匹配;

#匹配多个单词,用来表示任意数量(零个或多个)单词

*匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词

beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
beijing.* == beijing.queue, beijing.xyz
  • 1
  • 2

在这里插入图片描述

创建springboot项目

导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

修改配置文件:加入host port username passwrod等配置信息

server:
  port: 8080
spring:
  application:
    name: topic-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode

my:
  exchangeName: exchange.topic #交换机的名字
  queueAName: queue.topic.a	#队列名字a
  queueBName: queue.topic.b	#队列名字b
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

编写Rabbitmq配置文件

@Configuration
public class RabbitMQConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueAName}")
    private String queueAName;
    @Value("${my.queueBName}")
    private String queueBName;

    //创建交换机
    @Bean
    public TopicExchange topicExchange(){
      return ExchangeBuilder.topicExchange(exchangeName).build();
    }

    //创建队列A
    @Bean
    public Queue queueA(){
        return QueueBuilder.durable(queueAName).build();
    }

    //创建队列B
    @Bean
    public Queue queueB(){
        return QueueBuilder.durable(queueBName).build();
    }

    //创建绑定
    @Bean
    public Binding bindingA(TopicExchange topicExchange,Queue queueA){
        return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");
    }

    @Bean
    public Binding bindingB1(TopicExchange topicExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");
    }
    @Bean
    public Binding bindingB2(TopicExchange topicExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

编写生产者发送消息

@Service
public class MessageService {
    @Resource
    private AmqpTemplate amqpTemplate;


    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello word".getBytes()).build();
        amqpTemplate.convertAndSend("exchange.topic","lazy.orange.rabbit",message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

启动测试

@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMsg();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
4.4.4 Headers Exchange(头部)

基于消息内容中的 headers 属性进行匹配

在这里插入图片描述

5 RabbitMQ的过期消息

过期消息也叫 TTL 消息,TTL:Time To Live

消息的过期时间有两种设置方式:

1.设置单条消息的过期时间

单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;

编写生产者的时候通过MessageProperties设置单条消息过期时间

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(){
      MessageProperties messageProperties = new MessageProperties();
       messageProperties.setExpiration("15000"); //单条消息过期时间
        Message message = new Message("ceshi".getBytes());
        rabbitTemplate.send("directExchange","error",message);
        log.info("发送成功!");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2.通过设置队列属性设置单挑消息过期时间

队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;

在rabbitmq的配置文件中

创建队列的时候修改队列属性

@Configuration
public class RabbitMqConfig {

    private String exchangeName="directExchange";
    private String queueAName="queueA";
    //创建直连交换机
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    //创建队列A
    @Bean
    public Queue queueA(){
        Map<String, Object> arguments=new HashMap<>();
        //通过设置队列属性设置过期时间
        arguments.put("x-message-ttl",10000);
        return new Queue(queueAName,true,false,false,arguments);
    }
    //创建链接
    @Bean
    public Binding binding(DirectExchange directExchange,Queue queueA){
        return BindingBuilder.bind(queueA).to(directExchange).with("error");
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

6 RabbitMQ的死信队列

也叫死信交换机、死信邮箱等

DLX: Dead-Letter-Exchange 死信交换器,死信邮箱

在这里插入图片描述

如果消息长时间没有消费者消费,那么我们可以设置让消息进入死信队列

需要2台交换机 一台正常交换机 一台死信交换机

​ 2个队列 正常队列 死信队列

配置文件

server:
  port: 8080
spring:
  application:
    name: dlx-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
my:

  exchangeNormalName: exchange.normal.a
  queueANormalName: queue.normal.a

  exchangeDlxName: exchange.dlx.a
  queueADlxName: queue.dlx.a
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

rabbitmq配置类

@Configuration
public class RabbitMqConfig {
    @Value("${my.exchangeNormalName}")
    private String exchangeNormalName;
    @Value("${my.queueANormalName}")
    private String queueANormalName;
    @Value("${my.exchangeDlxName}")
    private String exchangeDlxName;
    @Value("${my.queueADlxName}")
    private String queueADlxName;

    /**
     * 正常直连交换机
     * @return
     */
    @Bean
    public DirectExchange normalDirectExchange() {
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }

    /**
     * 正常队列
     * @return
     */
    @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments=new HashMap<>();
        arguments.put("x-message-ttl",15000);//设置队列过期时间为20秒
        arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置队列的死信交换机
        arguments.put("x-dead-letter-routing-key","dlxOrder"); //设置死信路由key
        return QueueBuilder.durable(queueANormalName)
                .withArguments(arguments)//设置队列过期时间
                .build();
    }

    /**
     * 绑定交换机和队列
     * @param normalDirectExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalDirectExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalDirectExchange).with("order");
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean
    public DirectExchange dlxDirectExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }

    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue dlxQueue(){
        return QueueBuilder.durable(queueADlxName).build();
    }
    @Bean
    public Binding bindingDlx(DirectExchange dlxDirectExchange,Queue dlxQueue){
        return BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with("dlxOrder");
    }





}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

生产者

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void SendMessage(){
        Message message = MessageBuilder.withBody("hello".getBytes()).build();
        rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
        log.info("消息发送完毕");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

启动类

@SpringBootApplication
public class SpringbootBootRabbitmqDlxApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringbootBootRabbitmqDlxApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMessage();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

7 RabbitMQ 延迟队列

场景:有一个订单,15 分钟内如果不支付,就把该订单设置为交易关闭,那么就不 能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队 列来实现,也可以有一些其他办法实现

解决办法

采用消息中间件(rabbitmq)

1、RabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递, 即把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费 者可以从这个队列取走消息

在这里插入图片描述

配置文件

server:
  port: 8080
spring:
  application:
    name: dlx-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode

my:

  exchangeNormalName: exchange.normal.4
  queueANormalName: queue.normal.4

  exchangeDlxName: exchange.dlx.4
  queueADlxName: queue.dlx.4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

配置类


@Configuration
public class RabbitMqConfig {
    @Value("${my.exchangeNormalName}")
    private String exchangeNormalName;
    @Value("${my.queueANormalName}")
    private String queueANormalName;
    @Value("${my.exchangeDlxName}")
    private String exchangeDlxName;
    @Value("${my.queueADlxName}")
    private String queueADlxName;

    /**
     * 正常直连交换机
     * @return
     */
    @Bean
    public DirectExchange normalDirectExchange() {
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }

    /**
     * 正常队列
     * @return
     */
    @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments=new HashMap<>();
        arguments.put("x-message-ttl",15000);//设置队列过期时间为15秒
        arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置队列的死信交换机
        arguments.put("x-dead-letter-routing-key","dlxOrder"); //设置死信路由key
//        arguments.put("x-max-length",5); //队列最大消息数
        return QueueBuilder.durable(queueANormalName)
                .withArguments(arguments)//设置队列擦拭你
                .build();
    }

    /**
     * 绑定交换机和队列
     * @param normalDirectExchange
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(DirectExchange normalDirectExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalDirectExchange).with("order");
    }

    /**
     * 死信交换机
     * @return
     */
    @Bean
    public DirectExchange dlxDirectExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }

    /**
     * 死信队列
     * @return
     */
    @Bean
    public Queue dlxQueue(){
        return QueueBuilder.durable(queueADlxName).build();
    }
    @Bean
    public Binding bindingDlx(DirectExchange dlxDirectExchange,Queue dlxQueue){
        return BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with("dlxOrder");
    }





}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

生产者

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void SendMessage(){
//        MessageProperties messageProperties=new MessageProperties();
//        messageProperties.setExpiration("15000");

            String mes="hello";
            Message message = MessageBuilder.withBody(mes.getBytes()).build();
            rabbitTemplate.convertAndSend("exchange.normal.4","order",message);




        log.info("消息发送完毕");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

启动类

@SpringBootApplication
public class SpringbootBootRabbitmqDlxApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringbootBootRabbitmqDlxApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMessage();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

问题:

如果先发送的消息,消息延迟时间长,会影响后面的 延迟时间段的消息的消费;

//解决:不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样

在这里插入图片描述

这个解决方式代码实现是非常混乱复杂的,我们可以采用第二种方式

2.使用 rabbitmq-delayed-message-exchange 延迟插件

选 择 对 应 的 版 本 下 载 rabbitmq-delayed-message-exchange 插 件 ,

下 载 地 址 : http://www.rabbitmq.com/community-plugins.html
  • 1

在这里插入图片描述

在这里插入图片描述

1.插件拷贝到 RabbitMQ 服务器 plugins 目录

解压

unzip rabbitmq_delayed_message_exchange-3.10.2.ez
  • 1

如果 unzip 没有安装,先安装一下

yum install unzip -y
  • 1

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 
  • 1

查询安装的所有插件

rabbitmq-plugins list 
  • 1

插件运行原理图
在这里插入图片描述

在这里插入图片描述

消息发送后不会直接投递到队列, 而是存储到 Mnesia(嵌入式数据库),检查 x-delay

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现 解决了消息过期时间不一致出现的问题

代码

依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

配置文件

server:
  port: 8080
spring:
  application:
    name: dlx-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode

my:
  exchangeDelayName: exchange.delayed.plugin
  queueADelayName: queue.delayed.plugin
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

配置类

这里设置的交换机类型有所不一样

@Configuration
public class RabbitMqConfig {
    @Value("${my.exchangeDelayName}")
    private String exchangeDelayName;
    @Value("${my.queueADelayName}")
    private String  queueADelayName;


    /**
     * 交换机
     * @return
     */
    @Bean
    public  CustomExchange customExchange() {
        Map<String, Object> arguments=new HashMap<>();
        arguments.put("x-delayed-type", "direct");
// CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
        CustomExchange customExchange = new CustomExchange(exchangeDelayName,"x-delayed-message",true,false, arguments);
        return customExchange;

    }

    /**
     * 正常队列
     * @return
     */
    @Bean
    public Queue normalQueue(){

        return QueueBuilder.durable(queueADelayName).build();
    }

    /**
     * 绑定交换机和队列
     * @param
     * @param normalQueue
     * @return
     */
    @Bean
    public Binding bindingNormal(CustomExchange customExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(customExchange).with("info").noargs();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

生产者

注意这里设置消息的延迟时间不一样,是设置请求头

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;


    public void SendMessage() {
        {
            //设置消息的延迟时间
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay", 10000);


            String mes = "hello";
            Message message = MessageBuilder.withBody(mes.getBytes()).andProperties(messageProperties).build();
            rabbitTemplate.convertAndSend("exchange.delayed.plugin", "info", message);
            log.info("消息发送完毕,时间为{}",new Date());
        }


        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay", 15000);


            String mes = "hello";
            Message message = MessageBuilder.withBody(mes.getBytes()).andProperties(messageProperties).build();
            rabbitTemplate.convertAndSend("exchange.delayed.plugin", "info", message);
            log.info("消息发送完毕,时间为:{}",new Date());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

启动类

@SpringBootApplication
public class SpringbootBootRabbitmqDlxApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringbootBootRabbitmqDlxApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMessage();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

8 消息的可靠性

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一 些性能,性能与可靠性是无法兼得的

在这里插入图片描述

① 代表消息从生产者发送到 Exchange;

② 代表消息从 Exchange 路由到 Queue;

③ 代表消息在 Queue 中存储;

④ 代表消费者监听 Queue 并消费消

消息可靠性的方式

1、确保消息发送到 RabbitMQ 服务器的交换机上

​ confirm 确认机制

2、确保消息路由到正确的队列

​ Return模式

3、确保消息在队列正确地存储

​ 开启持久化

4、集群,镜像队列,高可用

5、确保消息从队列正确地投递到消费者

采用消息消费时的手动 ack 确认机制来保证

8.1、 RabbitMQ的Confirm 模式

消息的 confirm 确认机制,是指生产者投递消息后,到达了消息服务器 Broker 里面的 exchange 交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常 的发送到 Broker 的 exchange 中,这也是消息可靠性投递的重要保障

具体代码设置

1 配置文件 application.yml 开启确认模式:
spring.rabbitmq.publisher-confirm-type=correlated

2 写一个类实现 implements RabbitTemplate.ConfirmCallback,判断成功和失败的 ack
结果,可以根据具体的结果,如果 ack 为 false,对消息进行重新发送或记录日志等处理;
设置 rabbitTemplate 的确认回调方法

3 rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

配置文件

server:
  port: 8080
spring:
  application:
    name: confirm3-to-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated
my:
  exchangeName: exchange.confirm3
  queueAName: queue.confirm3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

rabbitmq的配置类

@Configuration
@Slf4j
public class RabbitMqConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueAName}")
    private String queueAName;

    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    //创建队列A
    @Bean
    public Queue queueA(){
        return new Queue(queueAName);
    }


    //链接A队列
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue queueA){

        return BindingBuilder.bind(queueA).to(directExchange).with("info");
    }



}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

生产者

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(
//实现implements RabbitTemplate.ConfirmCallback
                (correlationData, ack, cause) -> {
                    if (ack) {
                        log.info("消息成功到达交换机");
                        return;
                    }
                    log.error("消息没有到达交换机,原因是:{}" + cause);
                }

        );
    }

    public void SendMsg() {
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("ackId");
        rabbitTemplate.send("exchange.confirm3", "info", MessageBuilder.withBody("hello word".getBytes()).build(), correlationData);
        log.info("消息发送完毕,时间为:{}", new Date());
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

启动类

@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMsg();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

8.2 RabbitMQ 消息 Return 模式

rabbitmq 整个消息投递的路径为:

producer —> exchange —> queue —> consumer >>

消息从 producer 到 exchange 则会返回一个 confirmCallback;

消息从 exchange –> queue 投递失败则会返回一个 returnCallback; 我们可以利用这两个 callback 控制消息的可靠性投递

开启 确认模式;

1.配置文件中开启

spring.rabbitmq.publisher-returns: true

使用 rabbitTemplate.setConfirmCallback 设置回调函数,当消息发送到 exchange 后回 调 confirm 方法。在方法中判断 ack,如果为 true,则发送成功,如果为 false,则发送失 败,需要处理;

配置文件

server:
  port: 8080
spring:
  application:
    name: return2-to-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated
    publisher-returns: true

my:
  exchangeName: exchange.return2
  queueAName: queue.return2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

配置类

@Configuration
@Slf4j
public class RabbitMqConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueAName}")
    private String queueAName;

    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    //创建队列A
    @Bean
    public Queue queueA(){
        return new Queue(queueAName);
    }


    //链接A队列
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue queueA){

        return BindingBuilder.bind(queueA).to(directExchange).with("info");
    }



}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

生产者

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Value("${my.exchangeName}")
    private String exchangeName;
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(
                (correlationData, ack, cause) -> {
                    if (ack) {
                        log.info("消息成功到达交换机");
                        return;
                    }
                    log.error("消息没有到达交换机,原因是:{}" + cause);
                }
        );
	//实现rabbitTemplate.setConfirmCallback接口
        rabbitTemplate.setReturnsCallback(
                returnedMessage -> log.error("消息被返回啦,错误原因是:{}",returnedMessage.getMessage())
        );
    }
    public void SendMsg(){
        CorrelationData correlationData=new CorrelationData();
        correlationData.setId("ackId");
        rabbitTemplate.send(exchangeName,"info", MessageBuilder.withBody("hello word".getBytes()).build(),correlationData);
        log.info("消息发送完毕,时间为:{}",new Date());
    }




}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

启动类

@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMsg();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

8.3 RabbitMQ的消息消费时的手动 ack确认机制

采用消息消费时的手动 ack 确认机制来保证;

在这里插入图片描述

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)

在配置文件中开启手动 ack 消息消费确认

#开启手动 ack 消息消费确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ 会等待消 费者显式地回复确认信号后才从队列中删除消息,

如果消息消费失败,也可以调用 basicReject()或者 basicNack()来拒绝当前消息而不是确认。 如果 requeue 参数设置为 true,可以把这条消息重新存入队列,以便发给下一个消费者(当 然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的 队列中,或者只打印异常日志);

配置文件

server:
  port: 8080
spring:
  application:
    name: receive-message
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    listener:
      simple:
        acknowledge-mode: manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

消费者

@Component
@Slf4j
public class ReceiveMessage {
    @RabbitListener(queues = {"queue.normal.4"})
    public void receiveMessage(Message message, Channel channel){
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息的唯一标识
        long deliveryTag = messageProperties.getDeliveryTag();
        try {
            byte[] body = message.getBody();
            String s = new String(body);
            log.info("接收到的消息为:{}",s);
            int i=1/0;
            channel.basicAck(deliveryTag,false); //参数1为 唯一标识 参数2 false只确认当前消息

        }catch (Exception e){
            try {
                channel.basicNack(deliveryTag,false,true);//参数一 消息唯一标识 参数二 只确认当前消息 参数三 是否重新入队
                log.error("错误信息为:{}",e.getMessage());
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

8.4 备用交换机

备用交换机使用场景

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在 rabbitmq 中会默认丢弃消息,

如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息

注意:备用交换机一般使用 fanout 交换机

配置文件

server:
  port: 8080
spring:
  application:
    name: alternate-to-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated
    publisher-returns: true
my:
  exchangeName: exchange.normal
  queueAName: queue.normal

  alternateExchangeName: exchange.alternate
  alternateQueueName: queue.alternate
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

rabbit配置文件

package com.xxp.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Slf4j
public class RabbitMqConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueAName}")
    private String queueAName;
    @Value("${my.alternateExchangeName}")
    private String alternateExchangeName;
    @Value("${my.alternateQueueName}")
    private String alternateQueueName;
    //创建交换机
    @Bean
    public DirectExchange directExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange",alternateExchangeName);

        return  new DirectExchange(exchangeName,true,false,arguments);
    }

    //创建队列
    @Bean
    public Queue normalQueue(){
        return new Queue(queueAName);
    }


    //绑定A队列
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue normalQueue ){

        return BindingBuilder.bind(normalQueue).to(directExchange).with("info");
    }
    @Bean
    //创建备用交换机:一般使用扇形交换机
    public FanoutExchange alternateFanoutExchange(){
        return ExchangeBuilder.fanoutExchange(alternateExchangeName).build();
    }
    @Bean
    //创建备用队列
    public Queue alternateQueue(){
        return QueueBuilder.durable(alternateQueueName).build();
    }
    @Bean
    //绑定备用交换机和备用队列
    public Binding alternateBinding(FanoutExchange alternateFanoutExchange,Queue alternateQueue){

        return BindingBuilder.bind(alternateQueue).to(alternateFanoutExchange);

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

生产者

package com.xxp.service;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Date;

@Service
@Slf4j
public class MessageService implements RabbitTemplate.ConfirmCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Value("${my.exchangeName}")
    private String exchangeName;
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(
                returnedMessage -> {
                    log.error("消息被返回啦,错误原因是:{}");
                }
        );
    }
    public void SendMsg(){
        CorrelationData correlationData=new CorrelationData();
        correlationData.setId("ackId");
        rabbitTemplate.send(exchangeName,"info", MessageBuilder.withBody("hello word".getBytes()).build(),correlationData);
        log.info("消息发送完毕,时间为:{}",new Date());
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息成功到达交换机");
            return;
        }
        log.error("消息没有到达交换机,原因是:{}" + cause);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

启动类

@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
    @Resource
    private MessageService messageService;
    public static void main(String[] args) {
        SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.SendMsg();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

9 消息的幂等性

消息消费时的幂等性(消息不被重复消费)

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务, 否则就处理重复了

幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相 同的,不能因为重复的请求而对该资源重复造成影响;

以接口幂等性举例:

接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具 有幂等性的;

如何避免消息的重复消费问题?

全局唯一 ID + Redis 生产者在发送消息时,为每条消息设置一个全局唯一的 messageId,消费者拿到消息后,使 用 setnx 命令,将 messageId 作为 key 放到 redis 中:setnx(messageId, 1),若返回 1,说 明之前没有消费过,正常消费;若返回 0,说明这条消息之前已消费过,抛弃;

配置类

server:
  port: 8080
spring:
  application:
    name: idempotent-to-exchange
  rabbitmq:
    host: 192.168.11.146
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual

  data:
    redis:
      host: 192.168.11.144
      port: 6379
      password: 123456

my:
  exchangeName: exchange.idempotent
  queueAName: queue.idempotent
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {
    private String id;
    private String orderName;
    private BigDecimal orderMoney;
    private Date orderTime;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

配置类

package com.xxp.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMqConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Value("${my.queueAName}")
    private String queueAName;

    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    //创建队列A
    @Bean
    public Queue queueA(){
        return new Queue(queueAName);
    }


    //链接A队列
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue queueA){

        return BindingBuilder.bind(queueA).to(directExchange).with("info");
    }



}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

生产者

package com.xxp.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xxp.vo.Orders;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.Date;

@Service
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    ObjectMapper objectMapper; //序列化和反序列化
    @Value("${my.exchangeName}")
    private String exchangeName;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(
                (correlationData, ack, cause) -> {
                    if (ack) {
                        log.info("消息成功到达交换机");
                        return;
                    }
                    log.error("消息没有到达交换机,原因是:{}" + cause);
                }
        );

        rabbitTemplate.setReturnsCallback(
                returnedMessage -> log.error("消息被返回啦,错误原因是:{}", returnedMessage.getMessage())
        );
    }

    public void SendMsg() throws JsonProcessingException {
        {
            Orders order1 = Orders.builder()
                            .id("order_123456")
                            .orderName("遥遥领先")
                            .orderMoney(new BigDecimal(8888))
                            .orderTime(new Date()).build();

            String strOrder1 = objectMapper.writeValueAsString(order1);
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId("ackId");
            rabbitTemplate.send(exchangeName, "info", MessageBuilder.withBody(strOrder1.getBytes()).build(), correlationData);
            log.info("消息发送完毕,时间为:{}", new Date());
        }
        {
            Orders order2 = Orders.builder().id("order_123456").orderName("遥遥领先").orderMoney(new BigDecimal(8888)).orderTime(new Date()).build();
            String strOrder2 = objectMapper.writeValueAsString(order2);
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId("ackId");
            rabbitTemplate.send(exchangeName, "info", MessageBuilder.withBody(strOrder2.getBytes()).build(), correlationData);
            log.info("消息发送完毕,时间为:{}", new Date());
        }
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

消费者

package com.xxp.message;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.xxp.vo.Orders;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class ReceiveMessage {
    @Resource
    private ObjectMapper objectMapper;
    @Resource
    StringRedisTemplate stringRedisTemplate;
     @RabbitListener(queues = {"queue.idempotent"})
    public void receiveMessage(Message message, Channel channel){
        MessageProperties properties = message.getMessageProperties();
        //获取消息的唯一标识
        long deliveryTag = properties.getDeliveryTag();

        try{
            //获取消息
            Orders orders = objectMapper.readValue(message.getBody(), Orders.class);
            log.info("获取的消息为{}",orders.toString());
            //TODO 插入订单
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("idempotent" + orders.getId(), orders.getId());
            if (result==true){
                //TODO 像插入数据库
                log.info("插入数据库");
            }
            //参数1为 唯一标识 参数2 false只确认当前消息
            channel.basicAck(deliveryTag,false);
        }catch (Exception e){
            try {
                channel.basicNack(deliveryTag,false,true);//参数一 消息唯一标识 参数二 只确认当前消息 参数三 是否重新入队
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

10 RabbitMQ 集群与高可用

RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;

在 RabbitMQ 集群中所有的节点(一个节点就是一个 RabbitMQ 的 broker 服务器) 被归为两 类:

一类是磁盘节点,一类是内存节点;

磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存 节点只会将这些信息保存到内存中,

如果该节点宕机或重启,内存节点的数据会全部丢失, 而磁盘节点的数据不会丢失

10.1 默认集群模式

默认集群模式也叫 普通集群模式、或者 内置集群模式

RabbitMQ 默认集群模式,只会把交换机、队列、虚拟主机等元数据信息在各 个节点同步,

而具体队列中的消息内容不会在各个节点中同步

元数据的解释:

队列元数据:队列名称和属性(是否可持久化,是否自动删除)

交换器元数据:交换器名称、类型和属性

绑定元数据:交换器和队列的绑定列表

vhost元数据:vhost 内的相关属性,如安全属性等;

当用户访问其中任何一个 RabbitMQ 节点时,查询到的 queue/user/ exchange/vhost 等信息都是相同的;

但集群中队列的具体信息数据只在队列的拥有者节点保存,其他节点只知道队列 的元数据和指向该节点的指针,所以其他节点接收到不属于该节点队列的消息时 会将该消息传递给该队列的拥有者节点上;

为什么集群不复制队列内容和状态到所有节点:

1)存储空间; 2)性能;

如果消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写 磁盘,占用磁盘空间。

在这里插入图片描述

10.2 安装前准备

1 从已经安装好 rabbitmq 的机器 clone 三台机器

2 重新设置三台机器的 mac 地址 注意 clone 完,先不要启动三台机器,三台机器均要重新生成 mac 地址,防止 clone 出 的机器 ip 地址重复

在这里插入图片描述

在这里插入图片描述

3 启动三台机器

并查看ip地址

ip a
  • 1

4 使用 xshell 连接三台机器

5 修改三台机器的/etc/hosts 文件

首先需要配置一下 hosts 文件,因为 RabbitMQ 集群节点名称是读取 hosts 文件得到的;

 vim /etc/hosts
  • 1
192.168.11.145 rabbitmq1
192.168.11.147 rabbitmq2
192.168.11.148 rabbitmq3

  • 1
  • 2
  • 3
  • 4

6 三台机器均重启网络,使节点名生效

systemctl restart network
  • 1

7 三台机器的 xshell 均退出,然后再重新连

8 三台机器的防火墙处理

systemctl status firewalld
systemctl stop firewalld --关闭防火墙
systemctl disable firewalld --开机不启动防火墙
  • 1
  • 2
  • 3

9 三台机器 .erlang.cookie 文件保持一致

由于是 clone 出的三台机器,所以肯定是一样

如果我们使用解压缩方式安装的 RabbitMQ,那么该文件会在${用户名}目录下,

也就是${用户名}/.erlang.cookie;

如果我们使用 rpm 安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq 目录下

注意 .erlang.cookie 的权限为 400,目前已经是 400

10 分别启动三台机器上的 rabbitmq

rabbitmq-server -detached
  • 1

11 查看集群状态

rabbitmqctl cluster_status
  • 1

12 构建集群

在 rabbitmq2 机器上执行命令,让 2 的 rabbitmq 加

./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@rabbit1 --ram
./rabbitmqctl start_app
  • 1
  • 2
  • 3
  • 4

–ram 参数表示让 rabbitmq2 成为一个内存节点,如果不带参数默认为 disk 磁盘节点

在 rabbit130 节点上也执行同样的命令,使 rabbit3

./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@rabbit1 --ram
./rabbitmqctl start_app
  • 1
  • 2
  • 3
  • 4

13 操作一个节点,添加用户和权限等

#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*" #设置角色
rabbitmqctl set_user_tags admin administrator
#启动 web 控制台插件
./rabbitmq-plugins enable rabbitmq_management
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

使用 web 浏览器添加一个虚拟主机 :powernod

14 再次查看集群状态

当执行完操作以后我们在浏览器访问 web 管控台来看看效果;

随便在哪个节点打开 web 管控台都能看到集群环境各节点的信息;

也可以使用 ./rabbitmqctl cluster_status 查看集群状态

Springboot 连接集群

配置文件加入 addresses:

server:
  port: 8080
spring:
  application:
    name: cluster-to-exchange
  rabbitmq:
#    host: 192.168.11.146
#    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
    addresses: 192.168.145:5672,192.168.11.147:5672,192.168.148:5672
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

10.3 镜像模式安装

镜像模式是基于默认集群模式加上一定的配置得来的; 在默认模式下的 RabbitMQ 集群,它会把所有节点的交换机、绑定、队列的元 数据进行复制确保所有节点都有一份相同的元数据信息,但是队列数据分为两种:

一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息),

另 一种是队列里面的消息镜像模式,则是把所有的队列数据完全同步,包括元数据信息和消息数据信息, 当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模 式;

实现镜像模式也非常简单,它是在普通集群模式基础之上搭建而成的; 镜像队列配置命令

/rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority] -p Vhost: 可选参数,针对指定 vhost 下的 queue 进行设置;

Name: policy 的名称;(可以自己取个名字就可以)

Pattern: queue 的匹配模式(正则表达式);

Definition:镜像定义,包括三个部分 ha-mode, ha-params, ha-sync-mode;

priority:可选参数,policy 的优先级
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

例子

rabbitmqctl set_policy -p powernode ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/515788
推荐阅读
相关标签
  

闽ICP备14008679号