当前位置:   article > 正文

RabbitMQ——常见问题(笔记)_correlationdata

correlationdata

一、MQ的一些常见问题

  1. 消息可靠性问题:如何确保发送的消息至少被消费一次
  2. 延迟消息问题:如何实现消息的延迟投递
  3. 高可用性问题:如何解决数百万消息堆积,无法及时消费的问题
  4. 消息堆积问题:如何避免单点的MQ故障而导致的不可用问题

二、 消息可靠性问题

  1. 发送时丢失:
    • 没有到交换机
    • 到达交换机但没有到队列
  2. MQ宕机,queue将消息丢失
  3. consumer接收到消息后未消费就宕机
publisher
exchange
queue1
queue2
consumer1
consumer2

2.1 生产者消息确认

生产者确认机制

RabbitMQ提供了publisher confirm机制,来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
每一个消息有一个全局唯一ID,以区分不同的消息,避免ACK冲突

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,且成功投递到队列,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但没有路由到队列。返回ACK,及路由失败原因

接下来我们实现该逻辑,部署部分可以参考

MessageQueue消息队列——基础(笔记)

docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

创建如下结构
在这里插入图片描述

为publisher配置好yml文件

spring:
  rabbitmq:
    host: IP
    port: 5672
    username: yjx23332
    password: 123456
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • publisher-confirm-type:生产者确认类型
    • simple:同步等待confirm结果,直到超时
    • correlated:一部回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-returns功能,同样基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义路由失败时的策略。true:则调用ReturnCallback,false:则直接丢弃消息。

Spring必须掌握的Bean增强扩展点与加载流程

每一个RabbitTemplate只能配置一个ReturnCallback(publish-returns),因此需要在项目启动过程中配置。我们在publisher中配置

package com.yjx23332.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
/**
 * 但Spring容器创建完成后,会通知该方法
 * */
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey(),
                    returnedMessage.getMessage());
            //如果失败了,还可以通知管理员、重发等
        });

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

但是不同于publish-returns对于每一个RabbitTemplate 只能有一个的特点,publisher-confirm让每一个发送都可以有一个实现
我们接下来,在test中进行配置,记得创建对应的交换机(topic)和绑定对应的队列。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

package com.yjx23332.mq;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.UUID;

import static java.lang.Thread.sleep;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue()throws InterruptedException{
        String routingKey = "simple.test";
        String message = "hello,Spring amqp!";
        String exchange = "amq.topic";
        //消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(result -> {
            //判断结果
            if(result.isAck()){
                //ACK
                log.info("投递到交换机成功!消息ID:{}",correlationData.getId());
            }
            else{
                //NACK
                log.error("投递到交换机失败!消息ID:{}",correlationData.getId());
            }
        }, ex -> {
            //记录日志
            log.error("消息发送失败",ex);
            //也可以重发
        });
        rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
        //避免结束直接关闭,但是异步没有结束
        sleep(1000);
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

成功
在这里插入图片描述

填错交换机,投递交换机失败
在这里插入图片描述
routinkey填错,放入队列失败
在这里插入图片描述

2.2 消息的持久化

Durable,代表持久化的意思。
在这里插入图片描述
在创建的时候选择
在这里插入图片描述
代码部分,想要持久化,可以

	//交换机
	@Bean
	public DirectExchange simpleExchange(){
	/**
	*@param	交换机名称,是否持久化,当没有queue与其绑定时是否自动删除
	*/
		return new DirectExchange("simple.direct",true,false);
	}
	//队列
	public Queue simpleQueue(){
		return QueueBuilder.durable("simple.queue").build();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

消息默认时持久化的,我们可以自己修改

		Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))//消息体
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//持久化模式
                .build();
        rabbitTemplate.convertAndSend(exchange,routingKey,msg,correlationData);
  • 1
  • 2
  • 3
  • 4

2.3 消费者消息确认

消费者处理消息后可以向MQ发送ACK绘制,MQ收到ACK绘制后,才会删除该消息。
允许三种确认模式:

  • manual:手动ACK,需要在业务代码结束后,调用API发送ACK
  • auto:自动ACK,由Spring检测listener代码是否出异常,没有异常则返回ACK,抛出异常则返回NACK(环绕增强)。
  • none:关闭ACK,MQ假定消费这获取消息后会成功处理,因此投递后立即删除。

