当前位置:   article > 正文

rabbitMQ-生产者可靠发送-消息确认-Confirm_publisher-confirms: true

publisher-confirms: true

1、rabbitMQ可能存在的数据不一致问题原因

  • 生产者发送消息到Broker消息服务器(消息发送失败)
  • rabbitmq服务器自身故障导致消息丢失
  • 消息消费者接收消息后处理失败(消费消息失败)

2、生产者发送消息防止消息丢失

  •  使用rabbitMq的事务机制,效率极低,并且失去了异步的初衷,所以建议不适用
  • 使用confirm消息确认机制,比上面的事务机制要好一些,但是当访问量上来之后,由于频繁的确认交互,也会很大程度降低效率,所以如果不是很重要的消息,也不建议使用。
  • 其实confirm机制只适用于生产者发送到exchange交换机的回调,如果交换机没有路由到queue中,就需要开启return机制了。

3、springBoot演示消息确认机制

3.1引入依赖

  1. <!--引入rabbitmq继承依赖-->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.amqp</groupId>
  8. <artifactId>spring-rabbit-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>

3.2配置application.yml

  1. spring:
  2. application:
  3. name: rabbitmq-springboot
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. username: ems
  8. password: 123
  9. virtual-host: /ems
  10. # 发送者开启 confirm 确认机制
  11. publisher-confirms: true
  12. # 发送者开启 return 确认机制
  13. publisher-returns: true
  14. listener:
  15. simple:
  16. # 设置消费端手动 ack
  17. acknowledge-mode: manual
  18. # 是否支持重试
  19. retry:
  20. enabled: true

3.3管理交换机、队列的Bean的配置类

  1. package com.lst;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class QueueConfig {
  8. //定义队列
  9. @Bean(name = "TestQueue")
  10. public Queue returnTestQueue() {
  11. return new Queue("returnTestQueue", true, false, false);
  12. }
  13. //定义交换机
  14. @Bean(name = "TestExchange")
  15. public DirectExchange returnTestExchange() {
  16. return new DirectExchange("TestExchange");
  17. }
  18. //将队列绑定交换机
  19. @Bean
  20. public Binding confirmTestExchangeAndQueue(
  21. @Qualifier("TestExchange") DirectExchange TestExchange,
  22. @Qualifier("TestQueue") Queue TestQueue) {
  23. return BindingBuilder.bind(TestQueue).to(TestExchange).with("testRoutingKey");
  24. }
  25. }

 3.4生产者测试类

  1. package com.lst;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.boot.test.context.SpringBootTest;
  8. import org.springframework.test.context.junit4.SpringRunner;
  9. @SpringBootTest(classes = RabbitmqSpringbootApplication.class)
  10. @RunWith(SpringRunner.class)
  11. public class TestRabbitMq {
  12. //注入rabbitTemplate对象
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. /**
  16. * 确认模式步骤
  17. * 1、确认模式开启 publisher-confirms: true
  18. * 2、在rabbitTemple中定义回调函数
  19. */
  20. @Test
  21. public void test(){
  22. //定义回调
  23. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  24. /**
  25. *
  26. * @param correlationData 相关的配置
  27. * @param b :ack 交换机是否成功收到了消息 true代表成功 false代表失败
  28. * @param s :失败的原因
  29. */
  30. @Override
  31. public void confirm(CorrelationData correlationData, boolean b, String s) {
  32. if(b){//接收成功
  33. System.out.println("发送成功");
  34. }else{//接受失败
  35. System.out.println("接收失败"+s);
  36. }
  37. }
  38. });
  39. //发送消息
  40. rabbitTemplate.convertAndSend("TestExchange","testRoutingKey","hello");
  41. }
  42. }

3.5 测试结果

进行到这,出现了一个问题,虽然消息成功进入了队列,但是返回的ack一直是false,查阅资料得知,因为我这里的生产者使用test方法模拟,所以当消息发送之后,connection就会被立即销毁,所以。当测试方法结束,rabbitmq相关的资源也就关闭了,虽然我们的消息发送出去,但异步的ConfirmCallback却由于资源关闭而出现了上面的问题。

所以我改用rest风格的接口访问

  1. package com.lst.hello;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.PostMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. public class TestRabbitMq {
  9. //注入rabbitTemplate对象
  10. @Autowired
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. * 确认模式步骤
  14. * 1、确认模式开启 connetionfactory中开启publisher—confirm = "true"
  15. * 2、在rabbitTemple中定义回调函数
  16. */
  17. @PostMapping("/test")
  18. public void test() {
  19. //定义回调
  20. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  21. /**
  22. *
  23. * @param correlationData 相关的配置
  24. * @param b :ack 交换机是否成功收到了消息 true代表成功 false代表失败
  25. * @param s :失败的原因
  26. */
  27. @Override
  28. public void confirm(CorrelationData correlationData, boolean b, String s) {
  29. if (b) {//接收成功
  30. System.out.println("发送成功");
  31. } else {//接受失败
  32. System.out.println("接收失败" + s);
  33. }
  34. }
  35. });
  36. //发送消息
  37. rabbitTemplate.convertAndSend("TestExchange", "testRoutingKey", "hello confirm");
  38. }
  39. }

调用之后

成功示例

 失败示例:

我们随意更改以下交换机的名称,使其错误,再调用一下

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/798523
推荐阅读
相关标签
  

闽ICP备14008679号