赞
踩
为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。
默认情况下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。
消费者端的配置,相关属性值改为自己的:
- server.port=8082
- #rabbitmq服务器ip
- spring.rabbitmq.host=localhost
- #rabbitmq的端口
- spring.rabbitmq.port=5672
- #用户名
- spring.rabbitmq.username=lonewalker
- #密码
- spring.rabbitmq.password=XX
- #配置虚拟机
- spring.rabbitmq.virtual-host=demo
- #设置消费端手动 ack none不确认 auto自动确认 manual手动确认
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
修改消费代码:请勿复制使用,会卡死
- package com.example.consumer.service;
-
- import com.alibaba.fastjson.JSONObject;
- import com.example.consumer.entity.User;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
-
- /**
- * @description:
- * @author: LoneWalker
- * @create: 2022-04-04
- **/
- @Service
- @Slf4j
- public class ConsumerService {
-
-
- @RabbitListener(queues ="publisher.addUser")
- public void addUser(String userStr,Channel channel,Message message){
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- log.info("我一直在重试");
- int a = 1/0;
- User user = JSONObject.parseObject(userStr,User.class);
- log.info(user.toString());
- //手动ack 第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
- channel.basicAck(deliveryTag,false);
- } catch (Exception e) {
- //手动nack 告诉rabbitmq该消息消费失败 第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
- try {
- channel.basicNack(deliveryTag,false,true);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
- }
先启动发布者发送消息,查看控制台:有一条消息待消费·
启动消费端,因为代码中有除0,所以会报错,这里就会出现一条unacked消息:
因为设置的是将消息重新请求,所以它会陷入死循环
防止出现这种情况,可以将basicNack最后一个参数改为false,让消息进去死信队列
说简单点就是备胎队列,而死信的来源有以下几种:
channel.basicNack
或 channel.basicReject
,并且此时requeue
属性被设置为false
。“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:
从控制台将之前的交换机都删除,然后修改代码。
首先看一下发布者的配置代码:
- package com.example.publisher.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author LoneWalker
- * @date 2023/4/8
- * @description
- */
- @Slf4j
- @Configuration
- public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
-
- @Bean
- public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
- //设置给rabbitTemplate
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnsCallback(this);
- rabbitTemplate.setMandatory(true);
- return rabbitTemplate;
- }
-
- @Bean
- public MessageConverter jackson2JsonMessageConverter() {
- return new Jackson2JsonMessageConverter();
- }
- /************ 正常配置 ******************/
- /**
- * 正常交换机,开启持久化
- */
- @Bean
- DirectExchange normalExchange() {
- return new DirectExchange("normalExchange", true, false);
- }
-
- @Bean
- public Queue normalQueue() {
- // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
- // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- Map<String, Object> args = deadQueueArgs();
- // 队列设置最大长度
- args.put("x-max-length", 5);
- return new Queue("normalQueue", true, false, false, args);
- }
-
- @Bean
- public Queue ttlQueue() {
- Map<String, Object> args = deadQueueArgs();
- // 队列设置消息过期时间 60 秒
- args.put("x-message-ttl", 60 * 1000);
- return new Queue("ttlQueue", true, false, false, args);
- }
-
- @Bean
- Binding normalRouteBinding() {
- return BindingBuilder.bind(normalQueue())
- .to(normalExchange())
- .with("normalRouting");
- }
-
- @Bean
- Binding ttlRouteBinding() {
- return BindingBuilder.bind(ttlQueue())
- .to(normalExchange())
- .with("ttlRouting");
- }
-
-
- /**************** 死信配置 *****************/
- /**
- * 死信交换机
- */
- @Bean
- DirectExchange deadExchange() {
- return new DirectExchange("deadExchange", true, false);
- }
-
- /**
- * 死信队列
- */
- @Bean
- public Queue deadQueue() {
- return new Queue("deadQueue", true, false, false);
- }
-
- @Bean
- Binding deadRouteBinding() {
- return BindingBuilder.bind(deadQueue())
- .to(deadExchange())
- .with("deadRouting");
- }
-
- /**
- * 转发到 死信队列,配置参数
- */
- private Map<String, Object> deadQueueArgs() {
- Map<String, Object> map = new HashMap<>();
- // 绑定该队列到死信交换机
- map.put("x-dead-letter-exchange", "deadExchange");
- map.put("x-dead-letter-routing-key", "deadRouting");
- return map;
- }
-
-
- /**
- * 消息成功到达交换机会触发
- * @param correlationData
- * @param ack
- * @param cause
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack) {
- log.info("交换机收到消息成功:" + correlationData.getId());
- }else {
- log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
- }
- }
-
- /**
- * 消息未成功到达队列会触发
- * @param returnedMessage
- */
- @Override
- public void returnedMessage(ReturnedMessage returnedMessage) {
- log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
- }
- }
properties
- server.port=8081
- #rabbitmq服务ip
- spring.rabbitmq.host=localhost
- #rabbitmq端口号
- spring.rabbitmq.port=5672
- #用户名
- spring.rabbitmq.username=用户名改为自己的
- #密码
- spring.rabbitmq.password=密码改为自己的
- #虚拟机
- spring.rabbitmq.virtual-host=demo
-
- spring.rabbitmq.publisher-confirm-type=correlated
- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.template.mandatory=true
发送消息:
- @RequiredArgsConstructor
- @Service
- public class PublisherServiceImpl implements PublisherService{
-
- private final RabbitTemplate rabbitTemplate;
-
- @Override
- public void addUser(User user) {
-
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(UUID.randomUUID().toString());
-
- rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
- }
- }
文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:
- package com.example.consumer.service;
-
- import com.alibaba.fastjson.JSONObject;
- import com.example.consumer.entity.User;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
-
- /**
- * @description:
- * @author: LoneWalker
- * @create: 2022-04-04
- **/
- @Service
- @Slf4j
- public class ConsumerService {
-
-
- @RabbitListener(queues ="normalQueue")
- public void addUser(String userStr,Channel channel,Message message){
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- int a = 1/0;
- User user = JSONObject.parseObject(userStr,User.class);
- log.info(user.toString());
- //手动ack 第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
- channel.basicAck(deliveryTag,false);
- } catch (Exception e) {
- //手动nack 告诉rabbitmq该消息消费失败 第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
- try {
- channel.basicNack(deliveryTag,false,false);
- } catch (IOException ex) {
- throw new RuntimeException("消息处理失败");
- }
- }
- }
- }
注意basicNack的第三个参数,设置为false后就不会重新请求。
配置上面的代码已经有过了:
测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:
过期时间TTL表示可以对消息设置预期的时间,超过这个时间就删除或者放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s
死信队列中的消息处理和正常的队列没什么区别,就不赘述了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。