Picking up from the previous article, where we covered RabbitMQ's basic usage and more advanced features, this article looks at what a RabbitMQ cluster is.
Basic requirements:
How it works:
The nodes must be able to discover one another.
CentOS release: CentOS 8 Stream or newer.
ISO download: https://mirrors.163.com/centos/8-stream/isos/x86_64/CentOS-Stream-8-20240318.0-x86_64-dvd1.iso
Official guide to installing RabbitMQ:
vim /etc/yum.repos.d/rabbitmq.repo
The following content is taken from the official documentation: https://www.rabbitmq.com/docs/install-rpm
# In /etc/yum.repos.d/rabbitmq.repo

##
## Zero dependency Erlang RPM
##

[modern-erlang]
name=modern-erlang-el8
# uses a Cloudsmith mirror @ yum.novemberain.com in addition to its Cloudsmith upstream.
# Unlike Cloudsmith, the mirror does not have any traffic quotas
baseurl=https://yum1.novemberain.com/erlang/el/8/$basearch
        https://yum2.novemberain.com/erlang/el/8/$basearch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/$basearch
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[modern-erlang-noarch]
name=modern-erlang-el8-noarch
# uses a Cloudsmith mirror @ yum.novemberain.com.
# Unlike Cloudsmith, it does not have any traffic quotas
baseurl=https://yum1.novemberain.com/erlang/el/8/noarch
        https://yum2.novemberain.com/erlang/el/8/noarch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/noarch
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[modern-erlang-source]
name=modern-erlang-el8-source
# uses a Cloudsmith mirror @ yum.novemberain.com.
# Unlike Cloudsmith, it does not have any traffic quotas
baseurl=https://yum1.novemberain.com/erlang/el/8/SRPMS
        https://yum2.novemberain.com/erlang/el/8/SRPMS
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/SRPMS
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1

##
## RabbitMQ Server
##

[rabbitmq-el8]
name=rabbitmq-el8
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/$basearch
        https://yum1.novemberain.com/rabbitmq/el/8/$basearch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/$basearch
repo_gpgcheck=1
enabled=1
# Cloudsmith's repository key and RabbitMQ package signing key
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[rabbitmq-el8-noarch]
name=rabbitmq-el8-noarch
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/noarch
        https://yum1.novemberain.com/rabbitmq/el/8/noarch
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/noarch
repo_gpgcheck=1
enabled=1
# Cloudsmith's repository key and RabbitMQ package signing key
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
       https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[rabbitmq-el8-source]
name=rabbitmq-el8-source
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/SRPMS
        https://yum1.novemberain.com/rabbitmq/el/8/SRPMS
        https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/SRPMS
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
gpgcheck=0
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md
--nobest tells the package manager to accept a package even when it is not the best available candidate.
yum update -y --nobest
yum install -y erlang
# Import the GPG keys
rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc'
rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key'
rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key'
# Download the RabbitMQ server RPM package
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm
# Install it
rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm
# Enable the management UI plugin
rabbitmq-plugins enable rabbitmq_management

# Start the RabbitMQ service
systemctl start rabbitmq-server

# Make the RabbitMQ service start on boot
systemctl enable rabbitmq-server

# Create a login account
rabbitmqctl add_user hanson 123456

# Grant the account administrator rights and full permissions
rabbitmqctl set_user_tags hanson administrator
rabbitmqctl set_permissions -p / hanson ".*" ".*" ".*"

# Enable all stable feature flags
rabbitmqctl enable_feature_flag all

# Restart the RabbitMQ service for the changes to take effect
systemctl restart rabbitmq-server
rm -rf /etc/yum.repos.d/rabbitmq.repo
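Before going any further, it is worth confirming that the broker actually came up. A couple of standard checks:

systemctl status rabbitmq-server --no-pager   # service state
rabbitmq-diagnostics ping                     # the node answers CLI tools
rabbitmqctl status                            # version, listeners, enabled plugins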
Prepare three VMware virtual machines in total by cloning the one we just set up.
| Cluster node name | VM IP address |
| --- | --- |
| node01 | 192.168.200.100 |
| node02 | 192.168.200.150 |
| node03 | 192.168.200.200 |
The IP address of each clone can be changed with the nmcli command line tool:

1. List the current network connections:
nmcli con show
2. Bring down the connection you want to modify (replace <connection_name> with the actual connection name):
nmcli con down <connection_name>
3. Set the new address (replace <connection_name>, <new_ip_address>, <subnet_mask> and <gateway> with your own values; <new_ip_address>/<subnet_mask> is CIDR notation):
nmcli con mod <connection_name> ipv4.addresses <new_ip_address>/<subnet_mask>
nmcli con mod <connection_name> ipv4.gateway <gateway>
nmcli con mod <connection_name> ipv4.method manual
4. Bring the connection back up:
nmcli con up <connection_name>
5. Verify the new IP address:
ip addr show
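Putting the steps together for node01: the connection name (ens160) and the gateway (192.168.200.2) below are assumptions, so take the real connection name from nmcli con show and the gateway from your VMware network settings.

nmcli con mod ens160 ipv4.addresses 192.168.200.100/24
nmcli con mod ens160 ipv4.gateway 192.168.200.2
nmcli con mod ens160 ipv4.method manual
nmcli con up ens160
ip addr show ens160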
RabbitMQ uses the host name as the node name inside the cluster, and we will need it later, so set it now.
Change it as follows:
vim /etc/hostname
Friendly advice: take a snapshot of each VM now, so you can roll back if anything goes wrong in the following steps.
Edit the file /etc/hosts and append the following:
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03
Check the Erlang cookie on node01 (every node in a RabbitMQ cluster must share the same cookie value):
[root@node01 ~]# cat /var/lib/rabbitmq/.erlang.cookie
NOTUPTIZIJONXDWWQPOJ
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
On node02 and node03, edit /etc/hosts and append the same entries:
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03
Then set the Erlang cookie on node02 and node03 to the same value as on node01 (a copy-based alternative to editing it by hand is sketched after these steps):
vim /var/lib/rabbitmq/.erlang.cookie
Finally, on each of the two nodes, reset the node and join it to node01's cluster:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node01
rabbitmqctl start_app
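Editing the cookie by hand works, but copying the file is less error-prone. A sketch, assuming root SSH access from node01 to the other two nodes; the cookie must keep its rabbitmq ownership and restrictive permissions or the node will refuse to start:

# On node01: copy the cookie to the other nodes
scp /var/lib/rabbitmq/.erlang.cookie root@node02:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node03:/var/lib/rabbitmq/.erlang.cookie

# On node02 and node03: restore ownership and permissions, then restart the service
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
systemctl restart rabbitmq-server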
Check the cluster status on any node:
rabbitmqctl cluster_status
If you ever need to remove a node from the cluster, proceed as follows:
# On the node being removed:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
# On node01:
rabbitmqctl forget_cluster_node rabbit@node02
Two ports need to be exposed through the load balancer: 15672 for the management UI and 5672 for core messaging (AMQP).
The cluster setup so far:
Load balancing for the management UI:
Load balancing for core messaging (AMQP):
yum install -y haproxy
haproxy -v
systemctl start haproxy
systemctl enable haproxy
The configuration file is located at:
/etc/haproxy/haproxy.cfg
Append the following to the end of the configuration file to load-balance the management UI:
frontend rabbitmq_ui_frontend
bind 192.168.200.100:22222
mode http
default_backend rabbitmq_ui_backend

backend rabbitmq_ui_backend
mode http
balance roundrobin
option httpchk GET /
server rabbitmq_ui1 192.168.200.100:15672 check
server rabbitmq_ui2 192.168.200.150:15672 check
server rabbitmq_ui3 192.168.200.200:15672 check
Adjust the SELinux policy so that HAProxy is allowed to connect to any port:
setsebool -P haproxy_connect_any=1
SELinux is the Linux security module; it restricts what processes may do in order to harden the system. In some cases SELinux will prevent HAProxy from binding to the configured ports, and the domain's security policy has to be adjusted to allow it.
Running setsebool -P haproxy_connect_any=1 sets a boolean that permits HAProxy to connect to any port, so HAProxy can bind the configured sockets and work normally.
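To double-check that the boolean is now active:

getsebool haproxy_connect_any   # expected output: haproxy_connect_any --> on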
Restart HAProxy:
systemctl restart haproxy
Append the load-balancing configuration for core AMQP traffic to the same file:
frontend rabbitmq_frontend
bind 192.168.200.100:11111
mode tcp
default_backend rabbitmq_backend

backend rabbitmq_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5672 check
server rabbitmq2 192.168.200.150:5672 check
server rabbitmq3 192.168.200.200:5672 check
Restart the HAProxy service:
systemctl restart haproxy
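If HAProxy fails to start after an edit, a quick syntax check of the combined configuration usually points at the problem; -c only parses the file and exits:

haproxy -c -f /etc/haproxy/haproxy.cfg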
[1] Configure the POM
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
[2] Main application class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerMainType {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);
    }
}
[3] Configure the YAML
spring:
  rabbitmq:
    host: 192.168.200.100
    port: 11111
    username: hanson
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # confirm callback: did the message reach the exchange?
    publisher-returns: true # return callback: was the message routed to a queue?
logging:
  level:
    com.hanson.mq.config.MQProducerAckConfig: info
[4] Configuration class
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // Register this class as both the confirm callback and the return callback
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("Message reached the exchange. Data: " + correlationData);
        } else {
            log.info("Message failed to reach the exchange. Data: " + correlationData + " Cause: " + cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("Message body: " + new String(returned.getMessage().getBody()));
        log.info("Reply code: " + returned.getReplyCode());
        log.info("Reply text: " + returned.getReplyText());
        log.info("Exchange: " + returned.getExchange());
        log.info("Routing key: " + returned.getRoutingKey());
    }
}
[5] JUnit test class
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public static final String EXCHANGE_CLUSTER_TEST = "exchange.cluster.test";
    public static final String ROUTING_KEY_CLUSTER_TEST = "routing.key.cluster.test";

    @Test
    public void testSendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_CLUSTER_TEST, ROUTING_KEY_CLUSTER_TEST, "message test cluster~~~");
    }
}
[1] Configure the POM
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.5</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
[2] Main application class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerMainType {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQProducerMainType.class, args);
    }
}
[3] Configure the YAML
spring:
  rabbitmq:
    host: 192.168.200.100
    port: 11111
    username: hanson
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual
logging:
  level:
    com.hanson.mq.listener.MyProcessor: info
[4] Listener
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@Slf4j
public class MyProcessor {

    @RabbitListener(queues = {"queue.cluster.test"})
    public void processNormalQueueMessage(String data, Message message, Channel channel) throws IOException {
        log.info("Consumer received: " + data);
        // Manually acknowledge the message
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
[5] Run the test and observe the output
Distribution across the cluster: after a quorum queue is created, its replicas are automatically spread across different nodes.
Note: since the whole point of quorum queues is replication, the steps below build on the cluster set up earlier.
There is nothing special about the exchange bound to a quorum queue; an ordinary direct exchange will do.
Exchange name: exchange.quorum.test
Queue name: queue.quorum.test
Routing key: routing.key.quorum.test
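The exchange, the quorum queue, and the binding can be created in the management UI (pick queue type "quorum" when creating the queue). If you prefer to script it, a minimal sketch using rabbitmqadmin is shown below; it assumes the rabbitmqadmin tool is installed on node01 and reuses the names above. The x-queue-type=quorum argument is what actually makes it a quorum queue.

rabbitmqadmin -u hanson -p 123456 declare exchange name=exchange.quorum.test type=direct durable=true
rabbitmqadmin -u hanson -p 123456 declare queue name=queue.quorum.test durable=true arguments='{"x-queue-type":"quorum"}'
rabbitmqadmin -u hanson -p 123456 declare binding source=exchange.quorum.test destination=queue.quorum.test routing_key=routing.key.quorum.test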
Then send and consume messages exactly as with a classic queue:
public static final String EXCHANGE_QUORUM_TEST = "exchange.quorum.test";
public static final String ROUTING_KEY_QUORUM_TEST = "routing.key.quorum.test";

@Test
public void testSendMessageToQuorum() {
    rabbitTemplate.convertAndSend(EXCHANGE_QUORUM_TEST, ROUTING_KEY_QUORUM_TEST, "message test quorum ~~~");
}
public static final String QUEUE_QUORUM_TEST = "queue.quorum.test";

@RabbitListener(queues = {QUEUE_QUORUM_TEST})
public void quorumMessageProcess(String data, Message message, Channel channel) throws IOException {
    log.info("Consumer received: " + data);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
# Stop the RabbitMQ application on one of the cluster nodes
rabbitmqctl stop_app
Sending and receiving messages still works normally.
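To see how the queue's replica set reacted to the stopped node, you can ask any surviving node for the queue's Raft status; the command below is the helper shipped with recent releases (3.8 and later), so adjust if your version differs:

# Shows the members of the quorum queue and their current Raft state
rabbitmq-queues quorum_status "queue.quorum.test"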
Core mechanism:
Overall assessment: similar to Kafka, but still far from matching Kafka.
Note: the full feature set of stream queues is only available once the Stream plugin is enabled.
Run the following on every node in the cluster:
# Enable the Stream plugin
rabbitmq-plugins enable rabbitmq_stream
# Restart the RabbitMQ application
rabbitmqctl stop_app
rabbitmqctl start_app
# Check the plugin status
rabbitmq-plugins list
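After enabling the plugin, you can confirm that each node is now listening on the stream port (5552 by default):

# Lists all active listeners on this node; look for the stream protocol on port 5552
rabbitmq-diagnostics listeners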
Append the following at the end of /etc/haproxy/haproxy.cfg, then restart HAProxy as before:
frontend rabbitmq_stream_frontend
bind 192.168.200.100:33333
mode tcp
default_backend rabbitmq_stream_backend
backend rabbitmq_stream_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5552 check
server rabbitmq2 192.168.200.150:5552 check
server rabbitmq3 192.168.200.200:5552 check
Official repository of the dedicated Stream Java client: https://github.com/rabbitmq/rabbitmq-stream-java-client
Official documentation of the dedicated Stream Java client: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>stream-client</artifactId>
        <version>0.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>
Note: there is no need to create an exchange.
Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("hanson")
        .password("123456")
        .build();
environment.streamCreator().stream("stream.atguigu.test").create();
environment.close();
[1] Official documentation
Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.
In other words, the client ends up connecting directly to the stream's leader node, which it knows by node name, so the machine running the application must be able to resolve that name.
[2] What this means
[3] Configuration
To let the local application resolve the leader node's name to an IP address, add the node-name-to-IP mappings to the local hosts file, as sketched below.
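On a Linux or macOS development machine that could look like the following (on Windows the file is C:\Windows\System32\drivers\etc\hosts); the IP addresses are the same ones used throughout this article:

# Append the cluster node names to the local hosts file (run as root / with sudo)
cat >> /etc/hosts <<'EOF'
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03
EOF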
Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("hanson")
        .password("123456")
        .build();

Producer producer = environment.producerBuilder()
        .stream("stream.atguigu.test")
        .build();

byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);

CountDownLatch countDownLatch = new CountDownLatch(1);
producer.send(
        producer.messageBuilder().addData(messagePayload).build(),
        confirmationStatus -> {
            if (confirmationStatus.isConfirmed()) {
                System.out.println("[producer] the message made it to the broker");
            } else {
                System.out.println("[producer] the message did not make it to the broker");
            }
            countDownLatch.countDown();
        });
countDownLatch.await();

producer.close();
environment.close();
Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("hanson")
        .password("123456")
        .build();

environment.consumerBuilder()
        .stream("stream.atguigu.test")
        .name("stream.atguigu.test.consumer")
        .autoTrackingStrategy()
        .builder()
        .messageHandler((offset, message) -> {
            byte[] bodyAsBinary = message.getBodyAsBinary();
            String messageContent = new String(bodyAsBinary);
            System.out.println("[consumer] messageContent = " + messageContent + " Offset=" + offset.offset());
        })
        .build();
The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:

- OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).
- OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).
- OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.
- OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.
- OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.
Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("hanson")
        .password("123456")
        .build();

CountDownLatch countDownLatch = new CountDownLatch(1);
Consumer consumer = environment.consumerBuilder()
        .stream("stream.atguigu.test")
        .offset(OffsetSpecification.first())
        .messageHandler((offset, message) -> {
            byte[] bodyAsBinary = message.getBodyAsBinary();
            String messageContent = new String(bodyAsBinary);
            System.out.println("[consumer] messageContent = " + messageContent);
            countDownLatch.countDown();
        })
        .build();
countDownLatch.await();
consumer.close();
The Federation plugin is designed to let RabbitMQ pass messages between different broker nodes without forming a cluster.
It can relay messages between brokers or clusters in different administrative domains, which may have different users and vhosts and may even run different versions of RabbitMQ and Erlang. Federation communicates between brokers over the AMQP 0-9-1 protocol and is designed to tolerate unreliable network connections.
For the tests that follow, we create two RabbitMQ instances with Docker.
Important: the whole point of Federation is synchronizing data across clusters, so the RabbitMQ instances in these two Docker containers are NOT joined into a cluster; they are two independent broker instances.
docker run -d \
  --name rabbitmq-shenzhen \
  -p 51000:5672 \
  -p 52000:15672 \
  -v rabbitmq-plugin:/plugins \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=123456 \
  rabbitmq:3.13-management

docker run -d \
  --name rabbitmq-shanghai \
  -p 61000:5672 \
  -p 62000:15672 \
  -v rabbitmq-plugin:/plugins \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=123456 \
  rabbitmq:3.13-management
The plugins must be enabled on both the upstream node and the downstream node.
The RabbitMQ instances in the Docker containers already have rabbitmq_federation enabled; rabbitmq_federation_management still needs to be enabled as well:
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
Once the rabbitmq_federation_management plugin is enabled, a new entry appears under the Admin tab of the management UI:
On the downstream node, fill in the connection information of the upstream node:
Special note:
| Data center | Exchange name | Routing key | Queue name |
| --- | --- | --- | --- |
| Shenzhen (upstream) | federated.exchange.demo | routing.key.demo.test | queue.normal.shenzhen |
| Shanghai (downstream) | federated.exchange.demo | routing.key.demo.test | queue.normal.shanghai |
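The same upstream and policy can also be defined from the command line instead of the UI. A sketch is shown below; the upstream name (shenzhen-upstream), the policy name, and <host-ip> (the Docker host's address, where port 51000 is mapped to the Shenzhen container's 5672) are illustrative placeholders, not values from the article:

# On the downstream (Shanghai) broker: define the upstream connection
docker exec rabbitmq-shanghai rabbitmqctl set_parameter federation-upstream shenzhen-upstream \
  '{"uri": "amqp://guest:123456@<host-ip>:51000"}'
# Attach a federation policy to every exchange whose name starts with "federated."
docker exec rabbitmq-shanghai rabbitmqctl set_policy federate-demo "^federated\." \
  '{"federation-upstream-set": "all"}' --apply-to exchanges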
After creating these components, check the federation status; a successfully connected federation looks like this:
Publish a message to the exchange on the upstream node:
The downstream node receives the message:
The core difference between a federated queue and a federated exchange is this: the queue has the same name on the upstream and the downstream node, and the downstream queue simply has a federation policy attached to it.
| Data center | Exchange | Routing key | Queue |
| --- | --- | --- | --- |
| Shenzhen (upstream) | exchange.normal.shenzhen | routing.key.normal.shenzhen | fed.queue.demo |
| Shanghai (downstream) | —— | —— | fed.queue.demo |
The upstream side is all routine setup, so it is omitted here. What matters is that the federated queue on the downstream node must be created with the appropriate arguments:
After creating these components, check the federation status; a successfully connected federation looks like this:
Publish a message to the exchange on the upstream node:
This time the federated queue on the downstream node does not receive the message. Why? This is where the behavior of federated queues differs from federated exchanges.
A federated queue will not pull messages from upstream unless a consumer is listening on it.
When a consumer is attached, the federated queue first delivers its own messages; only when the federated queue is empty does it fetch messages from the upstream queue.
So to see the effect, a consumer program has to be running against the downstream queue:
The Shovel plugin moves messages from a source queue to a destination queue or exchange, again without requiring the brokers to be clustered. Enable the plugin and its management UI on both nodes:
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
| Node | Exchange | Routing key | Queue |
| --- | --- | --- | --- |
| Shenzhen | exchange.shovel.test | exchange.shovel.test | queue.shovel.demo.shenzhen |
| Shanghai | —— | —— | queue.shovel.demo.shanghai |
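Dynamic shovels can likewise be created from the command line instead of the management UI. The sketch below defines a shovel on the Shenzhen broker that moves messages from its local queue to the Shanghai queue; the shovel name and <host-ip> (the Docker host, where port 61000 maps to the Shanghai container's 5672) are illustrative placeholders:

# "amqp://" with no host means "connect to the local broker"
docker exec rabbitmq-shenzhen rabbitmqctl set_parameter shovel demo-shovel \
  '{"src-protocol": "amqp091", "src-uri": "amqp://", "src-queue": "queue.shovel.demo.shenzhen", "dest-protocol": "amqp091", "dest-uri": "amqp://guest:123456@<host-ip>:61000", "dest-queue": "queue.shovel.demo.shanghai"}'
# Check that the shovel is running
docker exec rabbitmq-shenzhen rabbitmqctl shovel_status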
That wraps up the RabbitMQ series. If it helped you, a like, a bookmark, and a follow are much appreciated.
Code for this article: GitHub