当前位置:   article > 正文

RabbitMQ消息确认

rabbitmq消息确认

目录

1. 消息确认作用

2 开发示例

2.1 生产者确认

2.2 消费者确认


1. 消息确认作用

保证消息的可靠性主要依靠三种机制:一个是消息的持久化,一个是事务机制,一个就是消息的确认机制。

1)消息持久化

消息持久化是将消息写入本地文件,如果rabbitmq故障退出,在重启时会从本地文件系统读取队列数据。

2)事务机制

rabbitmq的事务机制提供了消息生产者和消息服务器(broker)之间的事务的开启,提交,回滚操作(如下图所示)。这套机制可以保证消息可靠性,但也有缺点:由于使用事务机制会导致消息生产者和broker(服务器)交互次数增加,造成性能的浪费,且事务机制是阻塞的,在发送一条消息后需要等待RabbitMQ回应,获取回应后才能发送下一条消息,因此事务机制并不提倡使用(RabbitMQ事务模式与非事务模式在性能上相差可达高达上百倍,具体数值因机器性能和网络环境而不同,但差异都会非常明显)

事务提交流程:

  • 客户端向服务器请求开启事务(tx.select)
  • 服务器端返回响应接收开启事务(tx.select-ok)
  • 推送消息
  • 客户端请求提交事务(tx.commit)
  • 服务器提交事务返回响应(tx.commit-ok)

3)消息确认

消息确认分为:发送者确认,接收方确认。 发送者确认分为:消息到达交换机确认,消息到达与交换机绑定的队列确认。

2 开发示例

用于示例开发基础代码:

git clone -b rabbitmqDemo git@gitee.com:heizifeng/rabbit-mqdemo.git

2.1 生产者确认

因为:每个RabbitTemplate实例只能注册一个ConfirmCallback,所以如果启动web容器并多次调用该方法进行消息发送,则会报异常。(测试用例可以通过,是因为每次测试执行完毕后容器就终止,下次运行时是新的容器)

