当前位置:   article > 正文

RabbitMQ的消息确认ACK机制_什么是rabbitmq的ack机制

什么是rabbitmq的ack机制

RabbitMQ的消息确认ACK机制

1、什么是消息确认ACK。

如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。

2、ACK的消息确认机制。

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
    如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
    消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
    消息的ACK确认机制默认是打开的。

3、ACK机制的开发注意事项。

如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。

代码

生产者。控制层的触发生产者生产消息,这里只生产一条消息。方便观察现象。

package com.example.bie.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.example.bie.provider.RabbitMqLogErrorProduce;
import com.example.bie.provider.RabbitMqLogInfoProduce;

/**
 *
 * @author biehl
 *
 */
@Controller
public class RabbitmqController {

    @Autowired
    private RabbitMqLogInfoProduce rabbitMqLogInfoProduce;

    @Autowired
    private RabbitMqLogErrorProduce rabbitMqLogErrorProduce;

    @RequestMapping(value = "/logInfo")
    @ResponseBody
    public String rabbitmqSendLogInfoMessage() {
        String msg = "生产者===>生产者的LogInfo消息message: ";
        for (int i = 0; i < 1; i++) {
            rabbitMqLogInfoProduce.producer(msg + i);
        }
        return "生产===>  LogInfo消息message  ===> success!!!";
    }

    @RequestMapping(value = "/logError")
    @ResponseBody
    public String rabbitmqSendLogErrorMessage() {
        String msg = "生产者===>生产者的LogError消息message: ";
        for (int i = 0; i < 1; i++) {
            rabbitMqLogErrorProduce.producer(msg + i);
        }
        return "生产===>  LogError消息message  ===> success!!!";
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

消费者消费消息,打印输出后面手动抛出运行时异常,观察现象。

package com.example.bie.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 *
 * @author biehl
 *
 *         消息接收者
 *
 *         1、@RabbitListener bindings:绑定队列
 *
 *         2、@QueueBinding
 *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
 *
 *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
 *
 *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
 *
 *
 */
@Component
@RabbitListener(bindings = @QueueBinding(

        value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),

        exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),

        key = "${rabbitmq.config.queue.error.routing.key}"))
public class LogErrorConsumer {

    /**
     * 接收消息的方法,采用消息队列监听机制.
     *
     * @RabbitHandler意思是将注解@RabbitListener配置到类上面
     *
     * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
     *
     * @param msg
     */
    @RabbitHandler
    public void consumer(String msg) {
        // 打印消息
        System.out.println("ERROR消费者===>消费<===消息message: " + msg);
        throw new RuntimeException();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

观察现象,如下所示

###在RabbitMQ的浏览器界面,可以看到一条消息未被进行ACK的消息确认机制,这条消息被锁定Unacked,所以一直在控制台进行报错。

在这里插入图片描述

控制台效果如下所示,一直进行消息的发送,因为消费方一直没有返回ACK确认,RabbitMQ认为消息未进行正常的消费,会将消息再次放入到队列中,再次让你消费,但是还是没有返回ACK确认,依次循环,形成了死循环。
在这里插入图片描述
如何解决问题呢,如果消息发送的时候,程序出现异常,后果很严重的,会导致内存泄漏的,所以在程序处理中可以进行异常捕获,保证消费者的程序正常执行,这里不进行介绍了。第二种方式可以使用RabbitMQ的ack确认机制。开启重试,然后重试次数,默认为3次。这里设置为5次。

# 给当前项目起名称.
spring.application.name=rabbitmq-ack-direct-consumer

# 配置端口号
server.port=8080

# 配置rabbitmq的参数.
# rabbitmq服务器的ip地址.
spring.rabbitmq.host=192.168.110.133
# rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
spring.rabbitmq.port=5672
# rabbitmq的账号.
spring.rabbitmq.username=guest
# rabbitmq的密码.
spring.rabbitmq.password=guest

# 设置交换器的名称,方便修改.
# 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
rabbitmq.config.exchange=log.exchange.direct

# info级别的队列名称.
rabbitmq.config.queue.info=log.info.queue
# info的路由键.
rabbitmq.config.queue.info.routing.key=log.info.routing.key

# error级别的队列名称.
rabbitmq.config.queue.error=log.error.queue
# error的路由键.
rabbitmq.config.queue.error.routing.key=log.error.routing.key

# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数,默认为3次
spring.rabbitmq.listener.simple.retry.max-attempts=5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

效果如下所示:

可以看到控制台尝试了5次以后就不再进行重试了。
在这里插入图片描述
RabbitMQ的界面可以看到,开始的效果和上面的一致,但是5次尝试以后,就变成了0条。RabbitMQ将这条消息丢弃了。

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/878428
推荐阅读
相关标签
  

闽ICP备14008679号