赞
踩
目录
RabbitMQ 的消息确认机制应用场景非常广泛,尤其是在需要确保消息可靠性和避免消息丢失的场合下更为重要,例如:金融系统、电商交易系统等。以下是消息确认机制的一些常见应用场景和好处:
1. 确认消息的可靠性
在 RabbitMQ 中,生产者将消息发送到队列之后就不能再控制该消息的安全性,而消费者需要及时地对该消息进行处理并进行确认,以确保该消息已经被成功消费。使用消息确认机制可以保证消息只会被消费一次,从而确保消息的可靠性。
2. 防止消息丢失
在 RabbitMQ 中,当消费者从队列中取出消息之后,消息就被认为是已经消费,如果消费者在消费过程中出现异常导致消费失败,那么该消息就会从队列中被删除,从而导致消息丢失。使用消息确认机制可以避免这种情况的发生,从而保证消息不会丢失。
3. 避免重复消费
在 RabbitMQ 中,如果消费者在处理完一个消息之后没有及时确认该消息已经被消费,那么 RabbitMQ 认为该消息未被消费,就会将该消息重新发送给另一个消费者进行消费,从而导致消息重复消费。使用消息确认机制可以避免这种情况的发生,从而保证消息只会被消费一次。
4. 节约系统资源
在 RabbitMQ 中,当一个消费者同时处理多个消息时,可能会导致系统资源短缺或者消息被重复消费。使用消息确认机制可以限制消费者一次只处理一个消息,从而提高系统的稳定性和可靠性,同时还可以避免消息被重复消费的问题。
综上所述,消息确认机制在 RabbitMQ 中的应用场景非常广泛,可以有效地保证消息的可靠性、避免消息丢失和重复消费、节约系统资源等。因此,在实际应用中,推荐使用消息确认机制来确保 RabbitMQ 的高可用和高性能。
在消息传递系统中,实现消息的可靠性可以通过引入消息确认机制来完成。该机制涉及三个方面:确认消息的发送、确认消息的接收以及拒收消息的处理。以下是这一优化的详细方案:
确认消息的发送:
- 发送者在向消息队列发送消息之前,需等待接收到消息队列发出的确认信号。
- 当消息成功写入消息队列后,消息队列会发送一个确认信号给发送者,表示消息已经被成功接收并保存。
- 如果发送者在一定时间内未收到确认信号,可以选择重新发送消息或执行其他错误处理逻辑。
确认消息的接收:
- 接收者在从消息队列中获取消息后,需发送一个确认信号给消息队列,表示已经成功接收到该消息。
- 消息队列收到确认信号后,会将该消息标记为已确认,并在需要的情况下进行下一步处理。
- 如果接收者在一定时间内未发送确认信号,消息队列可以将该消息重新投递给其他接收者或执行其他补救措施。
拒收消息的处理:
- 如果接收者无法处理某条消息,可以发送拒收信号给消息队列,表示拒绝接收该消息。
- 消息队列收到拒收信号后,可以将该消息重新投递给其他接收者或执行其他适当的处理策略。
- 发送拒收信号的原因可能包括消息格式错误、业务逻辑不符等。
通过实现消息确认机制,可以提高消息传递的可靠性和稳定性。发送者可以确保消息被正确写入消息队列,接收者可以确保每条消息被成功接收,并且拒收功能可以帮助处理无法处理的消息。
- #自动签收:auto 手动:manual
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
若要实现手动确认,必须在配置中这样配置,否则消息会被重复消费,还会遇见不可预料的报错结果
- @Configuration
- public class RabbitMqConfig {
- Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
- rabbitTemplate.setMandatory(true);
- // 确认消息送到交换机(Exchange)回调
- rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
- if (ack) {
- assert correlationData != null;
- logger.info("消息确认送到交换机(Exchange),消息的唯一标识符:{}", correlationData.getId());
- } else {
- logger.info("投递失败,错误原因 :{}", cause);
- }
- });
- return rabbitTemplate;
- }
- }
生产者发送的消息,不管成功与否都会调用回调函数,确保消息已经成功发送到交换机中
如果设置手动确认,则所有队列中的消息被消费后都需要手动确认,不然不会从队列中移除,第二次重启服务后还会被重复消费,如下图所示:
- @Configuration
- public class SimpleQueueConfig {
- Logger logger = LoggerFactory.getLogger(SimpleQueueConfig.class);
- private static Map<Long, String> list = new HashMap<>();
-
- @Bean(name = "simpleQueue")
- public Queue queue() {
- Map<String, Object> arguments = new HashMap<>(4);
- arguments.put("x-message-ttl", 20000);
- arguments.put("x-max-length", 1000);
- arguments.put("x-dead-letter-exchange", "dead.exchange");
- arguments.put("x-dead-letter-routing-key", "dead.message");
- return new Queue("simple_queue", true, false, false, arguments);
- }
-
- @Bean(name = "deadQueue")
- public Queue deadQueue() {
- return new Queue("dead.queue", true, false, false);
- }
-
- @Bean(name = "deadExchange")
- public Exchange exchange() {
- return new DirectExchange("dead.exchange", true, false);
- }
-
- @Bean(name = "deadBinding")
- public Binding binding() {
- return BindingBuilder.bind(deadQueue()).to(exchange()).with("dead.message").noargs();
- }
-
- @RabbitListener(queues = "dead.queue")
- public void readDeadMessage(Message message, Channel channel) throws IOException {
- String msg = new String(message.getBody());
- logger.info("接收到的死信消息为:{}", msg);
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
-
- @RabbitListener(queues = "simple_queue")
- public void readMessage(Message message, Channel channel) throws IOException {
- String msg = new String(message.getBody());
- System.out.println(msg);
- try {
- if (msg.contains("2") || msg.contains("7")) {
- logger.info("拒绝消费,(false)不重回队列,进入死信队列,消息为:{}", msg);
- // 第二个参数若为TRUE,则表示拒绝消费,重回队列让其他消费者消费,也可能自己会再次消费,若为FALSE,则表示不重回队列,将消息发送到死信队列中(前提是该队列绑定了死信队列)
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
- } else if (msg.contains("3")) {
- // 消费报了异常
- int i = 1 / 0;
- } else {
- logger.info("确认消费,消息为:{}", msg);
- // 符合消费的条件,确认消费,第二个参数表示,是否批量确认
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- } catch (Exception e) {
- logger.info("报错消息,拒绝消费,直接丢弃,进入死信队列,消息为:{}", msg);
- // 进入异常方法,拒绝当前消费,第二个参数表示是否批量拒绝,第三个参数表示当前消息是否重回队列顶部,若为FALSE则表示丢弃该消息,但该消息会进入死信队列(前提是该队列绑定了死信队列)
- channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- } finally {
- // 限制消费者只有在确认之前最多接收一个未确认的消息
- channel.basicQos(1);
- }
- }
-
- @RabbitListener(queues = "simple_queue")
- public void readMessageTwo(Message message, Channel channel) throws IOException {
- logger.info("two接收one拒绝的消息为:{}", new String(message.getBody()));
- // 一次只确认一条消息
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
注意:
channel.basicReject和channel.basicNack的主要区别是:是否可以批量拒绝
- @SpringBootTest(classes = MqApplication.class)
- @RunWith(SpringRunner.class)
- public class ProducerSimpleTest {
- @Resource
- RabbitTemplate rabbitTemplate;
-
- @Test
- public void test() {
- for (int i = 1; i <=10; i++) {
- String msg = "消息" + i;
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(UUID.randomUUID().toString());
- rabbitTemplate.convertAndSend("simple_queue", (Object) msg, correlationData);
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。