当前位置:   article > 正文

RabbitMq的消费确认ACK_rabbitmq channel.basicack

rabbitmq channel.basicack

1:什么是消息确认ACK

如果在处理消息的过程中,消费者的服务在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不丢失,RabbitMQ支持消息确定

2:ACK的消息确认机制

ACK机制实消费者从RabbitMQ收到消息并完成处理后,反馈给RabbitMQ,MQ收到反馈后才会将此消息从队列中删除

如果一个消费者在处理消息出现了网络不稳定服务器异常等现象,那么就不会有ACK反馈,mq会认为这个消息没有被正常消费,会将消息重新放入队列中。

如果在消息集群的情况下,mq会立即将消息推送给这个在线的其他消费者,保证了在消费者服务端故障的时候,不会丢失任何消息和任务

消息永远不会从RabbitMq中删除,只有当消费者正确发送ACK反馈,mq确认收到后才会从mq的服务器中将数据删除

3:ACK机制注意事项
如果忘记了ACK,那么后果很严重。当Consumer退出的时候,Message会一致重新分发,然后mq占据的内存越来越多,会导致内存泄漏。

4:结合项目实例

首先建立一个生产者:

  1. package com.tmnch.pinan;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import com.rabbitmq.client.MessageProperties;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.rabbit.support.CorrelationData;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import java.io.IOException;
  10. import java.util.Date;
  11. import java.util.concurrent.TimeoutException;
  12. /**
  13. *  * Description: 
  14. *  *
  15. *  * @author thw
  16. *  * @date 2020/7/14 14:01
  17. *  
  18. */
  19. public class Producer {
  20. private static final String EXCHANGE_NAME = "exchange_pinan";
  21. private static final String ROUTING_KEY = "routingkey_pinan";
  22. private static final String QUEUR_NAME = "queue_pinan";
  23. //rabbitmq的服务地址
  24. private static final String IP_ADDRESS = "localhost";
  25. //RabbitMq 服务端 默认端口号为5672
  26. private static final int PORT = 5672;
  27. //用户名
  28. private static final String USER_NAME = "guest";
  29. //密码
  30. private static final String PASSWORD = "guest";
  31. public static void main(String[] args) throws IOException, TimeoutException {
  32. ConnectionFactory connectionFactory = new ConnectionFactory();
  33. connectionFactory.setHost(IP_ADDRESS);
  34. connectionFactory.setPort(PORT);
  35. connectionFactory.setUsername(USER_NAME);
  36. connectionFactory.setPassword(PASSWORD);
  37. /**
  38. * 创建连接
  39. */
  40. Connection connection = connectionFactory.newConnection();
  41. /**
  42. * 创建信道
  43. */
  44. Channel channel = connection.createChannel();
  45. /**
  46. * 创建一个type=direct 持久化的 非自动删除的交换器
  47. * 交换器名称
  48. * 交换器类型
  49. * durable是否持久化
  50. * autodelete是否自动删除
  51. * 其他参数
  52. */
  53. channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
  54. /**
  55. * 将交换器与队列通过路由键绑定
  56. */
  57. channel.queueBind(QUEUR_NAME, EXCHANGE_NAME, ROUTING_KEY);
  58. // channel.basicAck();
  59. /* channel.basicConsume(QUEUE_NAME, true, consumer);
  60. channel.basicConsume(QUEUR_NAME,false,)*/
  61. /**
  62. * 发送一条持 久化消息
  63. */
  64. Date date =new Date();
  65. String message = date.toString();
  66. message=message+"平安反渗漏接口message";
  67. channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  68. /**
  69. * 关闭资源
  70. */
  71. channel.close();
  72. connection.close();
  73. /* //使用rabbitmq模板
  74. @Autowired
  75. private RabbitTemplate rabbitTemplate;
  76.    //发送消息
  77. public void sendOrder(Order order) throws Exception{
  78.      
  79. CorrelationData correlationData = new CorrelationData();
  80. correlationData.setId(order.getMessageId());
  81. rabbitTemplate.convertAndSend("order-exchange",//exchange
  82. "order.abcd",//routingKey
  83. order,//消息体内容
  84. correlationData); //消息唯一id
  85. }*/
  86. }
  87. }

