赞
踩
生产者确认机制
RabbitMQ提供了publisher confirm机制,来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
每一个消息有一个全局唯一ID,以区分不同的消息,避免ACK冲突
接下来我们实现该逻辑,部署部分可以参考
docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
创建如下结构
为publisher配置好yml文件
spring:
rabbitmq:
host: IP
port: 5672
username: yjx23332
password: 123456
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
每一个RabbitTemplate只能配置一个ReturnCallback(publish-returns),因此需要在项目启动过程中配置。我们在publisher中配置
package com.yjx23332.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j /** * 但Spring容器创建完成后,会通知该方法 * */ public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); rabbitTemplate.setReturnsCallback(returnedMessage -> { log.error("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage()); //如果失败了,还可以通知管理员、重发等 }); } }
但是不同于publish-returns对于每一个RabbitTemplate 只能有一个的特点,publisher-confirm让每一个发送都可以有一个实现
我们接下来,在test中进行配置,记得创建对应的交换机(topic)和绑定对应的队列。
package com.yjx23332.mq; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.concurrent.FailureCallback; import org.springframework.util.concurrent.SuccessCallback; import java.util.UUID; import static java.lang.Thread.sleep; @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2SimpleQueue()throws InterruptedException{ String routingKey = "simple.test"; String message = "hello,Spring amqp!"; String exchange = "amq.topic"; //消息ID CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result -> { //判断结果 if(result.isAck()){ //ACK log.info("投递到交换机成功!消息ID:{}",correlationData.getId()); } else{ //NACK log.error("投递到交换机失败!消息ID:{}",correlationData.getId()); } }, ex -> { //记录日志 log.error("消息发送失败",ex); //也可以重发 }); rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData); //避免结束直接关闭,但是异步没有结束 sleep(1000); } }
成功
填错交换机,投递交换机失败
routinkey填错,放入队列失败
Durable,代表持久化的意思。
在创建的时候选择
代码部分,想要持久化,可以
//交换机
@Bean
public DirectExchange simpleExchange(){
/**
*@param 交换机名称,是否持久化,当没有queue与其绑定时是否自动删除
*/
return new DirectExchange("simple.direct",true,false);
}
//队列
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
消息默认时持久化的,我们可以自己修改
Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))//消息体
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//持久化模式
.build();
rabbitTemplate.convertAndSend(exchange,routingKey,msg,correlationData);
消费者处理消息后可以向MQ发送ACK绘制,MQ收到ACK绘制后,才会删除该消息。
允许三种确认模式:
接下来我们配置消费者的yml文件
spring:
rabbitmq:
host: IP
port: 5672
username: yjx23332
password: 123456
virtual-host: /
listener:
simple:
prefetch: 1 # 每次最多取多少数据
acknowledge-mode: auto # 确认策略
package com.yjx23332.mq.listener; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class SpringRabbitListener { @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "simple.queue"), exchange = @Exchange(value = "amq.topic",type = ExchangeTypes.TOPIC), key = {"simple.#"} )) public void listenSimpleQueue(String msg){ System.out.println(1/0); System.out.println("消费者接收到simple.queue的消息:【"+msg+"】"); } }
此时会不断的循环处理。因为出错,消息会requeue(重新入队),然后重新被取出
换为none,不会不断的执行,执行失败了也没有问题
换为manual,队列会卡住。可以看到uncaked,在等待答复。
当出现异常后,requeue会让MQ消息处理飙升,带来压力
利用Spring的retry机制,在消息异常时,本地重试。
对消费者进行配置
spring: rabbitmq: host: IP地址 port: 5672 username: yjx23332 password: 123456 virtual-host: / listener: simple: prefetch: 1 acknowledge-mode: manual retry: enabled: true # 开启消费者失败重试机制 initial-interval: 1000 # 初次的失败等待时间为1000毫秒 multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval(最新等待时长) max-attempts: 3 # 最大重试次数 stateless: true # 是否有状态?事务则是用,false有状态
最后如果还是失败了(Retries exhausted 重试耗尽),消息就会被拒绝。默认为丢弃,即返回了一个Reject。
当开启重试模式后,重试耗尽,失败消息处理策略用MessageRecoverer接口处理,包含三种实现。
在消费中,我们准备一个错误处理的交换机和队列
package com.yjx23332.mq.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j /** * 但Spring容器创建完成后,会通知该方法 * */ public class CommonConfig{ @Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean Queue errorQueue(){ return new Queue("error.queue",true); } @Bean public Binding errorBinding(){ return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error"); } @Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error"); } }
使用auto,然后重启。
队列中的消息满足下列情况之一时,就可以称为死信(dead letter)
死信交换机
与RepublishMessageRecoverer区别是
queue需要配置
Time-To-Live:如果一个消息在TTL结束后仍未消费,则会变为死信,ttl超时分为两种情况:
消息和队列都有TTL,则以TTL更短的时间为准
通过存活时间超时,然后由死信交换机投递到对应的队列和消费者,来做到延迟处理。
我们用监听器监听如下内容
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue"),
exchange = @Exchange(value = "dl.direct",type = ExchangeTypes.DIRECT),
key = {"dl"}
))
public void listendeadQueue(String msg){
log.info("消费者接收到dl的延迟消息消息:【"+msg+"】");
}
我们建立如下交换机、队列
@Bean public DirectExchange ttlExchange(){ return new DirectExchange("ttl.direct"); } @Bean public Queue ttlQueue(){ return QueueBuilder .durable("ttl.queue") .ttl(10000) .deadLetterExchange("dl.direct") .deadLetterRoutingKey("dl") .build(); } @Bean public Binding ttlBinding(){ return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl"); }
对于发布者,我们新增一个
@Test
public void testTTLMessage(){
Message message = MessageBuilder
.withBody("hello ttl message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
log.info("消息已成功发送");
}
可以看到大致是10秒左右
我们也可给消息设置时间
Message message = MessageBuilder
.withBody("hello ttl message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setExpiration("5000")
.build();
利用TLL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列使用场景包括:
RabbitMQ准备了DelayExchange插件,避免我们自己配置使用很麻烦。它不是基于队列,而是基于交换机。
官方原生插件地址
我们在里面找到如下内容后下载
注意版本,笔者用的版本只支持到3.9.0
通过如下命令,找到数据卷位置
docker volume inspect mq-plugins
上传到那个位置即可。
进入容器,开启插件
docker exec -it mq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启一下,就可以用了。
该插件将交换机功能进行了升级,是对官方交换机进行了一层封装,让交换机存储消息。
手动指定类
发消息时需要加一个消息头,类似如下图
使用上来说
/** * 注解方式声明 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue",durable = "true"), exchange = @Exchange(value = "dl.direct",type = ExchangeTypes.DIRECT,delayed = "true"), key = {"dl"} )) public void listendeadQueue(String msg){ log.info("消费者接收到dl的延迟消息消息:【"+msg+"】"); } /** * Bean注解方式声明 */ @Bean public DirectExchange ttlExchange(){ return ExchangeBuilder.directExchange("dl.direct") .delayed() .durable(true) .build(); } /** * 发送消息 */ Message message = MessageBuilder .withBody("hello ttl message".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .setHeader("x-delay",1000) // .setExpiration("5000") .build();
当生产这发送消息的速度超过了消费者处理消息的速度,就会导致堆积。
解决的三种思路
RabbitMQ 3.6.0开始增加的概念
特性
惰性队列与持久化
声明时,使用即可
或者
/** * Bean注解方式声明 */ @Bean public Queue ttlQueue(){ return QueueBuilder .durable("ttl.queue") .ttl(10000) .lazy() .deadLetterExchange("dl.direct") .deadLetterRoutingKey("dl") .build(); } /** * 注解方式声明 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "dl.queue",durable = "true"), exchange = @Exchange(value = "dl.direct",type = ExchangeTypes.DIRECT), key = {"dl"}, arguments = @Argument(name = "x-queue-mode",value = "lazy") )) public void listendeadQueue(String msg){ log.info("消费者接收到dl的延迟消息消息:【"+msg+"】"); }
优点:
缺点:
RabbitMQ是一个基于Erlang语言编写,Erlang是一个面向并发的语言,天然支持集群模式。
集群节点标示
rabbit@[hostname]
又叫做标准集群(classic cluster)
集群中的节点共用一个Cookie,相同Cookie就允许相互通信。
我们获取一个Cookie
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
保存下来后,作为共享的Cookie
随后我们rm掉我们现在在用的数据,并清空数据卷
docker rm -f mq
docker volume rm mq-plugins
在/tmp目录下,创建一个配置文件rabbitmq.conf
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1=rabbit@mq1
cluster_formation.classic_config.nodes.2=rabbit@mq2
cluster_formation.classic_config.nodes.3=rabbit@mq3
在tmp下,在创建一个文件,记录我们的Cookie
touch .erlang.cookie
echo “Cookie” > .erlang.cookie
修改权限
chmod 600 .erlang.cookie
在tmp下,准备3个目录mq1,mq2,mq3作为之后的数据卷
mkdir mq1 mq2 mq3
将我们配置好的全部拷贝进去
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
准备一个网络,让集群互联
docker network create mq-net
挂载,在/tmp位置执行,记得改mq1为mq2,mq3,以及端口暴露
docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf -v ${PWD}/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie --name mq1 --hostname mq1 -p 15672:15672 -p 5672:5672 --net mq-net -d rabbitmq:3-management
记得开放对应端口
如图,可以看到所有节点
我们在一个MQ里创建队列,另一个队列里面可以看到
中断创建队列的MQ后,从另一边发现队列为宕机状态。
本质是主从模式:
在普通集群的基础上进行配置
有三种模式
ha-mode | ha-params | 效果 |
---|---|---|
准确模式 exactly | 队列的副本量 count | 集群中队列副本(中服务器和镜像服务器纸盒)的数量。count为1则意味着副本就是主节点。count为2就是一主节点一镜像。当集群数量小于count,主节点将镜像到所有节点。当集群数量大于则会随机选取进行镜像 |
all | (none) | 队列在集群中的所有的节点之间进行镜像。队列将镜像到任何新加入的节点。镜像备份到所有节点的操作将对所有集群节点施加额外压力,包括网络I/O,磁盘I/O和磁盘空间使用情况。推荐使用exactly,设置副本数目为(N/2 + 1) |
nodes | node names | 指定队列镜像创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建镜像节点到当前客户端连接到的节点 |
进入到容器中,进行操作
docker exec -it mq1 bash
输入下列的命令即可
rabbitmqctl set_policy ha-two “^two.” ‘{“ha-mode”:“exactly”,“ha-params”:2,“ha-sync-mode”:“automatic”}’
rabbitmqctl set_policy ha-all “^all.” ‘{“ha-mode”:“all”}’
rabbitmqctl set_policy ha-nodes “^nodes.” ‘{“ha-mode”:“nodes”,“ha-params”:[“rabbit@nodeA”,“rabbit@nodeB”]}’
可以在该处看到配置的规则
也可以在下面直接配置
3.8以后才有的新功能,用来替代镜像队列
直接在控制台就可以创建
Quorum:仲裁队列
Node:指定主节点
代码创建
首先修改配置
spring:
rabbitmq:
addresses: Ip:端口,Ip:端口
username: yjx23332
password: 123456
virtual-host: /
集群模式下,创建似乎不能绑定主节点等操作。同时监听器直接创建似乎也不可行。
/**
* Bean注解方式声明
*/
@Bean public Queue quorumQueue(){
return QueueBuilder
.durable("quorum.queue")
.quorum()
.build();
}
[1]Spring AMQP官网
[2]黑马程序员Java微服务
[3]RabbitMQ官方文档
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。