接下来我们配置消费者的yml文件

spring:
  rabbitmq:
    host: IP
    port: 5672
    username: yjx23332
    password: 123456
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 每次最多取多少数据
        acknowledge-mode: auto # 确认策略
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
package com.yjx23332.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple.queue"),
            exchange = @Exchange(value = "amq.topic",type = ExchangeTypes.TOPIC),
            key = {"simple.#"}
    ))
    public void listenSimpleQueue(String msg){
        System.out.println(1/0);
        System.out.println("消费者接收到simple.queue的消息:【"+msg+"】");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

此时会不断的循环处理。因为出错,消息会requeue(重新入队),然后重新被取出
在这里插入图片描述

换为none,不会不断的执行,执行失败了也没有问题
在这里插入图片描述
换为manual,队列会卡住。可以看到uncaked,在等待答复。
在这里插入图片描述

2.4 失败重试机制

当出现异常后,requeue会让MQ消息处理飙升,带来压力
利用Spring的retry机制,在消息异常时,本地重试。

对消费者进行配置

spring:
  rabbitmq:
    host: IP地址
    port: 5672
    username: yjx23332
    password: 123456
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual
        retry:
          enabled: true # 开启消费者失败重试机制
          initial-interval: 1000 # 初次的失败等待时间为1000毫秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval(最新等待时长)
          max-attempts: 3 # 最大重试次数
          stateless: true # 是否有状态?事务则是用,false有状态
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

最后如果还是失败了(Retries exhausted 重试耗尽),消息就会被拒绝。默认为丢弃,即返回了一个Reject。

当开启重试模式后,重试耗尽,失败消息处理策略用MessageRecoverer接口处理,包含三种实现。

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接丢弃消息,默认使用。
  • ImmediateRequeueRecoverer:耗尽后,返回nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机。
    重发机制,可以把它发到error.queue,随后有安排消费者去处理。

在消费中,我们准备一个错误处理的交换机和队列

package com.yjx23332.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
/**
 * 但Spring容器创建完成后,会通知该方法
 * */
public class CommonConfig{
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean Queue errorQueue(){
        return new Queue("error.queue",true);
    }
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

使用auto,然后重启。

在这里插入图片描述
在这里插入图片描述

三、延迟消息问题

3.1 死信交换机

队列中的消息满足下列情况之一时,就可以称为死信(dead letter)

  • 消费者使用basic.reject或者basic,nack声明消费失败,且消息的requeue的参数被设置为了false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息成为死信

死信交换机

  • 如果该队列配置了dead-letter-exchange属性,制定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,DLX)。死信交换机把消息送到的队列被称为死信队列。

与RepublishMessageRecoverer区别是

  1. RepublishMessageRecoverer:由消费者投递消息
  2. dead-letter-exchange:由队列投递消息
  3. RepublishMessageRecoverer无法处理队列中的死信
  4. 如果只是做异常消息的兜底,那么建议用RepublishMessageRecoverer

queue需要配置

  1. dead-letter-exchange
  2. dead-letter-routing-key

3.2 超时机制TTL

Time-To-Live:如果一个消息在TTL结束后仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

消息和队列都有TTL,则以TTL更短的时间为准

通过存活时间超时,然后由死信交换机投递到对应的队列和消费者,来做到延迟处理。
我们用监听器监听如下内容

	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue"),
            exchange = @Exchange(value = "dl.direct",type = ExchangeTypes.DIRECT),
            key = {"dl"}
    ))
    public void listendeadQueue(String msg){
        log.info("消费者接收到dl的延迟消息消息:【"+msg+"】");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

我们建立如下交换机、队列

	@Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.direct");
    }
    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }
    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