2:新建springboot项目,引入mq依赖后,修改qpplication.yml

  1. rabbitmq:
  2. host: localhost #服务器IP
  3. port: 5672 #默认端口
  4. username: guest #登录名
  5. password: guest #密码
  6. listener: #消费者监听配置
  7. simple:
  8. acknowledge-mode: manual #开启手动ACK确认
  9. retry:
  10. enabled: true #开启重发机制
  11. max-attempts: 5 #重发次数

新建TopicRabbitConfig.java

  1. package com.pingan.mq;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.amqp.core.Binding;
  4. import org.springframework.amqp.core.BindingBuilder;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.core.TopicExchange;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. *  * Description: 
  10. * 建topic类型的mq所用
  11. *  *
  12. *  * @author thw
  13. *  * @date 2020/7/29 17:38
  14. *  
  15. */
  16. public class TopicRabbitConfig {
  17. //绑定键#路由键值
  18. public final static String man = "routingkey_pinan";
  19. @Bean
  20. public Queue firstQueue() {
  21. return new Queue(TopicRabbitConfig.man);
  22. }
  23. @Bean
  24. TopicExchange exchange() {
  25. return new TopicExchange("exchange_pinan");
  26. }
  27. //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man
  28. //这样只要是消息携带的路由键是topic.man,才会分发到该队列
  29. @Bean
  30. Binding bindingExchangeMessage() {
  31. return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
  32. }
  33. }

新建TopicReceiver.java

  1. package com.tmnch.tmnchpinan.controller;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.messaging.handler.annotation.Header;
  7. import org.springframework.stereotype.Component;
  8. import java.io.IOException;
  9. import java.util.Map;
  10. /**
  11. *  * Description: 
  12. *  *建立监听队列进入方法
  13. *  * @author thw
  14. *  * @date 2020/7/29 17:28
  15. *  
  16. */
  17. @Component
  18. @RabbitListener(queues = "queue_pinan")//监听的队列名称
  19. public class TopicReceiver {
  20. @RabbitHandler
  21. public void process(String testMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
  22. System.out.println("消费者消息: " + testMessage.toString());
  23. try {
  24. channel.basicAck(tag, false); //第一个参数为该条消息在mq中的唯一序列号。第二个
  25. //参数为是否也确认其他消息被消费
  26. System.out.println("序列号:"+tag+":ack接收success");
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. System.out.println("序列号:"+tag+":ack接收fail");
  30. }
  31. }
  32. }

配送pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.3.1.RELEASE</version>
  9. <relativePath/> <!--&lt;!&ndash; lookup parent from repository &ndash;&gt;-->
  10. </parent>
  11. <groupId>com.tmnch</groupId>
  12. <artifactId>tmnchpinan</artifactId>
  13. <version>4.0-SNAPSHOT</version>
  14. <name>tmnchpinan</name>
  15. <description>pingan</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-jdbc</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.mybatis.spring.boot</groupId>
  26. <artifactId>mybatis-spring-boot-starter</artifactId>
  27. <version>1.1.1</version>
  28. </dependency>
  29. <dependency>
  30. <groupId>mysql</groupId>
  31. <artifactId>mysql-connector-java</artifactId>
  32. <scope>runtime</scope>
  33. </dependency>
  34. <!--org.springframework.boot-->
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-test</artifactId>
  38. <scope>test</scope>
  39. <exclusions>
  40. <exclusion>
  41. <groupId>org.junit.vintage</groupId>
  42. <artifactId>junit-vintage-engine</artifactId>
  43. </exclusion>
  44. </exclusions>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.springframework.boot</groupId>
  48. <artifactId>spring-boot-starter-web</artifactId>
  49. <version>1.5.4.RELEASE</version>
  50. <!--<version>2.1.4.RELEASE</version>-->
  51. </dependency>
  52. <!-- <dependency>
  53. <groupId>org.springframework</groupId>
  54. <artifactId>spring-messaging</artifactId>
  55. <version>4.2.4.RELEASE</version>
  56. </dependency>-->
  57. <!-- fastjson-->
  58. <dependency>
  59. <groupId>com.alibaba</groupId>
  60. <artifactId>fastjson</artifactId>
  61. <version>1.2.6</version>
  62. </dependency>
  63. <!--lombok -->
  64. <dependency>
  65. <groupId>org.projectlombok</groupId>
  66. <artifactId>lombok</artifactId>
  67. <version>1.16.8</version>
  68. </dependency>
  69. <!--rabbitmq依赖-->
  70. <!--<dependency>
  71. <groupId>com.rabbitmq</groupId>
  72. <artifactId>amqp-client</artifactId>
  73. <version>4.1.1</version>
  74. </dependency>
  75. <dependency>
  76. <groupId>org.springframework.amqp</groupId>
  77. <artifactId>spring-rabbit</artifactId>
  78. <version>1.6.2.RELEASE</version>
  79. </dependency>-->
  80. <!--rabbitmq-->
  81. <dependency>
  82. <groupId>org.springframework.boot</groupId>
  83. <artifactId>spring-boot-starter-amqp</artifactId>
  84. </dependency>
  85. <dependency>
  86. <groupId>org.springframework.boot</groupId>
  87. <artifactId>spring-boot-starter</artifactId>
  88. </dependency>
  89. </dependencies>
  90. <!--build依赖-->
  91. <build>
  92. <plugins>
  93. <plugin>
  94. <groupId>org.springframework.boot</groupId>
  95. <artifactId>spring-boot-maven-plugin</artifactId>
  96. </plugin>
  97. </plugins>
  98. <resources>
  99. <resource>
  100. <directory>src/main/java</directory>
  101. <includes>
  102. <include>**/*.xml</include>
  103. </includes>
  104. </resource>
  105. <resource>
  106. <directory>src/main/resources</directory>
  107. <includes>
  108. <include>**/*.*</include>
  109. </includes>
  110. </resource>
  111. </resources>
  112. </build>
  113. </project>

