当前位置:   article > 正文

mq常见问题:消息丢失、消息重复消费、消息保证顺序_mq消息丢失和重复消费解决方案

mq消息丢失和重复消费解决方案

mq常见问题:消息丢失、消息重复消费、消息保证顺序

消息丢失问题
rabbitmq举例来说,出现消息丢失的场景如下图

从图中可以看到一共有以下三种可能出现消息丢失的情况:

1> 生产者丢消息
生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败

2>MQ自身丢消息
未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失;
开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小

3>消费者弄丢了消息
消费者刚接收到消息还没处理完成,结果消费者挂掉了…

针对以上三种情况,每种情况都有对应的处理方法:

1》生产者弄丢消息的解决方法
方法一:开启RabbitMQ的事务

rabbitmq提供了与三个事务相关的命令:select开启事务、commit提交事务、rollback回滚事务
采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。

方法二:开启confirm模式

使用springboot时在application.yml配置文件中做如下配置

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest

发送者开启 confirm 确认机制

publisher-confirm-type: correlated

实现confirm回调接口

@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (!ack) {
        log.error("消息发送异常!");
        //可以进行重发等操作
    } else {
        log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

}

生产者发送消息时设置confirm回调

@Slf4j
@Configuration
public class RabbitMqConfig {

@Bean
public ConfirmCallbackService confirmCallbackService() {
    return new ConfirmCallbackService();
}

@Bean
public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);

    /**
     * 消费者确认收到消息后,手动ack回执回调处理
     */
    rabbitTemplate.setConfirmCallback(confirmCallbackService());
    return rabbitTemplate;
}

//其他配置代码
......
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

小结: 事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。

2》MQ自身弄丢消息时的解决方法
使用持久化队列,发送消息时做持久化到磁盘处理。
同时设置queue和message持久化以后,RabbitMQ 挂了再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据,保证数据不会丢失。

但是就算开启持久化机制,也有可能出现消息落盘时服务挂掉的情况。这时可以考虑结合生产者的confirm机制来处理,持久化机制开启后消息只有成功落盘时才会通过confirm回调通知生产者,所以可以考虑生产者在生产消息时维护一个正在等待消息发送确认的队列,如果超过一定时间还没从confirm中收到对应消息的反馈,自动进行重发处理。

3》消费者自身弄丢消息时的解决方法
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费,不过我们只需要保证幂等性就好了,重复消费也不会造成问题。

在springboot中修改application.yml配置文件更改为手动ack模式

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest

发送者开启 confirm 确认机制

publisher-confirm-type: correlated

发送者开启 return 确认机制

publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true

设置消费端手动 ack

acknowledge-mode: manual

是否支持重试

retry:
enabled: true

消费端手动ack参考代码:

@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
    try {
        //业务处理代码
        ......
    
        //手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        
    } catch (Exception e) {
        if (message.getMessageProperties().getRedelivered()) {
            log.error("消息已重复处理失败,拒绝再次接收...", e);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
        } else {
            log.error("消息即将再次返回队列处理...", e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

消息重复消费问题
出现消息重复消费的情况

原因一
1、生产者发送给消息队列以后,消息队列会应达给生产者,但是这个过程中,消息队列出问题了没有收到消息,那么生产者就会重复发生消息,这时就产生了重复消息。
2、生产者发生消息给消息队列,消息队列由于数量太大延迟了,生产者等待响应超时了,这时生产者又会从新发生消息给消息队列。
3、生产者和消息队列因网络问题引起,生产者会发起重试。这样也会产生重复消息。
4、其实主要原因就是,消息成功进入了消息队列,但是由于各种原因消息队列没有给生产者成功的返回值,而生产者又有重试机制这种情况下就会产生重复消息。

原因二
1、消息队列推送给消费者,消费者处理消息这个过程中消费出现了问题,消息队列不知道消费者处理结果,就会在次投递。
2、消费者处理完,网络出现问题,这时没有给中间件消息队列返回结果,消息队列会在次投递消费者。
3、消费者处理超时,超过了消息队列的超时时间,这时消息队列也会再次投递。
4、消费者处理完结果返回给消息中间件,但是消息中间件出现问题,处理结果丢失了,重启后,消息中间件内部检查发现这个消息还没有处理也会在次投递给消费者。

针对该问题,一般是在消费者端做幂等处理。

如何保证消息队列消费的幂等性

这一块应该还是要结合业务来选择合适的方法,有以下几个方案:

消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。
消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。
针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了。

消息保证顺序消费
消息在投入到queue的时候是有顺序,如果只是单个消费者来处理对应的单个queue,是不会出现消息错乱的问题。但是在消费的时候有可能多个消费者消费同一个queue,由于各个消费者处理消息的时间不同,导致消息未能按照预期的顺序处理。其实根本的问题就是如何保证消息按照预期的顺序处理完成。

出现消费顺序错乱的情况

为了提高处理效率,一个queue存在多个consumer

一个queue只存在一个consumer,但是为了提高处理效率,consumer中使用了多线程进行处理

保证消息顺序性的方法
将原来的一个queue拆分成多个queue,每个queue都有一个自己的consumer。该种方案的核心是生产者在投递消息的时候根据业务数据关键值(例如订单ID哈希值对订单队列数取模)来将需要保证先后顺序的同一类数据(同一个订单的数据) 发送到同一个queue当中。

一个queue就一个consumer,在consumer中维护多个内存队列,根据业务数据关键值(例如订单ID哈希值对内存队列数取模)将消息加入到不同的内存队列中,然后多个真正负责处理消息的线程去各自对应的内存队列当中获取消息进行消费。

RabbitMQ保证消息顺序性总结:
核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。

本文总结:
消息丢失:

生产者 建议使用异步confirm(非高并发需求情况下也可以考虑rabbitmq的事务机制)

    mq:做消息持久化

    消费者:关闭自动提交offset/自动ack  使用手动处理
  • 1
  • 2
  • 3
  • 4
  • 5

重复消费:很容易解决,建立去重表,做幂等处理

如何保证有序:

多个queue, 每个queue都有一个自己的consumer,将一类消息投递到一个queue中

    消费者维护内存队列,同一类消息hash到一个内存队列中
  • 1
  • 2
  • 3
本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/597469
推荐阅读
相关标签
  

闽ICP备14008679号