当前位置:   article > 正文

RabbitMq 消息接收确认(可靠消费)_listener: simple: acknowledge-mode: manual

listener: simple: acknowledge-mode: manual

RabbitMq 消息接收确认(可靠消费)

一.消息接收确认是什么:

是RabbitMq确认消息是否成功被消费的一种机制。

有三种消息确认方式:

1.none代表不确认:该模式下,只要队列获取到了消息,就默认已成功消费。该模式下,容易造成消息丢失的情况。

listener:
      simple:
            acknowledge-mode: none 
  • 1
  • 2
  • 3

2.manual手动确认: 该模式下需要在代码中进行手动确认消息。若出现异常,会触发消息的重试机制(默认重试三次),若重试结束后仍没有被确认,则消息状态会变成Unacked,如下图示:

2.1 配置方式

listener:
      simple:
            acknowledge-mode: manual
  • 1
  • 2
  • 3

3.auto自动确认(默认模式):自动应答,该模式下若消费出现异常则会触发MQ的重试机制,而重试机制若没处理好则容易导致死循环。如下图示:

3.1 配置方式

listener:
      simple:
            acknowledge-mode: auto
  • 1
  • 2
  • 3

3.2 消息接收确认重试机制的处理配置

避免重试机制导致的队列消费死循环的方法就是限制重试次数,或者使用手动应答等方式处理。
注意,在自动应答模式下,消息的最大重试次数容易造成消息的丢失。
listener:
      simple:
            acknowledge-mode: auto
            retry:
          	enabled: true #开启重试
          	max-attempts: 3 #最大重试次数,默认3次,达到次数后,会进行消息移除。若绑定了死信队列,则会放入死信队列中
          	initial-interval: 2000ms  #重试间隔时间
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