或者

  1. # 给当前项目起名称.
  2. spring.application.name=rabbitmq-ack-direct-consumer
  3. # 配置端口号
  4. server.port=8080
  5. # 配置rabbitmq的参数.
  6. # rabbitmq服务器的ip地址.
  7. spring.rabbitmq.host=192.168.110.133
  8. # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
  9. spring.rabbitmq.port=5672
  10. # rabbitmq的账号.
  11. spring.rabbitmq.username=guest
  12. # rabbitmq的密码.
  13. spring.rabbitmq.password=guest
  14. # 设置交换器的名称,方便修改.
  15. # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
  16. rabbitmq.config.exchange=log.exchange.direct
  17. # info级别的队列名称.
  18. rabbitmq.config.queue.info=log.info.queue
  19. # info的路由键.
  20. rabbitmq.config.queue.info.routing.key=log.info.routing.key
  21. # error级别的队列名称.
  22. rabbitmq.config.queue.error=log.error.queue
  23. # error的路由键.
  24. rabbitmq.config.queue.error.routing.key=log.error.routing.key
  25. # 开启重试
  26. spring.rabbitmq.listener.simple.retry.enabled=true
  27. # 重试次数,默认为3
  28. spring.rabbitmq.listener.simple.retry.max-attempts=5

消费类:

  1. package com.example.bie.consumer;
  2. import org.springframework.amqp.core.ExchangeTypes;
  3. import org.springframework.amqp.rabbit.annotation.Exchange;
  4. import org.springframework.amqp.rabbit.annotation.Queue;
  5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.stereotype.Component;
  9. /**
  10. *
  11. * @author biehl
  12. *
  13. * 消息接收者
  14. *
  15. * 1、@RabbitListener bindings:绑定队列
  16. *
  17. * 2、@QueueBinding
  18. * value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
  19. *
  20. * 3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
  21. *
  22. * 4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
  23. *
  24. *
  25. */
  26. @Component
  27. @RabbitListener(bindings = @QueueBinding(
  28. value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
  29. exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
  30. key = "${rabbitmq.config.queue.error.routing.key}"))
  31. public class LogErrorConsumer {
  32. /**
  33. * 接收消息的方法,采用消息队列监听机制.
  34. *
  35. * @RabbitHandler意思是将注解@RabbitListener配置到类上面
  36. *
  37. * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
  38. *
  39. * @param msg
  40. */
  41. @RabbitHandler
  42. public void consumer(String msg) {
  43. // 打印消息
  44. System.out.println("ERROR消费者===>消费<===消息message: " + msg);
  45. // throw new RuntimeException(); 用户确认ack是否正常
  46. }
  47. }

 

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号