赞
踩
注意本次教程的系统是centos7
开发软件是idea2022
rabbitmq版本是3.10.11
springboot版本是 springboot3
RabbitMQ 是一个广泛使用的消息服务器,采用 Erlang 语言编写,是一种开源的实现
实现了AMQP(高级消息队列协议)的消息中间件;
RabbitMQ 最初起源于金融系统,它的性能及稳定性都非常出色;
AMQP 协议(http://www.amqp.org),即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
官网:https://www.rabbitmq.com
Github:https://github.com/rabbitmq
简单来说,消息中间件就是指保存数据的一个容器(服务器),可以用于两个系统之间
的数据传递。
消息中间件一般有三个主要角色:生产者、消费者、消息代理(消息队列、消息服务器);
生产者发送消息到消息服务器,然后消费者从消息代理(消息队列)中获取数据
并进行处理
常见的消息中间件:
RabbitMQ
kafka(大数据领域)
RocketMQ(阿里巴巴开源)献给 Apache 组织
pulsar(最近一两年流行起来的)
RabbitMQ 是使用 Erlang 语言开发的,
所以要先下载安装 Erlang
下载时一定要注意版本兼容性
版本兼容说明地址:https://www.rabbitmq.com/which-erlang.html
图片: ![Alt](https://img-home.csdnimg.cn/images/20220524100510.png
去官网查看对应版本
Erlang 官网
https://www.erlang.org/
Linux 下载:
wget
https://github.com/erlang/otp/releases/download/OTP-25.1.1/otp_src_25.1.1
.tar.gz
说明:wget 是 linux 命令,可以用来下载软件
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
说明:yum -y install 安装 linux 的一些依赖库的命令 ,-y 表示自动确认;
tar -zxvf otp_src_25.1.1.tar.gz
切换到解压的目录下,运行相应命令
cd otp_src_25.1.1
./configure
make
make install
安装好了 erlang 后可以将解压的文件夹删除:
rm -rf otp_src_25.
验证 erlang 是否安装成功:
在命令行输入: erl
如果进入了编程命令行则表示安装成功,然后按 ctrl + z 退出编 程命令行;
从 RabbitMQ 官网 https://www.rabbitmq.com
找到下载链接 Linux:
下载 3.10.11
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.11/ra bbitmq-server-generic-unix-3.10.11.tar.xz
generic 是通用的意思,这个版本也就是通用的 unix 版本
解压 RabbitMQ 的压缩包,即安装完成,无需再编译
tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C /usr/local/
-C 选项是指定解压目录,如果不指定会解压到当前目录
控制台输入
vi /etc/profile
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbin
export RABBIT_HOME PATH
刷新环境变量
source /etc/profile
配置完环境变量就可以在任意地方启动mq啦
rabbitmq-server -detached
-detached 代表后台启动
查看redis状态
rabbitmqctl -n rabbit status
用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。 这些操作都是通过 rabbitmqctl
查看帮助: rabbitmqctl add_user --help 相应的命令
(1) 查看当前用户列表
rabbitmqctl list_users
(2) 新增一个用户 语法:
rabbitmqctl add_user Username Password
示例:
rabbitmqctl add_user admin 12345
(3)设置用户角色
rabbitmqctl set_user_tags User Tag
示例:rabbitmqctl set_user_tags admin administrato
此处设置用户的角色为管理员角色
(4)设置用户权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
此操作是设置 admin 用户拥有操作虚拟主机/下的所有权限
查看用户权限
(5)查看用户权限
rabbitmqctl list_permissions
Rabbitmq 有一个 web 管理后台,这个管理后台是以插件的方式提供的,启动后台 web 管理功能,
切换到 sbin 目录下执行
查看 rabbitmq 的插件列表
./rabbitmq-plugins list
#启用
./rabbitmq-plugins enable rabbitmq_management
#禁用
./rabbitmq-plugins disable rabbitmq_management
防火墙操作
把防火墙关闭啦,否则外部无法打开 web管理后台
systemctl status firewalld --检查防火墙状态
systemctl stop firewalld --关闭防火墙,Linux 重启之后会失效
systemctl disable firewalld --防火墙置为不可用,Linux 重启后,防火墙服务不自动启动,
依然是不可用
访问web管理后台
http://192.168.131.131:15672
用户名/密码为我们上面创建的 admin/123456
注意上面改成你的虚拟主机的 ip 地址
消息队列有三个核心要素: 消息生产者、消息队列、消息消费者;
生产者(Producer):发送消息的应用;(java 程序,也可能是别的语言写的程序)
消费者(Consumer):接收消息的应用;(java 程序,也可能是别的语言写的程序)
代理(Broker):就是消息服务器,RabbitMQ Server 就是 Message Broker;
连接(Connection):连接 RabbitMQ 服务器的 TCP 长连接;
信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道 进行的
虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用 户使用同一个 RabbitMQ 服务时,可以划分出多个 Virtual host,每个用户在自己的 Virtual host 创建 exchange/queue 等;(分类比较清晰、相互隔离)
交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消 息队列中,起到一个路由的作用;
路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息 的目的地址;
绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);
队列(Queue):存储消息的缓存;
消息(Message):由生产者通过 RabbitMQ 发送给消费者的信息;(消息可以任何数据, 字符串、user 对象,json 串等等)
1、Fanout Exchange(扇形)
2、Direct Exchange(直连)
3、Topic Exchange(主题)
4、Headers Exchange(头部)
Fanout 扇形的,散开的; 扇形交换机 投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
配置类
@Configuration public class RabbitConfig { //三部曲 //定义交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("exchange.fanout"); } //定义队列A @Bean public Queue queueA(){ return new Queue("queue.fanout.a"); } //定义队列B @Bean public Queue queueB(){ return new Queue("queue.fanout.b"); } //绑定交换机和队列 @Bean public Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){ //将队列a绑定到扇形交换机 return BindingBuilder.bind(queueA).to(fanoutExchange); } //绑定交换机和队列 @Bean public Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){ //将队列b绑定到扇形交换机 return BindingBuilder.bind(queueB).to(fanoutExchange); } }
生产者
@Component
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMess(){
String msg="hello world";
Message message = new Message(msg.getBytes());
rabbitTemplate.convertAndSend("exchange.fanout","",message);
log.info("消息发送完毕,发送时间为:{}",new Date());
}
}
消费者
@Component
@Slf4j
public class ReceiveMessage {
@RabbitListener(queues = {"queue.fanout.a","queue.fanout.b"})
public void receiveMessage(Message message){
byte[] body = message.getBody();
String s = new String(body);
log.info("接收到的消息为:{}",s);
}
}
根据路由键精确匹配(一模一样)进行路由消息队列
创建springboot项目
导入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
修改配置文件:加入host port username passwrod等配置信息
server:
port: 8080
spring:
application:
name: direct-exchange
rabbitmq:
host: 192.168.11.146
port: 5672
username: admin
password: 123456
virtual-host: powernode
my:
exchangeName: exchange.direct #交换机名字
queueAName: queue.direct.a #队列a名字
queueBName: queue.direct.b #队列b名字
Rabbitmq配置文件
@Configuration //@ConfigurationProperties(prefix = "my") public class RabbitMqConfig { @Value("${my.exchangeName}") private String exchangeName; @Value("${my.queueAName}") private String queueAName; @Value("${my.queueBName}") private String queueBName; //创建交换机 @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange(exchangeName).build(); } //创建队列A @Bean public Queue queueA(){ return new Queue(queueAName); } //创建队列B @Bean public Queue queueB(){ return new Queue(queueBName); } //链接A队列 @Bean public Binding bindingA(DirectExchange directExchange,Queue queueA){ return BindingBuilder.bind(queueA).to(directExchange).with("error"); } //链接B队列 @Bean public Binding bindingB1(DirectExchange directExchange,Queue queueB){ return BindingBuilder.bind(queueB).to(directExchange).with("error"); } @Bean public Binding bindingB2(DirectExchange directExchange,Queue queueB){ return BindingBuilder.bind(queueB).to(directExchange).with("info"); } @Bean public Binding bindingB3(DirectExchange directExchange,Queue queueB){ return BindingBuilder.bind(queueB).to(directExchange).with("warning"); } }
编写生产者发送消息
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void SendMsg(){
rabbitTemplate.send("exchange.direct","info", MessageBuilder.withBody("hello word".getBytes()).build());
log.info("消息发送完毕!");
}
}
启动测试
@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMsg();
}
}
通配符匹配,相当于模糊匹配;
#匹配多个单词,用来表示任意数量(零个或多个)单词
*匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词
beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
beijing.* == beijing.queue, beijing.xyz
创建springboot项目
导入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
修改配置文件:加入host port username passwrod等配置信息
server: port: 8080 spring: application: name: topic-exchange rabbitmq: host: 192.168.11.146 port: 5672 username: admin password: 123456 virtual-host: powernode my: exchangeName: exchange.topic #交换机的名字 queueAName: queue.topic.a #队列名字a queueBName: queue.topic.b #队列名字b
编写Rabbitmq配置文件
@Configuration public class RabbitMQConfig { @Value("${my.exchangeName}") private String exchangeName; @Value("${my.queueAName}") private String queueAName; @Value("${my.queueBName}") private String queueBName; //创建交换机 @Bean public TopicExchange topicExchange(){ return ExchangeBuilder.topicExchange(exchangeName).build(); } //创建队列A @Bean public Queue queueA(){ return QueueBuilder.durable(queueAName).build(); } //创建队列B @Bean public Queue queueB(){ return QueueBuilder.durable(queueBName).build(); } //创建绑定 @Bean public Binding bindingA(TopicExchange topicExchange,Queue queueA){ return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*"); } @Bean public Binding bindingB1(TopicExchange topicExchange,Queue queueB){ return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit"); } @Bean public Binding bindingB2(TopicExchange topicExchange,Queue queueB){ return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#"); } }
编写生产者发送消息
@Service
public class MessageService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendMsg(){
Message message = MessageBuilder.withBody("hello word".getBytes()).build();
amqpTemplate.convertAndSend("exchange.topic","lazy.orange.rabbit",message);
}
}
启动测试
@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMsg();
}
}
基于消息内容中的 headers 属性进行匹配
过期消息也叫 TTL 消息,TTL:Time To Live
消息的过期时间有两种设置方式:
1.设置单条消息的过期时间
单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;
编写生产者的时候通过MessageProperties设置单条消息过期时间
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); //单条消息过期时间
Message message = new Message("ceshi".getBytes());
rabbitTemplate.send("directExchange","error",message);
log.info("发送成功!");
}
}
2.通过设置队列属性设置单挑消息过期时间
队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;
在rabbitmq的配置文件中
创建队列的时候修改队列属性
@Configuration public class RabbitMqConfig { private String exchangeName="directExchange"; private String queueAName="queueA"; //创建直连交换机 @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange(exchangeName).build(); } //创建队列A @Bean public Queue queueA(){ Map<String, Object> arguments=new HashMap<>(); //通过设置队列属性设置过期时间 arguments.put("x-message-ttl",10000); return new Queue(queueAName,true,false,false,arguments); } //创建链接 @Bean public Binding binding(DirectExchange directExchange,Queue queueA){ return BindingBuilder.bind(queueA).to(directExchange).with("error"); } }
也叫死信交换机、死信邮箱等
DLX: Dead-Letter-Exchange 死信交换器,死信邮箱
如果消息长时间没有消费者消费,那么我们可以设置让消息进入死信队列
需要2台交换机 一台正常交换机 一台死信交换机
2个队列 正常队列 死信队列
配置文件
server: port: 8080 spring: application: name: dlx-exchange rabbitmq: host: 192.168.11.146 port: 5672 username: admin password: 123456 virtual-host: powernode my: exchangeNormalName: exchange.normal.a queueANormalName: queue.normal.a exchangeDlxName: exchange.dlx.a queueADlxName: queue.dlx.a
rabbitmq配置类
@Configuration public class RabbitMqConfig { @Value("${my.exchangeNormalName}") private String exchangeNormalName; @Value("${my.queueANormalName}") private String queueANormalName; @Value("${my.exchangeDlxName}") private String exchangeDlxName; @Value("${my.queueADlxName}") private String queueADlxName; /** * 正常直连交换机 * @return */ @Bean public DirectExchange normalDirectExchange() { return ExchangeBuilder.directExchange(exchangeNormalName).build(); } /** * 正常队列 * @return */ @Bean public Queue normalQueue(){ Map<String, Object> arguments=new HashMap<>(); arguments.put("x-message-ttl",15000);//设置队列过期时间为20秒 arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置队列的死信交换机 arguments.put("x-dead-letter-routing-key","dlxOrder"); //设置死信路由key return QueueBuilder.durable(queueANormalName) .withArguments(arguments)//设置队列过期时间 .build(); } /** * 绑定交换机和队列 * @param normalDirectExchange * @param normalQueue * @return */ @Bean public Binding bindingNormal(DirectExchange normalDirectExchange,Queue normalQueue){ return BindingBuilder.bind(normalQueue).to(normalDirectExchange).with("order"); } /** * 死信交换机 * @return */ @Bean public DirectExchange dlxDirectExchange(){ return ExchangeBuilder.directExchange(exchangeDlxName).build(); } /** * 死信队列 * @return */ @Bean public Queue dlxQueue(){ return QueueBuilder.durable(queueADlxName).build(); } @Bean public Binding bindingDlx(DirectExchange dlxDirectExchange,Queue dlxQueue){ return BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with("dlxOrder"); } }
生产者
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void SendMessage(){
Message message = MessageBuilder.withBody("hello".getBytes()).build();
rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
log.info("消息发送完毕");
}
}
启动类
@SpringBootApplication
public class SpringbootBootRabbitmqDlxApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringbootBootRabbitmqDlxApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMessage();
}
}
场景:有一个订单,15 分钟内如果不支付,就把该订单设置为交易关闭,那么就不 能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队 列来实现,也可以有一些其他办法实现
解决办法
采用消息中间件(rabbitmq)
1、RabbitMQ 本身不支持延迟队列,可以使用 TTL 结合 DLX 的方式来实现消息的延迟投递, 即把 DLX 跟某个队列绑定,到了指定时间,消息过期后,就会从 DLX 路由到这个队列,消费 者可以从这个队列取走消息
配置文件
server: port: 8080 spring: application: name: dlx-exchange rabbitmq: host: 192.168.11.146 port: 5672 username: admin password: 123456 virtual-host: powernode my: exchangeNormalName: exchange.normal.4 queueANormalName: queue.normal.4 exchangeDlxName: exchange.dlx.4 queueADlxName: queue.dlx.4
配置类
@Configuration public class RabbitMqConfig { @Value("${my.exchangeNormalName}") private String exchangeNormalName; @Value("${my.queueANormalName}") private String queueANormalName; @Value("${my.exchangeDlxName}") private String exchangeDlxName; @Value("${my.queueADlxName}") private String queueADlxName; /** * 正常直连交换机 * @return */ @Bean public DirectExchange normalDirectExchange() { return ExchangeBuilder.directExchange(exchangeNormalName).build(); } /** * 正常队列 * @return */ @Bean public Queue normalQueue(){ Map<String, Object> arguments=new HashMap<>(); arguments.put("x-message-ttl",15000);//设置队列过期时间为15秒 arguments.put("x-dead-letter-exchange",exchangeDlxName);//设置队列的死信交换机 arguments.put("x-dead-letter-routing-key","dlxOrder"); //设置死信路由key // arguments.put("x-max-length",5); //队列最大消息数 return QueueBuilder.durable(queueANormalName) .withArguments(arguments)//设置队列擦拭你 .build(); } /** * 绑定交换机和队列 * @param normalDirectExchange * @param normalQueue * @return */ @Bean public Binding bindingNormal(DirectExchange normalDirectExchange,Queue normalQueue){ return BindingBuilder.bind(normalQueue).to(normalDirectExchange).with("order"); } /** * 死信交换机 * @return */ @Bean public DirectExchange dlxDirectExchange(){ return ExchangeBuilder.directExchange(exchangeDlxName).build(); } /** * 死信队列 * @return */ @Bean public Queue dlxQueue(){ return QueueBuilder.durable(queueADlxName).build(); } @Bean public Binding bindingDlx(DirectExchange dlxDirectExchange,Queue dlxQueue){ return BindingBuilder.bind(dlxQueue).to(dlxDirectExchange).with("dlxOrder"); } }
生产者
@Service @Slf4j public class MessageService { @Resource private RabbitTemplate rabbitTemplate; public void SendMessage(){ // MessageProperties messageProperties=new MessageProperties(); // messageProperties.setExpiration("15000"); String mes="hello"; Message message = MessageBuilder.withBody(mes.getBytes()).build(); rabbitTemplate.convertAndSend("exchange.normal.4","order",message); log.info("消息发送完毕"); } }
启动类
@SpringBootApplication
public class SpringbootBootRabbitmqDlxApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringbootBootRabbitmqDlxApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMessage();
}
}
问题:
如果先发送的消息,消息延迟时间长,会影响后面的 延迟时间段的消息的消费;
//解决:不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样
这个解决方式代码实现是非常混乱复杂的,我们可以采用第二种方式
2.使用 rabbitmq-delayed-message-exchange 延迟插件
选 择 对 应 的 版 本 下 载 rabbitmq-delayed-message-exchange 插 件 ,
下 载 地 址 : http://www.rabbitmq.com/community-plugins.html
1.插件拷贝到 RabbitMQ 服务器 plugins 目录
解压
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
如果 unzip 没有安装,先安装一下
yum install unzip -y
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查询安装的所有插件
rabbitmq-plugins list
插件运行原理图
在这里插入图片描述
消息发送后不会直接投递到队列, 而是存储到 Mnesia(嵌入式数据库),检查 x-delay
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现 解决了消息过期时间不一致出现的问题
代码
依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
配置文件
server:
port: 8080
spring:
application:
name: dlx-exchange
rabbitmq:
host: 192.168.11.146
port: 5672
username: admin
password: 123456
virtual-host: powernode
my:
exchangeDelayName: exchange.delayed.plugin
queueADelayName: queue.delayed.plugin
配置类
这里设置的交换机类型有所不一样
@Configuration public class RabbitMqConfig { @Value("${my.exchangeDelayName}") private String exchangeDelayName; @Value("${my.queueADelayName}") private String queueADelayName; /** * 交换机 * @return */ @Bean public CustomExchange customExchange() { Map<String, Object> arguments=new HashMap<>(); arguments.put("x-delayed-type", "direct"); // CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) CustomExchange customExchange = new CustomExchange(exchangeDelayName,"x-delayed-message",true,false, arguments); return customExchange; } /** * 正常队列 * @return */ @Bean public Queue normalQueue(){ return QueueBuilder.durable(queueADelayName).build(); } /** * 绑定交换机和队列 * @param * @param normalQueue * @return */ @Bean public Binding bindingNormal(CustomExchange customExchange,Queue normalQueue){ return BindingBuilder.bind(normalQueue).to(customExchange).with("info").noargs(); } }
生产者
注意这里设置消息的延迟时间不一样,是设置请求头
@Service @Slf4j public class MessageService { @Resource private RabbitTemplate rabbitTemplate; public void SendMessage() { { //设置消息的延迟时间 MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("x-delay", 10000); String mes = "hello"; Message message = MessageBuilder.withBody(mes.getBytes()).andProperties(messageProperties).build(); rabbitTemplate.convertAndSend("exchange.delayed.plugin", "info", message); log.info("消息发送完毕,时间为{}",new Date()); } { MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("x-delay", 15000); String mes = "hello"; Message message = MessageBuilder.withBody(mes.getBytes()).andProperties(messageProperties).build(); rabbitTemplate.convertAndSend("exchange.delayed.plugin", "info", message); log.info("消息发送完毕,时间为:{}",new Date()); } } }
启动类
@SpringBootApplication
public class SpringbootBootRabbitmqDlxApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringbootBootRabbitmqDlxApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMessage();
}
}
消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一 些性能,性能与可靠性是无法兼得的
① 代表消息从生产者发送到 Exchange;
② 代表消息从 Exchange 路由到 Queue;
③ 代表消息在 Queue 中存储;
④ 代表消费者监听 Queue 并消费消
消息可靠性的方式
1、确保消息发送到 RabbitMQ 服务器的交换机上
confirm 确认机制
2、确保消息路由到正确的队列
Return模式
3、确保消息在队列正确地存储
开启持久化
4、集群,镜像队列,高可用
5、确保消息从队列正确地投递到消费者
采用消息消费时的手动 ack 确认机制来保证
消息的 confirm 确认机制,是指生产者投递消息后,到达了消息服务器 Broker 里面的 exchange 交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常 的发送到 Broker 的 exchange 中,这也是消息可靠性投递的重要保障
具体代码设置
1 配置文件 application.yml 开启确认模式:
spring.rabbitmq.publisher-confirm-type=correlated
2 写一个类实现 implements RabbitTemplate.ConfirmCallback,判断成功和失败的 ack
结果,可以根据具体的结果,如果 ack 为 false,对消息进行重新发送或记录日志等处理;
设置 rabbitTemplate 的确认回调方法
3 rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
配置文件
server:
port: 8080
spring:
application:
name: confirm3-to-exchange
rabbitmq:
host: 192.168.11.146
port: 5672
username: admin
password: 123456
virtual-host: powernode
publisher-confirm-type: correlated
my:
exchangeName: exchange.confirm3
queueAName: queue.confirm3
rabbitmq的配置类
@Configuration @Slf4j public class RabbitMqConfig { @Value("${my.exchangeName}") private String exchangeName; @Value("${my.queueAName}") private String queueAName; @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange(exchangeName).build(); } //创建队列A @Bean public Queue queueA(){ return new Queue(queueAName); } //链接A队列 @Bean public Binding bindingA(DirectExchange directExchange,Queue queueA){ return BindingBuilder.bind(queueA).to(directExchange).with("info"); } }
生产者
@Service @Slf4j public class MessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback( //实现implements RabbitTemplate.ConfirmCallback (correlationData, ack, cause) -> { if (ack) { log.info("消息成功到达交换机"); return; } log.error("消息没有到达交换机,原因是:{}" + cause); } ); } public void SendMsg() { CorrelationData correlationData = new CorrelationData(); correlationData.setId("ackId"); rabbitTemplate.send("exchange.confirm3", "info", MessageBuilder.withBody("hello word".getBytes()).build(), correlationData); log.info("消息发送完毕,时间为:{}", new Date()); } }
启动类
@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMsg();
}
}
rabbitmq 整个消息投递的路径为:
producer —> exchange —> queue —> consumer >>
消息从 producer 到 exchange 则会返回一个 confirmCallback;
消息从 exchange –> queue 投递失败则会返回一个 returnCallback; 我们可以利用这两个 callback 控制消息的可靠性投递
开启 确认模式;
1.配置文件中开启
spring.rabbitmq.publisher-returns: true
使用 rabbitTemplate.setConfirmCallback 设置回调函数,当消息发送到 exchange 后回 调 confirm 方法。在方法中判断 ack,如果为 true,则发送成功,如果为 false,则发送失 败,需要处理;
配置文件
server: port: 8080 spring: application: name: return2-to-exchange rabbitmq: host: 192.168.11.146 port: 5672 username: admin password: 123456 virtual-host: powernode publisher-confirm-type: correlated publisher-returns: true my: exchangeName: exchange.return2 queueAName: queue.return2
配置类
@Configuration @Slf4j public class RabbitMqConfig { @Value("${my.exchangeName}") private String exchangeName; @Value("${my.queueAName}") private String queueAName; @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange(exchangeName).build(); } //创建队列A @Bean public Queue queueA(){ return new Queue(queueAName); } //链接A队列 @Bean public Binding bindingA(DirectExchange directExchange,Queue queueA){ return BindingBuilder.bind(queueA).to(directExchange).with("info"); } }
生产者
@Service @Slf4j public class MessageService { @Resource private RabbitTemplate rabbitTemplate; @Value("${my.exchangeName}") private String exchangeName; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) -> { if (ack) { log.info("消息成功到达交换机"); return; } log.error("消息没有到达交换机,原因是:{}" + cause); } ); //实现rabbitTemplate.setConfirmCallback接口 rabbitTemplate.setReturnsCallback( returnedMessage -> log.error("消息被返回啦,错误原因是:{}",returnedMessage.getMessage()) ); } public void SendMsg(){ CorrelationData correlationData=new CorrelationData(); correlationData.setId("ackId"); rabbitTemplate.send(exchangeName,"info", MessageBuilder.withBody("hello word".getBytes()).build(),correlationData); log.info("消息发送完毕,时间为:{}",new Date()); } }
启动类
@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMsg();
}
}
采用消息消费时的手动 ack 确认机制来保证;
如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。
为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)
在配置文件中开启手动 ack 消息消费确认
#开启手动 ack 消息消费确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ 会等待消 费者显式地回复确认信号后才从队列中删除消息,
如果消息消费失败,也可以调用 basicReject()或者 basicNack()来拒绝当前消息而不是确认。 如果 requeue 参数设置为 true,可以把这条消息重新存入队列,以便发给下一个消费者(当 然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的 队列中,或者只打印异常日志);
配置文件
server:
port: 8080
spring:
application:
name: receive-message
rabbitmq:
host: 192.168.11.146
port: 5672
username: admin
password: 123456
virtual-host: powernode
listener:
simple:
acknowledge-mode: manual
消费者
@Component @Slf4j public class ReceiveMessage { @RabbitListener(queues = {"queue.normal.4"}) public void receiveMessage(Message message, Channel channel){ MessageProperties messageProperties = message.getMessageProperties(); //获取消息的唯一标识 long deliveryTag = messageProperties.getDeliveryTag(); try { byte[] body = message.getBody(); String s = new String(body); log.info("接收到的消息为:{}",s); int i=1/0; channel.basicAck(deliveryTag,false); //参数1为 唯一标识 参数2 false只确认当前消息 }catch (Exception e){ try { channel.basicNack(deliveryTag,false,true);//参数一 消息唯一标识 参数二 只确认当前消息 参数三 是否重新入队 log.error("错误信息为:{}",e.getMessage()); } catch (IOException ex) { throw new RuntimeException(ex); } } } }
备用交换机使用场景
当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在 rabbitmq 中会默认丢弃消息,
如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息
注意:备用交换机一般使用 fanout 交换机
配置文件
server: port: 8080 spring: application: name: alternate-to-exchange rabbitmq: host: 192.168.11.146 port: 5672 username: admin password: 123456 virtual-host: powernode publisher-confirm-type: correlated publisher-returns: true my: exchangeName: exchange.normal queueAName: queue.normal alternateExchangeName: exchange.alternate alternateQueueName: queue.alternate
rabbit配置文件
package com.xxp.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration @Slf4j public class RabbitMqConfig { @Value("${my.exchangeName}") private String exchangeName; @Value("${my.queueAName}") private String queueAName; @Value("${my.alternateExchangeName}") private String alternateExchangeName; @Value("${my.alternateQueueName}") private String alternateQueueName; //创建交换机 @Bean public DirectExchange directExchange(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("alternate-exchange",alternateExchangeName); return new DirectExchange(exchangeName,true,false,arguments); } //创建队列 @Bean public Queue normalQueue(){ return new Queue(queueAName); } //绑定A队列 @Bean public Binding bindingA(DirectExchange directExchange,Queue normalQueue ){ return BindingBuilder.bind(normalQueue).to(directExchange).with("info"); } @Bean //创建备用交换机:一般使用扇形交换机 public FanoutExchange alternateFanoutExchange(){ return ExchangeBuilder.fanoutExchange(alternateExchangeName).build(); } @Bean //创建备用队列 public Queue alternateQueue(){ return QueueBuilder.durable(alternateQueueName).build(); } @Bean //绑定备用交换机和备用队列 public Binding alternateBinding(FanoutExchange alternateFanoutExchange,Queue alternateQueue){ return BindingBuilder.bind(alternateQueue).to(alternateFanoutExchange); } }
生产者
package com.xxp.service; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.Date; @Service @Slf4j public class MessageService implements RabbitTemplate.ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; @Value("${my.exchangeName}") private String exchangeName; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback( returnedMessage -> { log.error("消息被返回啦,错误原因是:{}"); } ); } public void SendMsg(){ CorrelationData correlationData=new CorrelationData(); correlationData.setId("ackId"); rabbitTemplate.send(exchangeName,"info", MessageBuilder.withBody("hello word".getBytes()).build(),correlationData); log.info("消息发送完毕,时间为:{}",new Date()); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息成功到达交换机"); return; } log.error("消息没有到达交换机,原因是:{}" + cause); } }
启动类
@SpringBootApplication
public class SpringBootRabbitmqDirectApplication implements ApplicationRunner {
@Resource
private MessageService messageService;
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitmqDirectApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.SendMsg();
}
}
消息消费时的幂等性(消息不被重复消费)
同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务, 否则就处理重复了
幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相 同的,不能因为重复的请求而对该资源重复造成影响;
以接口幂等性举例:
接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具 有幂等性的;
如何避免消息的重复消费问题?
全局唯一 ID + Redis 生产者在发送消息时,为每条消息设置一个全局唯一的 messageId,消费者拿到消息后,使 用 setnx 命令,将 messageId 作为 key 放到 redis 中:setnx(messageId, 1),若返回 1,说 明之前没有消费过,正常消费;若返回 0,说明这条消息之前已消费过,抛弃;
配置类
server: port: 8080 spring: application: name: idempotent-to-exchange rabbitmq: host: 192.168.11.146 port: 5672 username: admin password: 123456 virtual-host: powernode publisher-confirm-type: correlated publisher-returns: true listener: simple: acknowledge-mode: manual data: redis: host: 192.168.11.144 port: 6379 password: 123456 my: exchangeName: exchange.idempotent queueAName: queue.idempotent
实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {
private String id;
private String orderName;
private BigDecimal orderMoney;
private Date orderTime;
}
配置类
package com.xxp.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j public class RabbitMqConfig { @Value("${my.exchangeName}") private String exchangeName; @Value("${my.queueAName}") private String queueAName; @Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange(exchangeName).build(); } //创建队列A @Bean public Queue queueA(){ return new Queue(queueAName); } //链接A队列 @Bean public Binding bindingA(DirectExchange directExchange,Queue queueA){ return BindingBuilder.bind(queueA).to(directExchange).with("info"); } }
生产者
package com.xxp.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.xxp.vo.Orders; import jakarta.annotation.PostConstruct; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.Date; @Service @Slf4j public class MessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource ObjectMapper objectMapper; //序列化和反序列化 @Value("${my.exchangeName}") private String exchangeName; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) -> { if (ack) { log.info("消息成功到达交换机"); return; } log.error("消息没有到达交换机,原因是:{}" + cause); } ); rabbitTemplate.setReturnsCallback( returnedMessage -> log.error("消息被返回啦,错误原因是:{}", returnedMessage.getMessage()) ); } public void SendMsg() throws JsonProcessingException { { Orders order1 = Orders.builder() .id("order_123456") .orderName("遥遥领先") .orderMoney(new BigDecimal(8888)) .orderTime(new Date()).build(); String strOrder1 = objectMapper.writeValueAsString(order1); CorrelationData correlationData = new CorrelationData(); correlationData.setId("ackId"); rabbitTemplate.send(exchangeName, "info", MessageBuilder.withBody(strOrder1.getBytes()).build(), correlationData); log.info("消息发送完毕,时间为:{}", new Date()); } { Orders order2 = Orders.builder().id("order_123456").orderName("遥遥领先").orderMoney(new BigDecimal(8888)).orderTime(new Date()).build(); String strOrder2 = objectMapper.writeValueAsString(order2); CorrelationData correlationData = new CorrelationData(); correlationData.setId("ackId"); rabbitTemplate.send(exchangeName, "info", MessageBuilder.withBody(strOrder2.getBytes()).build(), correlationData); log.info("消息发送完毕,时间为:{}", new Date()); } } }
消费者
package com.xxp.message; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import com.xxp.vo.Orders; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; @Component @Slf4j public class ReceiveMessage { @Resource private ObjectMapper objectMapper; @Resource StringRedisTemplate stringRedisTemplate; @RabbitListener(queues = {"queue.idempotent"}) public void receiveMessage(Message message, Channel channel){ MessageProperties properties = message.getMessageProperties(); //获取消息的唯一标识 long deliveryTag = properties.getDeliveryTag(); try{ //获取消息 Orders orders = objectMapper.readValue(message.getBody(), Orders.class); log.info("获取的消息为{}",orders.toString()); //TODO 插入订单 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("idempotent" + orders.getId(), orders.getId()); if (result==true){ //TODO 像插入数据库 log.info("插入数据库"); } //参数1为 唯一标识 参数2 false只确认当前消息 channel.basicAck(deliveryTag,false); }catch (Exception e){ try { channel.basicNack(deliveryTag,false,true);//参数一 消息唯一标识 参数二 只确认当前消息 参数三 是否重新入队 } catch (IOException ex) { throw new RuntimeException(ex); } } } }
RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;
在 RabbitMQ 集群中所有的节点(一个节点就是一个 RabbitMQ 的 broker 服务器) 被归为两 类:
一类是磁盘节点,一类是内存节点;
磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存 节点只会将这些信息保存到内存中,
如果该节点宕机或重启,内存节点的数据会全部丢失, 而磁盘节点的数据不会丢失
默认集群模式也叫 普通集群模式、或者 内置集群模式
RabbitMQ 默认集群模式,只会把交换机、队列、虚拟主机等元数据信息在各 个节点同步,
而具体队列中的消息内容不会在各个节点中同步
元数据的解释:
队列元数据:队列名称和属性(是否可持久化,是否自动删除)
交换器元数据:交换器名称、类型和属性
绑定元数据:交换器和队列的绑定列表
vhost元数据:vhost 内的相关属性,如安全属性等;
当用户访问其中任何一个 RabbitMQ 节点时,查询到的 queue/user/ exchange/vhost 等信息都是相同的;
但集群中队列的具体信息数据只在队列的拥有者节点保存,其他节点只知道队列 的元数据和指向该节点的指针,所以其他节点接收到不属于该节点队列的消息时 会将该消息传递给该队列的拥有者节点上;
为什么集群不复制队列内容和状态到所有节点:
1)存储空间; 2)性能;
如果消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写 磁盘,占用磁盘空间。
1 从已经安装好 rabbitmq 的机器 clone 三台机器
2 重新设置三台机器的 mac 地址 注意 clone 完,先不要启动三台机器,三台机器均要重新生成 mac 地址,防止 clone 出 的机器 ip 地址重复
3 启动三台机器
并查看ip地址
ip a
4 使用 xshell 连接三台机器
5 修改三台机器的/etc/hosts 文件
首先需要配置一下 hosts 文件,因为 RabbitMQ 集群节点名称是读取 hosts 文件得到的;
vim /etc/hosts
192.168.11.145 rabbitmq1
192.168.11.147 rabbitmq2
192.168.11.148 rabbitmq3
6 三台机器均重启网络,使节点名生效
systemctl restart network
7 三台机器的 xshell 均退出,然后再重新连
8 三台机器的防火墙处理
systemctl status firewalld
systemctl stop firewalld --关闭防火墙
systemctl disable firewalld --开机不启动防火墙
9 三台机器 .erlang.cookie 文件保持一致
由于是 clone 出的三台机器,所以肯定是一样
如果我们使用解压缩方式安装的 RabbitMQ,那么该文件会在${用户名}目录下,
也就是${用户名}/.erlang.cookie;
如果我们使用 rpm 安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq 目录下
注意 .erlang.cookie 的权限为 400,目前已经是 400
10 分别启动三台机器上的 rabbitmq
rabbitmq-server -detached
11 查看集群状态
rabbitmqctl cluster_status
12 构建集群
在 rabbitmq2 机器上执行命令,让 2 的 rabbitmq 加
./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@rabbit1 --ram
./rabbitmqctl start_app
–ram 参数表示让 rabbitmq2 成为一个内存节点,如果不带参数默认为 disk 磁盘节点
在 rabbit130 节点上也执行同样的命令,使 rabbit3
./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@rabbit1 --ram
./rabbitmqctl start_app
13 操作一个节点,添加用户和权限等
#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*" #设置角色
rabbitmqctl set_user_tags admin administrator
#启动 web 控制台插件
./rabbitmq-plugins enable rabbitmq_management
使用 web 浏览器添加一个虚拟主机 :powernod
14 再次查看集群状态
当执行完操作以后我们在浏览器访问 web 管控台来看看效果;
随便在哪个节点打开 web 管控台都能看到集群环境各节点的信息;
也可以使用 ./rabbitmqctl cluster_status 查看集群状态
Springboot 连接集群
配置文件加入 addresses:
server: port: 8080 spring: application: name: cluster-to-exchange rabbitmq: # host: 192.168.11.146 # port: 5672 username: admin password: 123456 virtual-host: powernode publisher-confirm-type: correlated publisher-returns: true listener: simple: acknowledge-mode: manual addresses: 192.168.145:5672,192.168.11.147:5672,192.168.148:5672
镜像模式是基于默认集群模式加上一定的配置得来的; 在默认模式下的 RabbitMQ 集群,它会把所有节点的交换机、绑定、队列的元 数据进行复制确保所有节点都有一份相同的元数据信息,但是队列数据分为两种:
一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息),
另 一种是队列里面的消息镜像模式,则是把所有的队列数据完全同步,包括元数据信息和消息数据信息, 当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模 式;
实现镜像模式也非常简单,它是在普通集群模式基础之上搭建而成的; 镜像队列配置命令
/rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority] -p Vhost: 可选参数,针对指定 vhost 下的 queue 进行设置;
Name: policy 的名称;(可以自己取个名字就可以)
Pattern: queue 的匹配模式(正则表达式);
Definition:镜像定义,包括三个部分 ha-mode, ha-params, ha-sync-mode;
priority:可选参数,policy 的优先级
例子
rabbitmqctl set_policy -p powernode ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。