当前位置:   article > 正文

rabbitmq unacked消息如何处理_RabbitMQ如何保证消息的可靠传输?

rabbitmq界面total为1和unacked为1,getmessage提示queue is empty
01PART消息的可靠传输

消息的可靠传输是指消息从Producer发送到Broker,再到Consumer被消费的整个过程中,要确保消息不丢失,如果出现消息丢失的情况,系统也要能发现,并做一些补偿措施,而不是消息丢了都不知道,石沉大海。

08bf58b24c1c9e9df9c5135db0bebbef.png

如图,消息从Producer发送到Broker再到Consumer被消费的整个传输过程中,因为网络中断等原因,可能会造成消息的丢失,出现消息丢失的情况有以下几种:

1、Producer将消息发送到交换机的路上,因为网路中断等原因,消息丢失;

2、消息到达了交换机,但是交换机根据路由规则如果找不到匹配的queue,消息也会被抛弃;

3、消息发送到Broker后会存放在内存中(没有开启持久化的情况下),如果Consumer还没来得及消费,Broker挂掉了,那么消息也会丢失;

4、默认情况下,RabbitMQ开启的是自动确认autoack,即Consumer收到消息还没有处理时,就会返回一个确认应答ack,Broker收到ack,则认为消息已消费成功,会将该消息从队列中删除。如果Consumer在返回ack后宕机或者在处理消息时发生异常,就会造成消息丢失;

02PART怎么保障可靠传输?

要保证消息可靠传输,就要解决上面的几个问题,下面我们来看一下怎么解决这些问题。

1、生产者将消息发送到交换机的路上,消息丢失怎么办? 一般有两种解决方案: ① 开启事务,Producer发送消息时开启事务,如果发生异常事务回滚,但开启事务就意味着降低性能,尤其在高并发环境,慎用,本文demo不用这种方法! ② 开启生产者confirm机制,在SpringBoot中,在application.yml配置文件中设置spring.rabbitmq.publisher-confirm-type=correlated,生产类实现RabbitTemplate.ConfirmCallback接口,重写confirm方法,不管消息是否成功发送到交换机,都会回调confirm方法,可以在这个方法里做判断处理。

2、消息到达了交换机,但是没有找到匹配的队列造成的消息丢失怎么应对?

SpringBoot中,在application.yml配置文件中设置spring.rabbitmq.publisher-returns=true,生产者类实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法,在交换机没有找到匹配队列时会回调returnedMessage方法,可在这个方法内做重试补偿。如果找到了匹配队列,就不再回调这个方法。 3、消息到达Broker后会暂存在内存中,如果Consumer还没来得及消费,Broker挂掉了,导致的消息丢失问题怎么解决? 设置持久化,以保证即使Broker挂掉,服务重启后消息依然会恢复,

① 将Exchange设置为持久化的;

② 将Queue设置为持久化的;

③ 将message设置为持久化的。 但这里还有个问题,设置了持久化的消息存入Broker之后,还需要一段时间才能存入磁盘(虽然很短,但不能忽视),因为Broker并不会为每条消息都实时同步存盘,如果这个很短的时间内发生宕机、异常、重启等情况,消息也会丢失,解决办法是引入"RabbitMQ的镜像队列"(RabbitMQ集群,Master挂了Slave会换上去充当Master)。 4、Consumer宕机或处理消息时发生异常导致的消息丢失怎么应对? 关闭消费者自动确认autoack,开启手动ack(将autoAck参数为false)。 消费者处理完消息后再发送ack给Broker,Broker收到ack再将该消息从队列中移除。 如果Broker没有收到ack并检测到消费者的连接已断开,则Broker会将该消息发送给其它消费者(如果存在多个消费者)或重发给该消费者。 这里不会存在timeout的问题,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的client连接断开。 默认情况下,如果消费者执行抛出异常, Rabbitmq 会自动实现补偿机制,重新发送消息给消费者。SpringBoot集成RabbitMQ的@RabbitListener注解底层使用AOP进行拦截,如果Aop拦截到异常的话,会自动实现补偿机制,该消息会一直缓存在Rabbitmq服务器进行重发,一直重试到不抛出异常为止。默认5s重试一次,最大重试次数无数次,即一直重试,为了避免死循环,我们可以修改重试参数。 我们可以在application.yml文件中配置最大重试次数、重试时间间隔等参数,如果达到最大重试次数还未成功,我们再做一些干预措施(如: 邮件报警、消息落库,或者加入死信队列等)。03PART我们怎么做?

