赞
踩
1:什么是消息确认ACK
如果在处理消息的过程中,消费者的服务在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不丢失,RabbitMQ支持消息确定
2:ACK的消息确认机制
ACK机制实消费者从RabbitMQ收到消息并完成处理后,反馈给RabbitMQ,MQ收到反馈后才会将此消息从队列中删除
如果一个消费者在处理消息出现了网络不稳定服务器异常等现象,那么就不会有ACK反馈,mq会认为这个消息没有被正常消费,会将消息重新放入队列中。
如果在消息集群的情况下,mq会立即将消息推送给这个在线的其他消费者,保证了在消费者服务端故障的时候,不会丢失任何消息和任务
消息永远不会从RabbitMq中删除,只有当消费者正确发送ACK反馈,mq确认收到后才会从mq的服务器中将数据删除
3:ACK机制注意事项
如果忘记了ACK,那么后果很严重。当Consumer退出的时候,Message会一致重新分发,然后mq占据的内存越来越多,会导致内存泄漏。
4:结合项目实例
首先建立一个生产者:
- package com.tmnch.pinan;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.support.CorrelationData;
- import org.springframework.beans.factory.annotation.Autowired;
-
- import java.io.IOException;
- import java.util.Date;
- import java.util.concurrent.TimeoutException;
-
- /**
- * * Description:
- * *
- * * @author thw
- * * @date 2020/7/14 14:01
- *
- */
- public class Producer {
- private static final String EXCHANGE_NAME = "exchange_pinan";
- private static final String ROUTING_KEY = "routingkey_pinan";
- private static final String QUEUR_NAME = "queue_pinan";
- //rabbitmq的服务地址
- private static final String IP_ADDRESS = "localhost";
- //RabbitMq 服务端 默认端口号为5672
- private static final int PORT = 5672;
- //用户名
- private static final String USER_NAME = "guest";
- //密码
- private static final String PASSWORD = "guest";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory connectionFactory = new ConnectionFactory();
-
- connectionFactory.setHost(IP_ADDRESS);
- connectionFactory.setPort(PORT);
- connectionFactory.setUsername(USER_NAME);
- connectionFactory.setPassword(PASSWORD);
-
- /**
- * 创建连接
- */
- Connection connection = connectionFactory.newConnection();
- /**
- * 创建信道
- */
- Channel channel = connection.createChannel();
-
- /**
- * 创建一个type=direct 持久化的 非自动删除的交换器
- * 交换器名称
- * 交换器类型
- * durable是否持久化
- * autodelete是否自动删除
- * 其他参数
- */
- channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
-
- /**
- * 将交换器与队列通过路由键绑定
- */
- channel.queueBind(QUEUR_NAME, EXCHANGE_NAME, ROUTING_KEY);
- // channel.basicAck();
- /* channel.basicConsume(QUEUE_NAME, true, consumer);
-
- channel.basicConsume(QUEUR_NAME,false,)*/
- /**
- * 发送一条持 久化消息
- */
- Date date =new Date();
- String message = date.toString();
- message=message+"平安反渗漏接口message";
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
-
- /**
- * 关闭资源
- */
- channel.close();
- connection.close();
- /* //使用rabbitmq模板
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //发送消息
- public void sendOrder(Order order) throws Exception{
-
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId(order.getMessageId());
-
- rabbitTemplate.convertAndSend("order-exchange",//exchange
- "order.abcd",//routingKey
- order,//消息体内容
- correlationData); //消息唯一id
- }*/
- }
- }
2:新建springboot项目,引入mq依赖后,修改qpplication.yml
- rabbitmq:
- host: localhost #服务器IP
- port: 5672 #默认端口
- username: guest #登录名
- password: guest #密码
- listener: #消费者监听配置
- simple:
- acknowledge-mode: manual #开启手动ACK确认
- retry:
- enabled: true #开启重发机制
- max-attempts: 5 #重发次数
新建TopicRabbitConfig.java
- package com.pingan.mq;
-
- import org.springframework.context.annotation.Bean;
-
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Configuration;
- /**
- * * Description:
- * 建topic类型的mq所用
- * *
- * * @author thw
- * * @date 2020/7/29 17:38
- *
- */
- public class TopicRabbitConfig {
- //绑定键#路由键值
- public final static String man = "routingkey_pinan";
- @Bean
- public Queue firstQueue() {
- return new Queue(TopicRabbitConfig.man);
- }
- @Bean
- TopicExchange exchange() {
- return new TopicExchange("exchange_pinan");
- }
- //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
- //这样只要是消息携带的路由键是topic.man,才会分发到该队列
- @Bean
- Binding bindingExchangeMessage() {
- return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
- }
- }
新建TopicReceiver.java
- package com.tmnch.tmnchpinan.controller;
-
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- import java.util.Map;
-
- /**
- * * Description:
- * *建立监听队列进入方法
- * * @author thw
- * * @date 2020/7/29 17:28
- *
- */
- @Component
- @RabbitListener(queues = "queue_pinan")//监听的队列名称
- public class TopicReceiver {
- @RabbitHandler
- public void process(String testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
- System.out.println("消费者消息: " + testMessage.toString());
- try {
- channel.basicAck(tag, false); //第一个参数为该条消息在mq中的唯一序列号。第二个
- //参数为是否也确认其他消息被消费
- System.out.println("序列号:"+tag+":ack接收success");
- } catch (IOException e) {
- e.printStackTrace();
- System.out.println("序列号:"+tag+":ack接收fail");
- }
- }
-
-
- }
配送pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.3.1.RELEASE</version>
- <relativePath/> <!--<!– lookup parent from repository –>-->
- </parent>
- <groupId>com.tmnch</groupId>
- <artifactId>tmnchpinan</artifactId>
- <version>4.0-SNAPSHOT</version>
- <name>tmnchpinan</name>
- <description>pingan</description>
- <properties>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-jdbc</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mybatis.spring.boot</groupId>
- <artifactId>mybatis-spring-boot-starter</artifactId>
- <version>1.1.1</version>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <scope>runtime</scope>
- </dependency>
- <!--org.springframework.boot-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- <version>1.5.4.RELEASE</version>
- <!--<version>2.1.4.RELEASE</version>-->
- </dependency>
- <!-- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-messaging</artifactId>
- <version>4.2.4.RELEASE</version>
- </dependency>-->
- <!-- fastjson-->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.6</version>
- </dependency>
- <!--lombok -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.16.8</version>
- </dependency>
- <!--rabbitmq依赖-->
- <!--<dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.1.1</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.6.2.RELEASE</version>
- </dependency>-->
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
-
- </dependencies>
-
-
- <!--build依赖-->
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.xml</include>
- </includes>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- <includes>
- <include>**/*.*</include>
- </includes>
- </resource>
- </resources>
- </build>
- </project>
或者
- # 给当前项目起名称.
- spring.application.name=rabbitmq-ack-direct-consumer
-
- # 配置端口号
- server.port=8080
-
- # 配置rabbitmq的参数.
- # rabbitmq服务器的ip地址.
- spring.rabbitmq.host=192.168.110.133
- # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
- spring.rabbitmq.port=5672
- # rabbitmq的账号.
- spring.rabbitmq.username=guest
- # rabbitmq的密码.
- spring.rabbitmq.password=guest
-
- # 设置交换器的名称,方便修改.
- # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
- rabbitmq.config.exchange=log.exchange.direct
-
- # info级别的队列名称.
- rabbitmq.config.queue.info=log.info.queue
- # info的路由键.
- rabbitmq.config.queue.info.routing.key=log.info.routing.key
-
- # error级别的队列名称.
- rabbitmq.config.queue.error=log.error.queue
- # error的路由键.
- rabbitmq.config.queue.error.routing.key=log.error.routing.key
-
- # 开启重试
- spring.rabbitmq.listener.simple.retry.enabled=true
- # 重试次数,默认为3次
- spring.rabbitmq.listener.simple.retry.max-attempts=5
消费类:
- package com.example.bie.consumer;
-
- import org.springframework.amqp.core.ExchangeTypes;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- *
- * @author biehl
- *
- * 消息接收者
- *
- * 1、@RabbitListener bindings:绑定队列
- *
- * 2、@QueueBinding
- * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
- *
- * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
- *
- * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
- *
- *
- */
- @Component
- @RabbitListener(bindings = @QueueBinding(
-
- value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
-
- exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
-
- key = "${rabbitmq.config.queue.error.routing.key}"))
- public class LogErrorConsumer {
-
- /**
- * 接收消息的方法,采用消息队列监听机制.
- *
- * @RabbitHandler意思是将注解@RabbitListener配置到类上面
- *
- * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
- *
- * @param msg
- */
- @RabbitHandler
- public void consumer(String msg) {
- // 打印消息
- System.out.println("ERROR消费者===>消费<===消息message: " + msg);
- // throw new RuntimeException(); 用户确认ack是否正常
- }
-
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。