注意!,若想使用重试机制配置来限制重试的行为,那么在对应消费队列代码中,进行Nack的操作时,最后一个入参不能传入true(是否将消息重新发回队列中),否则照样会导致死循环。代码如下:

  @RabbitListener(queues = RabbitMqConstants.FANOUT_EMAIL_QUEUE)
   public void smsConsumerListener(Message message, Channel channel) throws IOException {
       String msg = new String(message.getBody());
       try {
      	   log.info("获取到队列消息:{}",msg);
           int a = 1 / 0;
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
       } catch (Exception e) {
       	此处最后一个入参为true时,会导致死循环
           channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
       }
   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

二.最为重要的手动应答模式

1.手动应答的好处
	针对异常消息,可以将其转移到死信队列中,这样对当前队列既不会造成消息的阻塞堆积,
	也不会影响当前队列继续运行接收新的消息。
  • 1
  • 2
2.手动应答的使用
2.1 消费已确认方法,会确认唯一标识(deliveryTag)对应的消息
  • 1
	void basicAck(long deliveryTag, boolean multiple) throws IOException;
入参说明:
	long deliveryTag:消息唯一标识,RabbitMQ自动生成的对应消息的唯一ID,
	可从message.getMessageProperties().getDeliveryTag()方法中获得。
	
  boolean multiple:是否批量退回,不开启就使用false,
  开启批量退回需要增加自己的业务判断逻辑
  (比如:攒够几条再批量回退,或者设置等待间隔等等)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
2.2 消费不确认方法,不确认唯一标识(deliveryTag)对应的消息
  • 1
	void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;
入参说明:
	long deliveryTag:同Ack
	
  boolean multiple:同Ack

  boolean requeue:是否重新退回到原消息队列,退回就使用true,不退回的话就使用false。
  若是false,在没有绑定死信队列的情况下,则直接会将消息给丢弃掉。有死信队列则会将该消息转移到死信中去。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.3完整yml配置

# 本服务端口
server:
  port: xxxxx

# 本服务应用名称
spring:
  application:
    name: xxxx-xxxx
# Nacos配置地址
  cloud:
    nacos:
      discovery:
        server-addr: xxxxx

#RabbitMq配置
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 此处写你的RabbitMq服务地址
    port: 此处写你的RabbitMq端口
    listener:
      simple:
        acknowledge-mode: manual  #开启手动确认,none代表不确认,manual才是手动确认,auto自动确认
        retry:
          enabled: true #开启重试
          max-attempts: 3 #最大重试次数,默认3次,达到次数后,会进行消息移除。若绑定了死信队列,则会放入死信队列中
          initial-interval: 2000ms  #重试间隔时间

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

具体代码:

2.6 常量类

package constants;

/**
 * MQ常量类
 */
public class RabbitMqConstants {

    /************************************FANOUT模式***************************************************/

    /**
     * 发布订阅模式交换机
     */
    public static final String FANOUT_EXCHANGE = "fanout-exchange";

    /**
     * 发布订阅模式死信推送队列
     */
    public static final String FANOUT_EMAIL_QUEUE_TO_DLX = "fanout.email.queue.to.dlx";
    
   /************************************DeadLetter死信队列***************************************************/

    /**
     * 死信队列交换机
     */
    public static final String DEAD_LETTER_EXCHANGE_DLX = "dead-letter-exchange";

    /**
     * 死信队列
     */
    public static final String DEAD_LETTER_EMAIL_QUEUE_DLQ = "dead.letter.email.queue";

    /**
     * 死信队列路由键
     */
    public static final String DEAD_LETTER_ROUTING_KEY_DLK = "dead.letter.email.routing.key";


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

2.5 Fanout队列绑定配置类

package com.rabbitmq.nacos.config;


import constants.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutEmailToDlxConfig {
    /**
     * Fanout声明交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitMqConstants.FANOUT_EXCHANGE, true, false);
    }

    /**
     * Fanout声明队列
     */
    @Bean
    public Queue fanoutEmailToDlxQueue() {
        return QueueBuilder.durable(RabbitMqConstants.FANOUT_EMAIL_QUEUE_TO_DLX)
                .withArgument("x-dead-letter-exchange", RabbitMqConstants.DEAD_LETTER_EXCHANGE_DLX)
                .withArgument("x-dead-letter-routing-key", RabbitMqConstants.DEAD_LETTER_ROUTING_KEY_DLK)
                .build();
    }

    /**
     * 绑定Fanout交换机与队列
     */
    @Bean
    public Binding fanoutEmailToDlxBinding() {
        return BindingBuilder.bind(fanoutEmailToDlxQueue()).to(fanoutExchange());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2.6 死信队列绑定配置类

package com.rabbitmq.nacos.config;

import constants.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Configuration
public class DeadLetterBindConfig {

    @Resource
    private AmqpAdmin amqpAdmin;

    @PostConstruct
    public void initDeclared() {
        amqpAdmin.initialize();
    }

    /**
     * DLX,全称为Dead-Letter-Exchange
     */
    @Bean
    public TopicExchange deadLetterExchange() {
        return new TopicExchange(RabbitMqConstants.DEAD_LETTER_EXCHANGE_DLX);
    }

    /**
     * DLQ,全称为Dead-Letter-Queue
     */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(RabbitMqConstants.DEAD_LETTER_EMAIL_QUEUE_DLQ, true);
    }

    /**
     * DLK,全称为Dead-Letter-Routing-Key
     */
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(RabbitMqConstants.DEAD_LETTER_ROUTING_KEY_DLK);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

2.7队列监听类

package com.rabbitmq.nacos.consumer.fanout;


import com.rabbitmq.client.Channel;
import constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class FanoutEmailToDlxListener {

    @RabbitListener(queues = RabbitMqConstants.FANOUT_EMAIL_QUEUE_TO_DLX)
    public void fanoutEmailToDlxListener(Message message, Channel channel) throws IOException {

        String msg = new String(message.getBody());
        try {
            log.info("获取到队列消息:{}", msg);
            int a = 1 / 0;
            log.info("");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号