赞
踩
目录
由于网络波动,可能会出现客户端连接失败的情况,需开启重连机制。
SpringBoot项目配置:
- spring:
- rabbitmq:
- # 连接超时时间
- connection-timeout: 500ms
- template:
- retry:
- # 开启失败重连
- enabled: true
- # 失败后重连初始间隔时间
- initial-interval: 1000ms
- # 失败后下次间隔的时长倍数,下次间隔时长=本次间隔时长*multiplier
- multiplier: 1
- # 最大重试次数
- max-attempts: 3
注意:重试过程线程是被阻塞的,合理配置等待时长及最大重试次数,或开启异步线程执行,以免影响业务性能。
SpringBoot项目配置:
- spring:
- rabbitmq:
- # 开启生产者确认
- publisher-confirm-type: correlated
- # 返回路由失败消息,一般是开发问题,无需开启
- publisher-returns: true
publisher-confirm-type三种模式:
none | 关闭 |
simple | 同步阻塞等待MQ回执消息 |
correlated(推荐) | MQ异步回调返回回执消息 |
注意:以上两种方式均会造成MQ性能下降,非必要不建议开启。失败情况毕竟非常少,可在代码中通过输出日志或存储数据库等方式将发送失败的消息记录下来,稍后手动处理。
SpringBoot项目:
交换机和消息默认为持久化,需自行设置队列持久化。
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "xx.queue", durable = "true"),
- exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC),
- key = "xx"
- ))
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
声明LazyQueue:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "xx.queue", durable = "true",
- arguments = {@Argument(name = "x-queue-mode", value = "lazy"),
- exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC),
- key = "xx"
- ))
优点:
基于磁盘存储,消息上限高
没有间歇性的page-out,性能比较稳定
缺点:
基于磁盘存储,消息时效性会降低
性能受限于磁盘的IO
失败后尝试在本地重试,重试后依然失败,将消息投递到用于投递失败消息的交换机,存储到失败消息队列中,等待后续手动处理。
SpringBoot项目配置:
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- # 开启失败重试
- enabled: true
SpringBoot项目配置类:
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitMqErrorConfig {
-
- @Bean
- public DirectExchange errorExchange() {
- return new DirectExchange("error.direct");
- }
-
- @Bean
- public Queue errorQueue() {
- return new Queue("error.queue");
- }
-
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
- return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
- }
-
- @Bean
- public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
由于网络问题或消息生产消费过程中出现问题,均会导致消息重复的情况。
在业务层面上,保证重复执行对结果不产生影响。例如:支付成功后修改订单状态,可以将未支付状态作为修改语句的执行条件。
如果不能通过业务保证幂等性,可以将处理过的消息ID记录到redis,如果新到的消息ID已经在记录中,那么就不再处理这条消息。
优化消费者处理逻辑,使消费者更快处理。
将队列绑定多个消费者,提高消费速度。
基于磁盘存储,消息上限高。
发生原因:
1、将一个队列拆分成多个队列,保证一个队列只绑定一个消费者,生产者在投递消息时根据业务数据关键值来将需要保证先后顺序的同一类数据发送到同一个队列当中。
2、将队列设置为单活模式
x-single-active-consumer:单活模式,表⽰是否最多只允许⼀个消费者消费,如果有多个消费者同时绑定,则只会激活第⼀个,除⾮第⼀个消费者被取消或者死亡,才会⾃动转到下⼀个消费者。
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "xx.queue", durable = "true",
- arguments = {@Argument(name = "x-single-active-consumer", value = "true", type = "java.lang.Boolean")}),
- exchange = @Exchange(name = "xx.topic", type = ExchangeTypes.TOPIC),
- key = "xx"
- ))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。