当前位置:   article > 正文

RabbitMQ消息重复消费

rabbitmq消息重复消费

RabbitMQ消息重复消费问题

同一条消息被一个消费者消费多次或者被多个消费者消费。可能导致系统相关业务重复执行和数据不一致问题。

1.场景模拟

生产者

public String sendMessage() {
        for (int i = 1; i <= 100; i++) {
            //生成消息id
            String messageId = UUID.randomUUID().toString();
            //消息内容
            String message = "rabbitMQ测试消息!" + i;
            //发送消息
            rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
                msg.getMessageProperties().setMessageId(messageId);//设置消息id
                return msg;
            });
            System.out.println("已发送消息:id=" + messageId + " message="+ message);
        }
        return "消息发送成功!";
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

消费者

           @RabbitListener(queues = "directQueue")
    public void spendMessage(String msg, Channel channel, Message message) throws IOException {
        String messageId = message.getMessageProperties().getMessageId();
        System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
        try {
            //模拟消费耗时
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

测试步骤

生产者发送100条消息

在这里插入图片描述

启动消费者

在这里插入图片描述

由于最后一条消息没有调用basicAck方法,消息并没有消费成功,当我们重启消费者服务时,消息会被再次消费。

重启消费者

在这里插入图片描述

2.解决方案

因为每条消息都有自己的id(唯一标识),可根据这个id来判断消息是否被消费过。

消费消息前先获取消息id—>查询缓存是否存在此id,判断id对应的值—>为1则表示该消息被消费过,为0则表示消费中

以下示例使用redis作为介质

消费者

    @RabbitListener(queues = "directQueue")
    public void spendMessage(String msg, Channel channel, Message message) throws IOException {
        String messageId = message.getMessageProperties().getMessageId();
         //messageId对应的缓存值为0时表示消息消费中,1表示消费完成
        if(Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(messageId, "0", 30L, TimeUnit.SECONDS))){//消息第一次被消费
            try {
                //模拟消费耗时
                System.out.println("接收到消息:id=" + messageId+ " message=" + msg);
                Thread.sleep(100);
                //业务执行完成后标识消息消费完成
                redisTemplate.opsForValue().set(messageId,"1",30L,TimeUnit.SECONDS);
                //消息消费确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                //消费成功后删除缓存
                redisTemplate.delete(messageId);
            } catch (Exception e) {
                //丢弃消息(关联了死信队列的话可以放入死信队列处理)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                //删除缓存
                redisTemplate.delete(messageId);
            }
        }else{
            String value = redisTemplate.opsForValue().get(messageId);
            if("0".equals(value)){
                return;
            }
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

再次按照以上步骤测试,消息不会被重复消费。

在这里插入图片描述

最后一条被消费的消息为49,再次启动后未重复消费。

本示例中缓存过期时间为30s,若启动消费者的时间间隔超过30s,则消息仍会被重复消费

在这里插入图片描述

3.死信队列

死信队列就是一个普通队列,可以使用任意种交换机,业务队列可通过绑定死信交换机和路由键自动将被nack和reject且不重新入队的消息发送给对应的死信队列。或者通过设置业务队列的消息过期时间实现延时消息。

测试配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    private static final String DIRECT_EXCHANGE_NAME = "directExchange";//交换机名称
    private static final String DIRECT_ROUTE_KEY = "directRoute";//路由键
    private static final String DIRECT_QUEUE_NAME = "directQueue";//队列名称

    /**
     * 业务交换机
     * @return direct交换机
     * 名称,是否持久化,无队列自动删除
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE_NAME,true,false);
    }

    /**
     * 队列
     * @return 队列
     * 名称,是否持久化,是否独占,是否自动删除
     */
    @Bean
    public Queue directQueue(){
        Map<String, Object> args = new HashMap<>(3);
        //队列绑定的死信交换机
        args.put("x-dead-letter-exchange","dead_exchange");
        //队列的死信路由key
        args.put("x-dead-letter-routing-key", "dead_route");
        //消息过期时间
        //args.put("x-message-ttl",4000);
        return QueueBuilder.durable(DIRECT_QUEUE_NAME).withArguments(args).build();
    }

    /**
     * 绑定交换机和队列
     * @param directExchange 交换机
     * @param queue 队列
     * @return
     */
    @Bean
    public Binding bindingExchangeWithQueue(DirectExchange directExchange, @Qualifier("directQueue") Queue queue){
        return BindingBuilder.bind(queue).to(directExchange).with(DIRECT_ROUTE_KEY);
    }

    /**
     * 死信交换机
     * @return direct交换机
     * 名称,是否持久化,无队列自动删除
     */
    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange("dead_exchange",true,false);
    }

    @Bean
    public Queue deadQueue(){
        return new Queue("dead_queue",true,false,false);
    }

    @Bean
    public Binding bindingDeadExchangeWithQueue(DirectExchange deadExchange, Queue deadQueue){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead_route");
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

在第二步的代码中添加模拟报错代码

if(msg.endsWith("2")){//模拟消息处理出错
    throw new RuntimeException();
  }
  • 1
  • 2
  • 3

消费者消费完所有消息后,控制台可以看到死信队列里有10条消息

在这里插入图片描述

新建死信队列监听

    @RabbitListener(queues = "dead_queue")
    public void deadMessage(String msg, Channel channel, Message message) throws IOException {
        String bodyMessage = new String(message.getBody());
        System.out.println("deadMessage = " + bodyMessage);
        System.out.println("处理死信消息:" + msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

启动消费者后死信队列消息被处理

在这里插入图片描述

4.延时消息(建议使用延时消息插件)

在生产者消息发送方法内添加如下参数,设置当前发送消息的过期时间,消息过期后根据队列绑定的死信交换机和路由键将消息发送到死信队列,死信队列消费者消费消息即完成了消息延时消费。

**注意:**这种延时消息局限性较大,因为如果先发送一条消息设置过期时间为30s,随后发送一条过期时间为10s的消息,仍会是第一条消息过期后第二条消息才能进入死信队列。

发送10条消息到队列,依次设置过期时间为10s到1s

    public String sendMessage() {
        for (int i = 10; i >= 1; i--) {
            //生成消息id
            String messageId = UUID.randomUUID().toString();
            //消息内容
            String message = "rabbitMQ测试消息!" + i;
            int time = i;
            //发送消息
            rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, DIRECT_ROUTE_KEY,message,msg -> {
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//持久化
                msg.getMessageProperties().setMessageId(messageId);//设置消息id
                msg.getMessageProperties().setExpiration(time + "000");
                return msg;
            });
            System.out.println("已发送消息:id=" + messageId + " message="+ message);
        }
        return "消息发送成功!";
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

在这里插入图片描述
消费者注释消费普通队列的代码,启动消费者观察死信队列消息消费顺序。

前10s都是没有处理消息的,因为第一条消息未过期,后续的消息也不会进入死信队列。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/977660
推荐阅读
相关标签
  

闽ICP备14008679号