赞
踩
普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。
此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。
当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。
这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。
大致的流程图如下图:
它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。
大致流程图如下图:
RabbitMQ 中的节点类型有两种:
RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。
搭建之前,有两个预备知识需要大家了解:
1.搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。
2.RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,RabbitMQ 服务启动会失败(如果我们是在不同的服务器上搭建 RabbitMQ 集群,大家需要注意这一点,接下来的 2.2 小结,我们将通过 Docker 的容器连接 link 来实现容器之间的访问,略有不同)。
$ docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq:3.9.22
$ docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq:3.9.22
$ docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq:3.9.22
添加完参数之后会出现警告:RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
准备一个.erlang.cookie
文件,内容为 rabbitmq_cookie
。修改权限为读写,不修改会报权限错误 erlang.cookie must be accessible by owner only
$ chmod 600 .erlang.cookie
$ docker run -d --name mq01 --hostname rabbit01 -p 15671:15672 -p 5671:5672 rabbitmq:3.9.22-management
$ docker run -d --name mq02 --hostname rabbit02 -p 15672:15672 -p 5672:5672 --link mq01:mylink01 rabbitmq:3.9.22-management
$ docker run -d --name mq03 --hostname rabbit03 -p 15673:15672 -p 5673:5672 --link mq01:mylink02 --link mq02:mylink03 rabbitmq:3.9.22-management
$ docker cp .erlang.cookie mq01:/var/lib/rabbitmq
$ docker cp .erlang.cookie mq02:/var/lib/rabbitmq
$ docker cp .erlang.cookie mq03:/var/lib/rabbitmq
重启三个节点
$ docker restart mq01 mq02 mq03
三个节点现在就启动好了,注意在 mq02 和 mq03 中,分别使用了 --link
参数来实现容器连接,另外还需要注意,mq03 容器中要既能够连接 mq01 也能够连接 mq02。
--link
说明:
\1. 当新建一个容器时,如果没有显示指定其使用的网络,那么默认会使用 bridge 网络
\2. 当一个容器 link 到另一个容器时,该容器可以通过 IP 或容器名称访问被 link 的容器,而被 link 容器可以通过 IP 访问该容器,但是无法通过容器名称访问
\3. 当被 link 的容器被删除时,创建 link 的容器也无法正常使用
\4. 如果两个容器被加入到我们手动创建的网络时,那么该网络内的容器相互直接可以通过 IP 和名称同时访问。
接下来开始集群的配置
进入 mq01
$ docker exec -it mq01 /bin/bash
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl start_app
分别进入 mq02 和 mq03,执行如下命令。
$ docker exec -it mq02 /bin/bash
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl join_cluster --ram rabbit@rabbit01
$ rabbitmqctl start_app
进入任意一个容器内,输入以下命令查看集群状态
$ rabbitmqctl cluster_status
application.yml 配置如下
server:
port: 9092
spring:
rabbitmq:
host: localhost
port: 5671
password: guest
username: guest
virtual-host: /
rabbitTemplate 配置如下
@Configuration public class RabbitTemplateConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 必须是 prototype 类型 public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } }
创建队列、交换机以及绑定关系
@Configuration public class DirectRabbitMQConfig { public static final String CLUSTER_EXCHANGE_NAME = "cluster_direct_exchange"; public static final String CLUSTER_QUEUE_NAME = "cluster_direct_queue"; public static final String CLUSTER_ROUTING_KEY = "cluster"; @Bean public DirectExchange clusterDirectExchange() { return new DirectExchange(CLUSTER_EXCHANGE_NAME,true,false); } @Bean public Queue clusterDirectQueue() { return new Queue(CLUSTER_QUEUE_NAME,true,false,false); } @Bean public Binding clusterDirectBinding(@Qualifier("clusterDirectQueue") Queue clusterDirectQueue, @Qualifier("clusterDirectExchange") DirectExchange clusterDirectExchange) { return BindingBuilder.bind(clusterDirectQueue).to(clusterDirectExchange).with(CLUSTER_ROUTING_KEY); } }
在单元测试中进行消息发送测试
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void directQueueTest() {
rabbitTemplate.convertAndSend(DirectRabbitMQConfig.CLUSTER_EXCHANGE_NAME,DirectRabbitMQConfig.CLUSTER_ROUTING_KEY,"Hello Rabbitmq cluster");
}
}
消费者端代码
@Service
public class DirectClusterConsumer {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues = {"cluster_direct_queue"})
public void receiveClusterMessage(String message) {
logger.info("接收到消息: " + message);
}
}
在三个节点都启动的状态下,先发布一条消息,然后停掉主节点,也就是 mq01
然后动消费者服务,可以看到服务报错,无法获取到队列消息
此时,停止任意子节点,这里停止 mq02
节点
可以看到,消息队列的各项功能都不受影响。
镜像集群不需要额外搭建,只需要将队列配置为镜像队列即可。这个配置可以通过网页配置,也可以通过命令行配置。
点击 Admin 选项卡,然后点击右边的 Policies
,再点击 Add/update a policy
,如下图:
然后添加策略
各参数含义如下:
Name
:policy 的名称
Pattern
:queue 的匹配模式(正则表达式)
Definition
:镜像定义,主要由三个参数:ha-mode
,ha-params
,ha-sync-mode
。
ha-mode
:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。ha-params
:ha-mode 模式需要用到的参数ha-sync-mode
:进行队列中消息的同步方式,有效值为 automatic 和 manual。priority
为可选参数,表示 policy 的优先级添加完效果如下
命令行的配置格式如下:
$ rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
$ docker exec -it rabbit01 bash
$ rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
停止 mq01
节点,可以看到此时消息队列集群没有任何影响
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。