赞
踩
<!-- Spring Boot web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Lombok plugin -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!-- Message queue (AMQP) dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.xx.rabbitmq.consume.mq.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 消息确认失败发送至死信队列 */ @Configuration public class RabbitMqAckConfig { /** * 正常交换机 */ @Bean public DirectExchange ackExchange() { return new DirectExchange("ack.exchange"); } /** * 正常队列 * 消息拒绝 发送到死信交换机,根据路由key到队列 */ @Bean public Queue ackQueue() { return QueueBuilder.durable("ack.queue") .withArgument("x-dead-letter-exchange", "ack.error.exchange") .withArgument("x-dead-letter-routing-key", "ack.error") .build(); } /** * 正常交换机绑定正常队列 */ @Bean public Binding ackBinding() { return BindingBuilder.bind(ackQueue()) .to(ackExchange()) .with("ack.abc"); } /** * 异常死信交换机 */ @Bean public DirectExchange ackErrorExchange() { return new DirectExchange("ack.error.exchange"); } /** * 异常死信队列 */ @Bean public Queue ackErrorQueue() { return QueueBuilder.durable("ack.error.queue").build(); } /** * 异常死信交换机绑定死信队列 */ @Bean public Binding ackErrorBinding() { return BindingBuilder.bind(ackErrorQueue()) .to(ackErrorExchange()) .with("ack.error"); } }
package com.xx.rabbitmq.consume.mq.consume;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Consumers demonstrating manual message acknowledgement: one listener on
 * {@code ack.queue} that retries and finally rejects failing messages, and one
 * listener on {@code ack.error.queue} that records the rejected messages.
 */
@Slf4j
@Component
public class AckConsume {

    /** Message header in which this consumer keeps its own attempt counter. */
    private static final String RETRY_COUNT_HEADER = "x-retry-count";

    /** Dedicated logger (named {@code ack-error}) for messages read from the error queue. */
    private static final Logger ACK_ERROR_LOGGER = LoggerFactory.getLogger("ack-error");

    /** Maximum number of attempts, read from the listener retry configuration. */
    @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}")
    private Integer retryCountMax;

    /**
     * Consumes a message from {@code ack.queue}.
     *
     * <p>On success the message is acknowledged manually. On failure the attempt
     * counter stored in the {@code x-retry-count} header is compared with the
     * configured maximum: once reached, the message is rejected without requeue;
     * otherwise the counter is incremented and an exception is thrown so that the
     * message is attempted again.
     *
     * <p>NOTE(review): the counter is only written to the in-memory header map of
     * {@code message}; this presumably relies on the retry mechanism re-invoking
     * the listener with the same {@code Message} instance — confirm.
     *
     * @param msg     the message payload
     * @param channel the channel used to ack/reject the delivery
     * @param message the raw message, used for its delivery tag and headers
     * @throws IOException      if rejecting the delivery fails
     * @throws RuntimeException to request another attempt while attempts remain;
     *                          the original failure is attached as the cause
     */
    @RabbitListener(queues = {"ack.queue"})
    public void simple1(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("----------------------消费者1 获取到消息: {}", msg);
            // Simulated processing failure (always throws ArithmeticException).
            System.out.println(1 / 0);
            // Manual acknowledgement of this single delivery.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            int retryCount = currentRetryCount(headers);
            if (retryCount >= retryCountMax) {
                // Attempts exhausted: reject without requeue so the broker can dead-letter it.
                channel.basicReject(deliveryTag, false);
            } else {
                headers.put(RETRY_COUNT_HEADER, retryCount + 1);
                // Keep the original failure as the cause so it is not lost.
                throw new RuntimeException("抛异常重试消息", e);
            }
        }
    }

    /**
     * Consumes a message from {@code ack.error.queue}, logs it to the
     * {@code ack-error} logger and acknowledges it.
     *
     * @param msg         the message payload
     * @param channel     the channel used to acknowledge the delivery
     * @param deliveryTag the delivery tag of the message
     * @throws IOException if acknowledging the delivery fails
     */
    @RabbitListener(queues = {"ack.error.queue"})
    public void ackError(String msg, Channel channel,
                         @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        // 1. Record the failed message in the error log.
        ACK_ERROR_LOGGER.info("失败消息info:{}", msg);
        // 2. Retry mechanism (not implemented here)
        // 3. Manual intervention (not implemented here)
        // 4. Discard the message (not implemented here)
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Reads the attempt counter from the headers.
     *
     * @param headers the message headers
     * @return the stored counter, or {@code 1} when the header is absent, null
     *         or not numeric
     */
    private static int currentRetryCount(Map<String, Object> headers) {
        Object value = headers.get(RETRY_COUNT_HEADER);
        return value instanceof Number ? ((Number) value).intValue() : 1;
    }
}
server:
  port: 8081

spring:
  rabbitmq:
    host: 111.111.111.111 # RabbitMQ host address
    port: 5672 # RabbitMQ port
    virtual-host: my-host # RabbitMQ virtual host
    username: admin # RabbitMQ username
    password: xxxxxx # RabbitMQ password
    listener:
      simple:
        # Fetch only one message at a time, for fair dispatch
        prefetch: 1
        # Consumers acknowledge messages manually
        acknowledge-mode: manual
        retry:
          enabled: true # enable message retry
          initial-interval: 1000 # initial retry interval, in milliseconds
          max-attempts: 3 # maximum number of attempts
          multiplier: 5 # multiplier applied to the retry interval
// Publish the test payload "hello333" to exchange "ack.exchange" with routing key "ack.abc".
rabbitTemplate.convertAndSend("ack.exchange", "ack.abc", "hello333");
[2023-07-19 15:23:15.315] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.xx.rabbitmq.consume.mq.consume.AckConsume - [simple1,42] - ----------------------消费者1 获取到消息: hello333
[2023-07-19 15:23:16.316] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.xx.rabbitmq.consume.mq.consume.AckConsume - [simple1,42] - ----------------------消费者1 获取到消息: hello333
[2023-07-19 15:23:21.316] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO com.xx.rabbitmq.consume.mq.consume.AckConsume - [simple1,42] - ----------------------消费者1 获取到消息: hello333
[2023-07-19 15:23:21.328] [] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] INFO ack-error - [ackError,79] - 失败消息info:hello333
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。