增加RabbitTemplate的配置类,在配置类中指定消息确认回调方法:

  1. package com.zking.rabbitmqdemo.provied.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. *
  10. * @site www.xiaomage.com
  11. * @company xxx公司
  12. * @create 2021-11-14 10:04
  13. */
  14. @Configuration
  15. @Slf4j
  16. public class RabbitTemplateConfig {
  17. @Bean
  18. public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
  19. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  20. template.setMandatory(true);
  21. template.setMessageConverter(new Jackson2JsonMessageConverter());
  22. template.setEncoding("utf-8");
  23. //实现消息发送到exchange后接收ack回调,publisher-confirms:true
  24. //如果队列是可持久化的,则在消息成功持久化之后生产者收到确认消息
  25. template.setConfirmCallback(((correlationData, ack, cause) -> {
  26. if(ack) {
  27. log.info("消息成功发送到exchange,id:{}", correlationData.getId());
  28. } else {
  29. /*
  30. * 消息未被投放到对应的消费者队列,可能的原因:
  31. * 1)发送时在未找到exchange,例如exchange参数书写错误
  32. * 2)消息队列已达最大长度限制(声明队列时可配置队列的最大限制),此时
  33. * 返回的cause为null
  34. */
  35. log.info("******************************************************");
  36. log.info("11消息发送失败: {}", cause);
  37. }
  38. }));
  39. //消息发送失败返回队列,publisher-returns:true
  40. template.setMandatory(true);
  41. //实现消息发送的exchange,但没有相应的队列于交换机绑定时的回调
  42. template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  43. String id = message.getMessageProperties().getCorrelationId();
  44. log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", id, replyCode, replyText, exchange, routingKey);
  45. });
  46. return template;
  47. }
  48. }

 编写用于发送消息的延迟队列(死信)

  1. package com.zking.rabbitmqdemo.provied.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. /**
  9. *
  10. * @site www.xiaomage.com
  11. * @company xxx公司
  12. * @create 2021-11-12 10:04
  13. */
  14. @Configuration
  15. public class RabbitMQConfig {
  16. @Bean(name="directExchange")
  17. public Exchange directExchange() {
  18. return ExchangeBuilder.directExchange("direct_exchange").durable(true).build();
  19. }
  20. @Bean(name="directQueue")
  21. public Queue directQueue() {
  22. Map<String,Object> args = new HashMap<>();
  23. args.put("x-message-ttl", 1000*60*20);
  24. args.put("x-max-length", 3);
  25. args.put("x-overflow","reject-publish");
  26. return QueueBuilder.durable("direct_queue").withArguments(args).build();
  27. }
  28. @Bean
  29. public Binding directBinding(
  30. @Qualifier("directQueue") Queue queue,
  31. @Qualifier("directExchange") Exchange exchange) {
  32. return BindingBuilder
  33. .bind(queue)
  34. .to(exchange)
  35. .with("direct_exchange_routing_key")
  36. .noargs();
  37. }
  38. @Bean(name="topicExchange")
  39. public Exchange topicExchange() {
  40. return ExchangeBuilder
  41. .topicExchange("topic_exchange")
  42. .durable(true)
  43. .build();
  44. }
  45. @Bean(name="topicQueue1")
  46. public Queue topicQueue1() {
  47. return QueueBuilder.durable("topic_queue_q1").build();
  48. }
  49. @Bean(name="topicQueue2")
  50. public Queue topicQueue2() {
  51. return QueueBuilder.durable("topic_queue_q2").build();
  52. }
  53. @Bean
  54. public Binding topicBindingQ1(
  55. @Qualifier("topicQueue1") Queue queue,
  56. @Qualifier("topicExchange") Exchange exchange) {
  57. return BindingBuilder
  58. .bind(queue)
  59. .to(exchange)
  60. .with("topic.queue.#")
  61. .noargs();
  62. }
  63. @Bean
  64. public Binding topicBindingQ2(
  65. @Qualifier("topicQueue2") Queue queue,
  66. @Qualifier("topicExchange") Exchange exchange) {
  67. return BindingBuilder
  68. .bind(queue)
  69. .to(exchange)
  70. .with("topic.queue.#")
  71. .noargs();
  72. }
  73. //死信队列
  74. @Bean(name="dxlExchange")
  75. public Exchange dxlExchange() {
  76. return ExchangeBuilder.topicExchange("dxl_exchange").durable(true).build();
  77. }
  78. @Bean(name="dxlQueue")
  79. public Queue dxlQueue() {
  80. return QueueBuilder.durable("dxl_queue").build();
  81. }
  82. @Bean
  83. public Binding bindingDxl(
  84. @Qualifier("dxlQueue") Queue queue,
  85. @Qualifier("dxlExchange") Exchange exchange) {
  86. return BindingBuilder
  87. .bind(queue)
  88. .to(exchange)
  89. .with("routing_dxl_key")
  90. .noargs();
  91. }
  92. @Bean(name="usualExchange")
  93. public Exchange usualExchange() {
  94. return ExchangeBuilder.directExchange("usual_direct_exchange").durable(true).build();
  95. }
  96. @Bean(name="usualQueue")
  97. public Queue usualQueue() {
  98. return QueueBuilder.durable("usual_queue")
  99. .withArgument("x-message-ttl", 1000*60*2)
  100. .withArgument("x-max-length", 5)
  101. .withArgument("x-dead-letter-exchange", "dxl_exchange")
  102. .withArgument("x-dead-letter-routing-key", "routing_dxl_key")
  103. .build();
  104. }
  105. @Bean
  106. public Binding bindingUsualQueue(
  107. @Qualifier("usualQueue") Queue queue,
  108. @Qualifier("usualExchange") Exchange exchange) {
  109. return BindingBuilder
  110. .bind(queue)
  111. .to(exchange)
  112. .with("routing_usual_key")
  113. .noargs();
  114. }
  115. }

