赞
踩
为了保证我们生产者在发送消息的时候消息不丢失,我们需要保证发送者的可靠性
假如发送消息的时候消息丢失 ,我们可以使用发送者 重试机制,尝试重新发送消息
实现该机制非常简单,只需要在yml文件中 配置 rabbitmq的配置信息就可以了
在我们 生产者发送消息到交换机的时候,假如 我们发送到交换机 ,但是 队列没有收到消息,会返回ack,发送到交换机,然后发送到队列,消费者没有接收到消息返回ack,但是发送到交换机失败,会返回nack
也就是说 我们 只要消息成功发送到 mq里面去,无论消息是否成功路由被消费,或者没被消费,他都会 返回ack
下面我们看具体实现代码
生产者代码
- @Test
- public void publisherConfirm(){
- //获得 相关组件
- CorrelationData correlationData = new CorrelationData();
- correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("消息发送错误{}",ex.getMessage());
- }
-
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- if(result.isAck()){
- log.info("发送成功收到{},但是消息可能未路由",result.getReason());
- }else{
- log.error("发送失败收到{}",result.getReason());
- }
- }
- });
- //我们像一个不的交换机中发送一条消息 验证我们的情况
- rabbitTemplate.convertAndSend("myExchange.direct","my","aaa",correlationData);
- }
消费者代码
- @Component
- @Slf4j
- @RequiredArgsConstructor
- public class Consumer2 {
- private final RabbitTemplate rabbitTemplate;
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = "mySimple.queue",durable = "true",
- arguments = @Argument(name="x-queue-mode",value="lazy")),
- exchange = @Exchange(value = "myExchange.direct",durable = "true"),
- key = "my"
- ))
- public void consumerMessage(String message){
- log.info("收到消息{}",message);
- }
-
- }
我们在设置的时候可以设置交换机队列持久化,这样即使我们rabbitmq重启的话,也不用担心交换机队列丢失
我们在设置的时候可以手动设置 一个持久化的 message 对象,当作消息传递,然后 持久化之后消息会直接存储到磁盘中去,防止内存中的消息积压同时也可以为消息设置优先级
我们使用lazyqueue 形式发送消息到队列的时候,消息会直接存储到磁盘中去,一直到被需要消费的时候才会被加载
设置lazyqueue也很简单,我们只需要在代码中简单配置即可
消费者确认消息之后会返回ack,并且删除消息,
消费者对消息处理失败返回nack,mq需要重新发送消息
reject 消费者处理消息失败,并且拒绝该消息,并且删除消息
而怎么确认消息也有三种机制
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:
如果是业务异常,会自动返回nack
;
如果是消息处理或校验异常,自动返回rejec
t
我们在yml文件中配置即可
当一条消息发送失败的时候,消费者重新尝试消费消息,达到我们重试的次数之后,消费者返回reject,mq直接删除消息
当消息彻底处理失败,我们消费者设置一个新的交换机队列 重新存储这些失败的消息后续再做处理
- package com.itheima.consumer.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.context.annotation.Bean;
-
- @Configuration
- @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
- public class ErrorMessageConfig {
- @Bean
- public DirectExchange errorMessageExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue", true);
- }
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
- return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
- }
-
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
死信交换机 ,都是假如一个定时消息过期了,或者发送延迟消息我们直接把该消息传递到我们绑定的死信交换机中,跟上文 消息发送失败了返回rejct之后,消息发送到err交换机是两种不同的策略
我们用死信交换机发送延迟消息的策略可以如下图
而代码也很简单,只需要再声明队列a的时候绑定死信交换机就可以了
dlx就是死信交换机
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。