赞
踩
原创 程序猿羊 程序猿小杨 2023-11-17 07:45 发表于上海
收录于合集
#消息中间件MQ9个
#近期热推20个
程序猿小杨
分享Java相关技术、数据库、Python、职场、感悟、视频资源等干货和学习心得。 如:kettle、ES、redis\mongoDB、springboot、Zookeeper、高并发多线程、中间件、JVM、程序员攻略等。
86篇原创内容
公众号
近期热推文章:
1、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
2、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)
4、研发必会-异步编程利器之CompletableFuture(含源码 中)
一、生产者重连
有时候由于网路波动,可能会出现客户端连接MQ失败的情况,通过配置可以开启连接失败后的重试机制:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#virtual-host: /tech-sharing
publisher-returns: false #一般不需要开启
publisher-confirm-type: correlated #发布消息成功到交换器后会触发回调方法 也即:发送确认confirm
connection-timeout: 1s #设置MQ的连接超时时间
template:
mandatory: false #必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #失败后下次的等待时长倍数,下次等待时长= initial-interval * multiplier
max-attempts: 3 #最大重试次数
listener:
simple:
prefetch: 1 #消费者每次只能获取一条消息,才处理完才能获取下一条(可实现能者多劳)
acknowledge-mode: NONE #1、NONE:全自动 2、AUTO:半自动 3、MANUAL:手动 开启消费端消息确认机制
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然可以考虑使用异步线程来执行发送消息的代码。
二、生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
1、消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功;
2、临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功;
3、持久消息投递到了MO,并且入队完成持久化,返回ACK,告知投递成功;
4、其它情况都会返回NACK,告知投递失败。
yml中开启生效:
详细配置如下:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#virtual-host: /tech-sharing
publisher-returns: true #一般不需要开启
publisher-confirm-type: correlated #发布消息成功到交换器后会触发回调方法 也即:发送确认confirm
connection-timeout: 1s #设置MQ的连接超时时间
template:
mandatory: true #必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
retry:
enabled: true #开启超时重试机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #失败后下次的等待时长倍数,下次等待时长= initial-interval * multiplier
max-attempts: 3 #最大重试次数
listener:
simple:
prefetch: 1 #消费者每次只能获取一条消息,才处理完才能获取下一条(可实现能者多劳)
acknowledge-mode: NONE #1、NONE:全自动 2、AUTO:半自动 3、MANUAL:手动 开启消费端消息确认机制
2.1、开启Return机制:
@Component
@Slf4j
public class MQConfirmConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
//获取RabbitTemplate
RabbitTemplate rabbitTemplate=applicationContext.getBean(RabbitTemplate.class);
//设置
rabbitTemplate.setReturnsCallback(returnCallback->{
log.info("发送消息失败,返回结果为returnCallback:{}",returnCallback);
});
}
}
注意:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
2.2、开启Confirm机制:
/********使用confirm确认机制发送消息*******************************/
public void sendMsgByConfirm(String message){
CorrelationData correlationData=new CorrelationData();
//给Future添加confirmcallbancj
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.info("发生异常....."); //基本不会触发
}
@Override
public void onSuccess(@Nullable CorrelationData.Confirm result) {
//result.isAck() , boolean类型,true代表ack回执,false 代表nack回执
if(result.isAck()){
log.info("发送消息投递成功....");
}else{
log.error("发送消息投递失败:{}",result.getReason());
}
}
});
//发送消息
rabbitTemplate.convertAndSend("TestDirectExchange001",
"TestDirectRouting001", message,correlationData);
}
总结:
最后说明:大多数情况下,不需要开启消息确认机制,如果业务场景要求比较高的情况下,再考虑开启。
三、MQ的可靠性
3.1、数据持久化
说明:3.6版本以后数据持久化已经默认。3.6版本以前需要自行设置。
--spring中交换机与队列持久化源码:
发送消息,非持久化会出现paged out,会阻塞IO,性能下降:
持久化消息,不会出现阻塞情况,性能下降的比较小:
3.2、Lazy Queue
(3.12版本之前)需要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
设置后效果如下:
总结:
RabbitMQ如何保证消息的可靠性:
首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为队列的默认模式,LazyQueue会将所有消息都持久化。
开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执。
3.3、消息落库的方式
消息投递成功,根据ID更新消息状态;消息发送失败时,进行消息重发,如果没有超过最大尝试次数,进行消息重发的同时更新DB消息状态。(具体的见一下章节)。
四、消费者可靠性
保证消费者的可靠性主要有三种手段:1、消费者确认机制;2、消费失败处理;3、业务幂等性。
4.1、消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。
回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息。
nack:消息处理失败,RabbitMQ需要再次投递消息 。
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(一般是消息的参数不正确) 。
Spring AMQP已经实现了消息确认功能。并允许通过配置文件选择ACK处理方式,有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用。
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活。
auto:自动模式。Spring AMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:1、如果是业务异常,会自动返回nack;2、如果是消息处理或校验异常,自动返回reject。
开启消费者确认其机制:
spring:
rabbitmq: listener: # 开启消费者确认其机制
simple:
prefetch: 1 #消费者每次只能获取一条消息,才处理完才能获取下一条(可实现能者多劳)
acknowledge-mode: AUTO # none:关闭ack;manual:手动ack;auto:自动ack
消费者业务模拟异常:
@RabbitListener(bindings = @QueueBinding(value=@Queue(name="BatchDirectQueue02"),
exchange=@Exchange(name="BatchDirectExchange02",type = ExchangeTypes.DIRECT,ignoreDeclarationExceptions = "true")
))
public void receiveBatchMsg(String message){
log.info("接收发送消息....."+message);
throw new RuntimeException("测试异常");
}
auto模式下,消息未被处理会保留消息,尝试重新投递给消费者。如下模拟的异常情况下,消息会一直被投递。
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mg的消息处理飙升,带来不必要的压力。
失败消息处理策略:
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer: 重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机。
RepublishMessageRecoverer原理:
定义接收失败消息的交换机、队列及其绑定关系:
/**
* 功能描述:定义接收错误消费的日志
* @MethodName: receiveErrorMessage
* @MethodParam: [message]
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/11/15 9:55
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "errorQueue"),
exchange = @Exchange(name = "errorExchange", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"),
key = "errorRouting"
))
public void receiveErrorMessage(String message) {
log.info("消费者收到发送错误的消息: " + message);
}
定义RepublishMessageRecoverer:
/**
* @Description: TODO:定义错误消息接收
* @Author: yyalin
* @CreateDate: 2023/11/15 9:58
* @Version: V1.0
*/
@Configuration
@Slf4j
public class ErrorConfig {
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
log.debug("加载RepublishMessageRecoverer");
return new RepublishMessageRecoverer(rabbitTemplate,"errorExchange","errorRouting");
}
}
测试结果:
消费者如何保证消息一定被消费?
1、开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常时返回nack。如果一直处理异常会一直重试。
2、开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。
指同一个业务,执行一次或多次对业务状态的影响是一致的。
方案一:是给每个消息都设置一个唯一id,利用id区分是否是重复消息。
步骤:
1、每一条消息都生成一个唯一的id,与消息一起投递给消费者。
/**
* 功能描述:生成唯一ID配置
* @MethodName: jacksonMessageConvertor
* @MethodParam: []
* @Return: org.springframework.amqp.support.converter.MessageConverter
* @Author: yyalin
* @CreateDate: 2023/11/16 13:42
*/
@Bean
public MessageConverter jacksonMessageConvertor() {
//1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
//2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
2、消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库。(对业务有侵入)
@RabbitListener(bindings = @QueueBinding(value=@Queue(name="BatchDirectQueue02"),
exchange=@Exchange(name="BatchDirectExchange02",type = ExchangeTypes.DIRECT,ignoreDeclarationExceptions = "true"),
key = "BatchDirectRouting02"
))
public void receiveBatchMsg(Message message){
log.info("接收发送消息:message:{},消息体body:{}",message,message.getBody());
log.info("获取消息id:"+ message.getMessageProperties().getMessageId());
}
3、如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
方案二:是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。
五、源码获取方式
更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。然后回复:源码,可以获取对应的源码,开箱即可使用。
说明:后期中间件MQ相关代码都会放到此文件夹中,请大家前去下载,开箱即可使用。
参考网站:
https://blog.csdn.net/jinkkkkkkk/article/details/133933737
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。