当前位置:   article > 正文

SpringBoot 集成 RabbitMq 消费者手动确认消息,失败重试后发送至死信队列_rabbitmq 手动确认消息报错进死信队列

rabbitmq 手动确认消息报错进死信队列

1.POM 引入依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- lombok 插件 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- mq依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

2.配置交换机和队列绑定

package com.xx.rabbitmq.consume.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 消息确认失败发送至死信队列
 */
@Configuration
public class RabbitMqAckConfig {

    /**
     * 正常交换机
     */
    @Bean
    public DirectExchange ackExchange() {
        return new DirectExchange("ack.exchange");
    }

    /**
     * 正常队列
     * 消息拒绝 发送到死信交换机,根据路由key到队列
     */
    @Bean
    public Queue ackQueue() {
        return QueueBuilder.durable("ack.queue")
                .withArgument("x-dead-letter-exchange", "ack.error.exchange")
                .withArgument("x-dead-letter-routing-key", "ack.error")
                .build();
    }

    /**
     * 正常交换机绑定正常队列
     */
    @Bean
    public Binding ackBinding() {
        return BindingBuilder.bind(ackQueue())
                .to(ackExchange())
                .with("ack.abc");
    }

    /**
     * 异常死信交换机
     */
    @Bean
    public DirectExchange ackErrorExchange() {
        return new DirectExchange("ack.error.exchange");
    }

    /**
     * 异常死信队列
     */
    @Bean
    public Queue ackErrorQueue() {
        return QueueBuilder.durable("ack.error.queue").build();
    }

    /**
     * 异常死信交换机绑定死信队列
     */
    @Bean
    public Binding ackErrorBinding() {
        return BindingBuilder.bind(ackErrorQueue())
                .to(ackErrorExchange())
                .with("ack.error");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

3.消费端代码

package com.xx.rabbitmq.consume.mq.consume;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消息确认
 */
@Slf4j
@Component
public class AckConsume {

    private static final Logger ack_error_logger = LoggerFactory.getLogger("ack-error");

    // mq 消息最大重试次数
    @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
    private Integer retryCountMax;


    /**
     * 消费消息
     */
    @RabbitListener(queues = {"ack.queue"})
    public void simple1(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("----------------------消费者1 获取到消息: {}", msg);
            // 模拟异常
            System.out.println(1 / 0);
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 头部信息
	        Map<String, Object> headers = message.getMessageProperties().getHeaders();
	        // 处理消息失败,获取重试次数
	        int retryCount = (int) headers.getOrDefault("x-retry-count", 1);
	        // 判断是否达到最大重试次数
	        if (retryCount >= retryCountMax) {
	            // 绑定死信队列,拒绝消息丢到死信队列
	            channel.basicReject(deliveryTag, false);
	        } else {
	            headers.put("x-retry-count", retryCount + 1);
	            throw new RuntimeException("抛异常重试消息");
	        }
        }
    }

    @RabbitListener(queues = {"ack.error.queue"})
    public void ackError(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        // 1.错误日志记录
        ack_error_logger.info("失败消息info:{}", msg);
        // 2.重试机制
        // 3.人工干预
        // 4.消息丢弃

        channel.basicAck(deliveryTag, false);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

4.application.yml 配置

server:
  port: 8081

spring:
  rabbitmq:
    host: 111.111.111.111 # rabbitmq的连接地址
    port: 5672 # rabbitmq的连接端口号
    virtual-host: my-host # rabbitmq的虚拟host
    username: admin # rabbitmq的用户名
    password: xxxxxx # rabbitmq的密码
    listener:
      simple:
        # 一次只拉取一条消息,实现公平分配
        prefetch: 1
        # 消费者手动确认消息
        acknowledge-mode: manual
        retry:
          enabled: true      # 启用消息重试
          initial-interval: 1000   # 初始重试间隔时间,单位为毫秒
          max-attempts: 3    # 最大重试次数
          multiplier: 5    # 重试间隔的乘数因子
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

5. 生产者发送消息

rabbitTemplate.convertAndSend("ack.exchange", "ack.abc", "hello333");
  • 1

运行结果

[2023-07-19 15:23:15.315] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.xx.rabbitmq.consume.mq.consume.AckConsume - [simple1,42] - ----------------------消费者1 获取到消息: hello333
[2023-07-19 15:23:16.316] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.xx.rabbitmq.consume.mq.consume.AckConsume - [simple1,42] - ----------------------消费者1 获取到消息: hello333
[2023-07-19 15:23:21.316] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.xx.rabbitmq.consume.mq.consume.AckConsume - [simple1,42] - ----------------------消费者1 获取到消息: hello333
[2023-07-19 15:23:21.328] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] INFO ack-error - [ackError,79] - 失败消息info:hello333
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/996790
推荐阅读
相关标签
  

闽ICP备14008679号