① 生产者重试补偿

    开启生产者Confirm机制,当捕获到 "消息发送到交换机时发生异常" 或 "交换机根据路由规则没有找到匹配的队列" 时,开始重试,设置最大重试次数为3,达到最大重试次数时还未成功,则做干预处理(如:邮件报警、消息落库等);

② 设置exchange、queue、message 持久化 ;

③ 关闭消费者autoack,开启手动ack;

④ 消费者重试补偿

    设置最大重试次数为3,达到3次还未消费成功,作干预处理(如:邮件报警、消息落库,或者加入死信队列等)。将重试的次数存放在redis中,利用redis的自增命令每次+1,生产者在发送消息时携带一个全局的correlationId,消费端消费时用correlationId作为key,重试次数作为value放在redis中用来控制重试次数。

04PART一个可靠传输的demo

新建2个项目,一个 rabbitmq-provider (生产者),一个rabbitmq-consumer(消费者),构建一个生产者、一个direct交换机、一个queue、一个消费者,实现一个"路由模式"的demo,保证可靠传输。

1、rabbitmq-provider项目

> pom依赖

    org.springframework.boot    spring-boot-starter-amqp    2.2.4.RELEASE

> application.yml

spring:  rabbitmq:    host: xxx.xxx.xx.xxx    port: 5672    virtual-host: felix-vHost    username: long.yuan    password: long.yuan    listener:          simple:            prefetch: 1            # 设置生产者手动确认manual            acknowledge-mode: manual    # 消息发送到交换机的确认    publisher-confirm-type: correlated    # 交换机找到匹配队列的确认    publisher-returns: true

> 配置类

@Configurationpublic class RabbitConfig {    /**     * @Title 创建一个持久化队列     * @Description 默认就是持久化的     * @Author long.yuan     * @Date 2020/2/29 17:24     * @Param []     * @return org.springframework.amqp.core.Queue     **/    @Bean    public Queue durableQueue(){        return new Queue("durable-queue");    }    /**     * @Title 声明一个Direct类型的交换机     * @Description 默认也是持久化的     * @Author long.yuan     * @Date 2020/2/29 14:44     * @Param []     * @return org.springframework.amqp.core.DirectExchange     **/    @Bean    DirectExchange directExchange(){        return new DirectExchange("durable-exchange");    }    /**     * @Title 绑定队列和交换机(绑定的时候指定BindingKey)     * @Description 如果队列和交换机都是持久化的,那么他们之间的Binding也是持久化的     * @Author long.yuan     * @Date 2020/2/29 17:31     * @Param [reliableQueue, directExchange]     * @return org.springframework.amqp.core.Binding     **/    @Bean    Binding bindExchange(Queue reliableQueue,DirectExchange directExchange){        return BindingBuilder.bind(reliableQueue).to(directExchange).with("rabbit.long.yuan");    }}

> 生产者

@Componentpublic class Publisher implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{    private RabbitTemplate rabbitTemplate;    @Autowired    public Publisher(RabbitTemplate rabbitTemplate) {        this.rabbitTemplate = rabbitTemplate;        rabbitTemplate.setConfirmCallback(this);        rabbitTemplate.setReturnCallback(this);    }    public void sendMessage() {        System.out.println("Send Message ...");        // MessageProperties实体的CorrelationId属性 是传到消费者唯一标识该消息的        MessageProperties properties = new MessageProperties();        properties.setCorrelationId(UUID.randomUUID().toString());        Message message = new Message("Hello RabbitMQ".getBytes(), properties);        // 使用convertAndSend方法发送,消息默认就是持久化的        // 发送消息时CorrelationData实体的ID属性是在发送回调时标识该消息的        rabbitTemplate.convertAndSend("durable-exchange", "rabbit.long.yuan1", message, new CorrelationData(UUID.randomUUID().toString()));    }    /**     * @Title 消息发送到交换机上的回调     * @Description 消息发送到交换机上的回调     * @Author long.yuan     * @Date 2020/2/29 18:22     * @Param [correlationData, ack, cause]     * @return void     **/    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if (ack){            System.out.println("消息发送到交换机成功!");        }else{            System.out.println("消息发送到交换机失败!");        }    }    /**     * @Title 交换机没有找到匹配队列时的回调     * @Description 注意,这个方法只有在交换机没有找到匹配队列时才会回调,如果找到了匹配队列,就不再回调这个方法     * @Author long.yuan     * @Date 2020/2/29 18:24     * @Param [message, replyCode, replyText, exchange, routingKey]     * @return void     **/    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        System.out.println("交换机没有找到匹配队列!");    }}
2、rabbitmq-consumer项目

> pom依赖