编写测试Service 发送消息

  1. package com.zking.rabbitmqdemo.provied.service;
  2. /**
  3. * @author aq
  4. * @site www.xiaomage.com
  5. * @company xxx公司
  6. * @create 2021-11-12 10:11
  7. */
  8. public interface ISendMsgService {
  9. void sendDirectMsg();
  10. void topicExchangeSend();
  11. void dxlExchangeSend();
  12. void confirmMessage();
  13. }
  1. package com.zking.rabbitmqdemo.provied.service;
  2. import org.springframework.amqp.rabbit.connection.CorrelationData;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. import javax.annotation.Resource;
  7. import java.time.LocalDateTime;
  8. import java.time.format.DateTimeFormatter;
  9. import java.util.UUID;
  10. /**
  11. * @author aq
  12. * @site www.xiaomage.com
  13. * @company xxx公司
  14. * @create 2021-11-12 10:08
  15. */
  16. @Service
  17. public class SendMsgService implements ISendMsgService {
  18. @Autowired
  19. private RabbitTemplate rabbitTemplate;
  20. @Override
  21. public void sendDirectMsg() {
  22. String msg = "rabbitmq direct exchange send msg "
  23. + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  24. rabbitTemplate.convertAndSend("direct_exchange", "direct_exchange_routing_key",msg);
  25. }
  26. @Override
  27. public void topicExchangeSend() {
  28. String msg = "rabbitmq topic exchange send msg "
  29. + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  30. rabbitTemplate.convertAndSend("topic_exchange", "topic.queue.msg", msg);
  31. }
  32. @Override
  33. public void dxlExchangeSend() {
  34. String msg = "rabbitmq usual exchange send msg "
  35. + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  36. rabbitTemplate.convertAndSend("usual_direct_exchange", "routing_usual_key", msg);
  37. }
  38. @Override
  39. public void confirmMessage() {
  40. String msg = "rabbitmq direct exchange send msg and confirm "
  41. + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  42. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  43. rabbitTemplate.convertAndSend(
  44. "direct.exchange",
  45. "direct.exchange.routing.key",
  46. msg,
  47. correlationData);
  48. }
  49. }

编写测试接口

  1. package com.zking.rabbitmqdemo.provied.service;
  2. import org.junit.Test;
  3. import org.junit.runner.RunWith;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.boot.test.context.SpringBootTest;
  6. import org.springframework.test.context.junit4.SpringRunner;
  7. import static org.junit.Assert.*;
  8. /**
  9. *
  10. * @site www.xiaomage.com
  11. * @company xxx公司
  12. * @create 2021-11-12 10:12
  13. */
  14. @RunWith(SpringRunner.class)
  15. @SpringBootTest
  16. public class SendMsgServiceTest {
  17. @Autowired
  18. private ISendMsgService sendMsgService;
  19. @Test
  20. public void sendDirectMsg() {
  21. sendMsgService.sendDirectMsg();
  22. }
  23. @Test
  24. public void topicExchangeSend() {
  25. sendMsgService.topicExchangeSend();
  26. }
  27. @Test
  28. public void testDxlExchange() {
  29. sendMsgService.dxlExchangeSend();
  30. }
  31. }

2.2 消费者确认

1)直接使用@RabbitListener,@RabbitHandler注解,通过配置文件配置监听容器:

application.properties

  1. server.port=8084
  2. #rabbitMQç¸å³éç½®
  3. spring.rabbitmq.host=192.168.164.128
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=admin
  6. spring.rabbitmq.password=admin
  7. spring.rabbitmq.virtual-host=my_vhost
  8. #启用消息确认
  9. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  10. spring.rabbitmq.listener.direct.acknowledge-mode=manual
  11. #连接模式为channel
  12. spring.rabbitmq.cache.connection.mode=channel
  13. spring.rabbitmq.cache.channel.size=50
  14. #每个队列的消费者数量
  15. spring.rabbitmq.listener.direct.consumers-per-queue=2
  16. #侦听器调用者线程的最小数量
  17. spring.rabbitmq.listener.simple.concurrency=2
  18. spring.rabbitmq.listener.simple.max-concurrency=100
  19. #每次用队列中取出1个消息,在有多个消息消费者,且消息者处理能力不均时,可以
  20. #起到均衡各消息消费者的处理功能的功能
  21. spring.rabbitmq.listener.direct.prefetch=1
  1. /**
  2. * @author Administrator
  3. * @create 2020-02-2422:30
  4. */
  5. @Component
  6. @Slf4j
  7. public class ReceiverConfirmDemo implements ChannelAwareMessageListener {
  8. /**
  9. * 指定监听的队列.
  10. * 该注解可以放在方法上,也可以放在类上,如果放在类上则需要
  11. * 在一个方法上设置@RabbitHandler(isDefault=true),否则会报如下异常:
  12. * “Listener method ‘no match’ threw exception”。
  13. * 因此建议注解始终使用在方法上。
  14. */
  15. @RabbitListener(queues = "direct.queue")
  16. //指定该方法为消息处理器
  17. @RabbitHandler
  18. @Override
  19. public void onMessage(Message message, Channel channel) throws IOException {
  20. log.info("消息内容: {}",new String(message.getBody()));
  21. /**
  22. * 模拟业务处理方法.
  23. * 对应业务方法中出现的异常需要区分对待,例如以下情况:
  24. * 1)网络异常等可能恢复的异常,可以设置消息重新返回到队列,以便于重新处理
  25. * 2)对应业务数据等不可恢复的异常,则可以进行补偿操作,或放入死信队列进行人工干预
  26. */
  27. try {
  28. log.info("正在处理 ....");
  29. //延迟5
  30. TimeUnit.SECONDS.sleep(5);
  31. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  32. //模拟在业务处理是发生了网络异常,如:在连接数据库保存数据时网络发生了抖动
  33. //此类异常是可以恢复的,需要要消息重新返回队列,以便于下次处理
  34. if(deliveryTag % 2 == 0) {
  35. throw new ConnectException("模拟消息消费者发生网络异常");
  36. }
  37. //模拟发生不可恢复异常,此种情况消息重新入队没有意义
  38. if(deliveryTag % 3 == 0) {
  39. throw new ClassCastException("模拟消息消费者发生不可恢复异常");
  40. }
  41. } catch (SocketException se) {
  42. log.info("SocketException: {}", se.getMessage());
  43. //拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,false则不会重新入队
  44. //如果配置了死信队列则消息会被投递到死信队列
  45. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  46. //不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,
  47. //与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。
  48. // nack后的消息也会被自己消费到
  49. //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  50. //是否恢复消息到队列,参数是是否requeue,true则重新入队列,
  51. // 并且尽可能的将之前recover的消息投递给其他消费者消费,
  52. //而不是自己再次消费。false则消息会重新被投递给自己
  53. //channel.basicRecover(true);
  54. return;
  55. } catch (Exception e) {
  56. //此处处理无法恢复的异常,可记录日志或将消息发送到指定的队列以便于后续的处理
  57. log.info("Exception: {}", e.getMessage());
  58. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
  59. return;
  60. }
  61. log.info("处理完毕, 发送ack确认 .... ");
  62. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  63. }
  64. }

编写完成消息消费者后,将通过消息生产者发送消息,查看消息消费的消费。

2)使用SimpleMessageListenerContainer配置监听容器

  1. /**
  2. * @author Administrator
  3. * @create 2020-03-0119:27
  4. */
  5. @Configuration
  6. @Slf4j
  7. public class MessageListenerContainer {
  8. @Autowired
  9. private ReceiverConfirmDemo receiverConfirmDemo;
  10. @Bean
  11. public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
  12. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  13. container.setConnectionFactory(connectionFactory);
  14. container.setConcurrentConsumers(1);
  15. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  16. container.setQueueNames("direct.queue");
  17. //后置处理器,接收到的消息都添加了Header请求头
  18. /*container.setAfterReceivePostProcessors(message -> {
  19. message.getMessageProperties().getHeaders().put("desc",10);
  20. return message;
  21. });*/
  22. container.setMessageListener(receiverConfirmDemo);
  23. return container;
  24. }
  25. }

在该类中注入的receiverConfirmDemo即为上面已经编写完成的ReceiverConfirmDemo消息处理器。

运行结果

 如果又问题那么 如果是网络异常 客恢复数据那么会回到原有队列 ,如果是不可处理异常那么可以用数据库保存,人工介入处理。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/700849
推荐阅读
相关标签
  

闽ICP备14008679号