对于发布者,我们新增一个

	@Test
    public void testTTLMessage(){
        Message message = MessageBuilder
                .withBody("hello ttl message".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
        log.info("消息已成功发送");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

可以看到大致是10秒左右
在这里插入图片描述
在这里插入图片描述
我们也可给消息设置时间

	Message message = MessageBuilder
                .withBody("hello ttl message".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setExpiration("5000")
                .build();
  • 1
  • 2
  • 3
  • 4
  • 5

3.3 延迟队列

利用TLL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列使用场景包括:

  • 延迟发送短信
  • 用户下单,用户15分钟未支付,则自动取消
  • 预约工作场景,20分钟后自动通知所有参会人员

RabbitMQ准备了DelayExchange插件,避免我们自己配置使用很麻烦。它不是基于队列,而是基于交换机。

官方原生插件地址
我们在里面找到如下内容后下载
注意版本,笔者用的版本只支持到3.9.0
在这里插入图片描述
通过如下命令,找到数据卷位置

docker volume inspect mq-plugins

上传到那个位置即可。
进入容器,开启插件

docker exec -it mq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启一下,就可以用了。

该插件将交换机功能进行了升级,是对官方交换机进行了一层封装,让交换机存储消息。
在这里插入图片描述
手动指定类
在这里插入图片描述

发消息时需要加一个消息头,类似如下图
在这里插入图片描述

使用上来说


/**
* 注解方式声明
*/
	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue",durable = "true"),
            exchange = @Exchange(value = "dl.direct",type = ExchangeTypes.DIRECT,delayed = "true"),
            key = {"dl"}
    ))
    public void listendeadQueue(String msg){
        log.info("消费者接收到dl的延迟消息消息:【"+msg+"】");
    }

/**
* Bean注解方式声明
*/
    @Bean
    public DirectExchange ttlExchange(){
        return  ExchangeBuilder.directExchange("dl.direct")
                .delayed()
                .durable(true)
                .build();
    }

/**
* 发送消息
*/
			Message message = MessageBuilder
                .withBody("hello ttl message".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay",1000)
               // .setExpiration("5000")
                .build();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 为何会报错
    因为它是基于官方的交换机实现,而官方的交换机要求是要立即转发。

四、消息堆积问题

当生产这发送消息的速度超过了消费者处理消息的速度,就会导致堆积。

解决的三种思路

  1. 增加更多的消费者,提高消费速度
  2. 提高单个消费这处理能力,在消费者内部开启线程池,适合业务耗时长的情况
  3. 加大队列容积,提高堆积上限

4.1 惰性队列

RabbitMQ 3.6.0开始增加的概念

特性

  1. 接收消息后直接存入磁盘而非内存
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储

惰性队列与持久化

  • lazy queue 消息不持久化 , 这种模式还是会把消息放到硬盘里,RAM的使用率会一直很稳定,但是重启后一样会丢失消息

声明时,使用即可
在这里插入图片描述

或者

	/**
	* Bean注解方式声明
	*/
 	@Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .lazy()
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }
    /**
	* 注解方式声明
	*/
	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue",durable = "true"),
            exchange = @Exchange(value = "dl.direct",type = ExchangeTypes.DIRECT),
            key = {"dl"},
            arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listendeadQueue(String msg){
        log.info("消费者接收到dl的延迟消息消息:【"+msg+"】");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

优点:

  • 基于磁盘存储,消息上限高
  • 没有间歇性的Page-out性能(先内存再磁盘)比较稳定

缺点:

  • 基于磁盘存储,消息失效性会降低
  • 性能受限于磁盘IO

五、高可用性问题

5.1 集群分类

RabbitMQ是一个基于Erlang语言编写,Erlang是一个面向并发的语言,天然支持集群模式。

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。宕机将导致该节点数据无法使用。
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。主从同步存在延迟。
  • 仲裁队列:3.8版本后,底层使用Raft协议确保主从的数据一致性。

集群节点标示

rabbit@[hostname]

5.2 普通集群

又叫做标准集群(classic cluster)

  • 会在集群的各个节点间共享部分数据,包括交换机、队列元信息。不包含队列的消息。
  • 当访问集群中某个节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回。
  • 队列所在节点宕机,队列中的消息会消失(无法使用)。

集群中的节点共用一个Cookie,相同Cookie就允许相互通信。

我们获取一个Cookie

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

保存下来后,作为共享的Cookie

随后我们rm掉我们现在在用的数据,并清空数据卷

docker rm -f mq
docker volume rm mq-plugins

在/tmp目录下,创建一个配置文件rabbitmq.conf

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1=rabbit@mq1
cluster_formation.classic_config.nodes.2=rabbit@mq2
cluster_formation.classic_config.nodes.3=rabbit@mq3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在tmp下,在创建一个文件,记录我们的Cookie

touch .erlang.cookie
echo “Cookie” > .erlang.cookie

修改权限

chmod 600 .erlang.cookie

在tmp下,准备3个目录mq1,mq2,mq3作为之后的数据卷

mkdir mq1 mq2 mq3

将我们配置好的全部拷贝进去

cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

准备一个网络,让集群互联

docker network create mq-net

挂载,在/tmp位置执行,记得改mq1为mq2,mq3,以及端口暴露

docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v ${PWD}/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie --name mq1 --hostname mq1 -p 15672:15672 -p 5672:5672 --net mq-net -d rabbitmq:3-management

记得开放对应端口
在这里插入图片描述
如图,可以看到所有节点
在这里插入图片描述

我们在一个MQ里创建队列,另一个队列里面可以看到
在这里插入图片描述
中断创建队列的MQ后,从另一边发现队列为宕机状态。
在这里插入图片描述

5.3 镜像集群

本质是主从模式:

  • 交换机、队列、队列中的消息会在各个MQ的镜像节点之间同步备份
  • 创建队列的节点被称为该队列的主节点,备份到的其他节点叫作该队列的镜像节点(类似ElasticSearch,彼此之间相互备份)
  • 一个队列的主节点可能是另一个队列的镜像节点(镜像节点也可以继续进行备份)
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主节点宕机后,镜像节点会代替成为新的主节点

在普通集群的基础上进行配置

有三种模式

ha-modeha-params效果
准确模式 exactly队列的副本量 count集群中队列副本(中服务器和镜像服务器纸盒)的数量。count为1则意味着副本就是主节点。count为2就是一主节点一镜像。当集群数量小于count,主节点将镜像到所有节点。当集群数量大于则会随机选取进行镜像
all(none)队列在集群中的所有的节点之间进行镜像。队列将镜像到任何新加入的节点。镜像备份到所有节点的操作将对所有集群节点施加额外压力,包括网络I/O,磁盘I/O和磁盘空间使用情况。推荐使用exactly,设置副本数目为(N/2 + 1)
nodesnode names指定队列镜像创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建镜像节点到当前客户端连接到的节点

进入到容器中,进行操作

docker exec -it mq1 bash

输入下列的命令即可

  • exactly模式
    配置方式是通过命令来配置
    ha-two:规则名字,随便取
    “^two.”:regex正则表达,所有以two开头的主节点名字(队列/交换机)都会使用该模式
    ha-mode:模式
    ha-params:参数,此处是count
    ha-sync=mode:同步策略

rabbitmqctl set_policy ha-two “^two.” ‘{“ha-mode”:“exactly”,“ha-params”:2,“ha-sync-mode”:“automatic”}’

  • all模式

rabbitmqctl set_policy ha-all “^all.” ‘{“ha-mode”:“all”}’

  • nodes模式
    ha-params:此处是集群节点

rabbitmqctl set_policy ha-nodes “^nodes.” ‘{“ha-mode”:“nodes”,“ha-params”:[“rabbit@nodeA”,“rabbit@nodeB”]}’

可以在该处看到配置的规则
在这里插入图片描述

也可以在下面直接配置
在这里插入图片描述

5.4 仲裁队列

3.8以后才有的新功能,用来替代镜像队列

  • 与镜像队列一样,都是主从模式,支持初从数据同步
  • 使用非常简单,不用复杂的配置
  • 主从同步基于Raft协议,强一致

直接在控制台就可以创建
Quorum:仲裁队列
Node:指定主节点
在这里插入图片描述
代码创建
首先修改配置

spring:
  rabbitmq:
    addresses: Ip:端口,Ip:端口
    username: yjx23332
    password: 123456
    virtual-host: /
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

集群模式下,创建似乎不能绑定主节点等操作。同时监听器直接创建似乎也不可行。

	/**
	* Bean注解方式声明
	*/
	@Bean public Queue quorumQueue(){
		return QueueBuilder
				.durable("quorum.queue")
				.quorum()
				.build();
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

参考文献

[1]Spring AMQP官网
[2]黑马程序员Java微服务
[3]RabbitMQ官方文档

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/996832
推荐阅读
相关标签
  

闽ICP备14008679号