当前位置:   article > 正文

RabbitMQ保证消息不丢失_rabbitmq 自动监听消息消费 业务发生异常消息不丢失

rabbitmq 自动监听消息消费 业务发生异常消息不丢失

消息丢失的情况

(1)生产者方面:生产者发送消息至MQ的数据丢失
(2)RabbitMQ方面:MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失
(3)消费者方面:消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

在这里插入图片描述
解决方法

1.配置文件中添加

#消息已发送到交换机(Exchange)时返回
spring.rabbitmq.publisher-confirm-type=correlated
# 消息在未被队列收到的情况下返回
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true 
# 开启消息手动确认机制
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.config类配置

package com.example.demo.config;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitmqConfig {
    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //发送到exchange时调用回调函数

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:"+"相关数据:"+correlationData+" ConfirmCallback:"+"确认情况:"+ack+" ConfirmCallback:"+"原因:"+cause);

            }
        });

        //设置消息抵达队列的失败回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("二位热热我热太热");
            }
        });
        return rabbitTemplate;
    }

    /**
     * 在spring中也可使用@bean的方式去創建 綁定 等操作 这里我创建了一个交换机
     */
    @Bean
    public void createNormalExchange(){
        DirectExchange mydirect = new DirectExchange("mydirect3", true, false);
        amqpAdmin.declareExchange(mydirect);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

3.监听类改写

package com.example.demo.service.impl;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class RabbitmqTest2Impl {

    @RabbitListener(queues = {"mybe"})
    public void getMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            MessageProperties messageProperties = message.getMessageProperties();
            Thread.sleep(2000);
            String s = new String(body);
            System.out.println("消费消息完成"+s);
			//进行手动确认
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            //消息消费方错误后的处理
            //deliveryTag消息id
            //multiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.
            // requeue是否重新入队
            channel.basicNack(deliveryTag,false,false);

            e.printStackTrace();
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/1019022
推荐阅读
相关标签
  

闽ICP备14008679号