    org.springframework.boot    spring-boot-starter-amqp    2.2.4.RELEASE    org.springframework.boot    spring-boot-starter-data-redis

> application.yml文件

spring:  rabbitmq:    host: xxx.xxx.xx.xxx    port: 5672    virtual-host: felix-vHost    username: long.yuan    password: long.yuan    listener:      simple:        # 公平分发        prefetch: 1        # 设置消费者手动确认(只有消费者手动ack后才将消息从队列中删除)        acknowledge-mode: manual        # 重试次数超过设置后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)        default-requeue-rejected: false        retry:          # 消费失败时是否重试          enabled: true          # 最大重试次数(默认无数次)          max-attempts: 3          # 重试间隔(单位毫秒)          initial-interval: 10000  # Redis  redis:    host: xxx.xxx.xx.xxx    port: 6379    password: 'xxxxxx'

> 配置类

@Configurationpublic class RabbitConfig {    /**     * @Title 创建一个持久化队列     * @Description 默认就是持久化的     * @Author long.yuan     * @Date 2020/2/29 17:24     * @Param []     * @return org.springframework.amqp.core.Queue     **/    @Bean    public Queue durableQueue(){        return new Queue("durable-queue");    }    /**     * @Title 声明一个Direct类型的交换机     * @Description 默认也是持久化的     * @Author long.yuan     * @Date 2020/2/29 14:44     * @Param []     * @return org.springframework.amqp.core.DirectExchange     **/    @Bean    DirectExchange directExchange(){        return new DirectExchange("durable-exchange");    }    /**     * @Title 绑定队列和交换机(绑定的时候指定BindingKey)     * @Description 如果队列和交换机都是持久化的,那么他们之间的Binding也是持久化的     * @Author long.yuan     * @Date 2020/2/29 17:31     * @Param [reliableQueue, directExchange]     * @return org.springframework.amqp.core.Binding     **/    @Bean    Binding bindExchange(Queue reliableQueue,DirectExchange directExchange){        return BindingBuilder.bind(reliableQueue).to(directExchange).with("rabbit.long.yuan");    }}

> 消费者

@Componentpublic class Consumer {    @Autowired    StringRedisTemplate stringRedisTemplate;    /**     * @Title 消费者监听     * @Description 消费者监听     * @Author long.yuan     * @Date 2020/2/29 20:00     * @Param [message]     * @return void     **/    @RabbitListener(queues = "durable-queue")    @RabbitHandler    public void process(Message message, Channel channel) throws IOException {        System.out.println("监听队列: " + message);        // 用CorrelationId作为key,value为重试次数        if (stringRedisTemplate.opsForValue().get(message.getMessageProperties().getCorrelationId())==null){            stringRedisTemplate.opsForValue().set(message.getMessageProperties().getCorrelationId(),"1");        }        stringRedisTemplate.opsForValue().increment(message.getMessageProperties().getCorrelationId(), 1);        if (Integer.parseInt(stringRedisTemplate.opsForValue().get(message.getMessageProperties().getCorrelationId())) >= 3){            // 最多重试3次            System.out.println("重试3次仍然消费异常,触发报警邮件人工干预!");            // 达到重试次数后在这里可以做干预操作(如消息入库、邮件报警、死信队列等等)            // 返回ack,将消息从队里删除不要再重试了            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            return;        }        // 开始业务操作...        // 发送过来的message是非数字字符串"Hello RabbitMQ",在转换为int时会报异常        Integer.parseInt(message.getBody().toString());        // 手动确认        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        System.out.println("手动ack完成");    }}

保证RobbitMQ可靠传输,首先要搞清楚可能出现消息丢失或异常的情况有哪些,再针对这些情况一一寻求解决方案。

另外还有一个问题应该也属于可靠传输考虑的范畴,就是"消息的幂等性",幂等性我们在下一篇单独说,感兴趣的朋友可以持续关注。

205f43d6e6bcaa67bb4ff02ea4e89ef6.png

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号