当前位置:   article > 正文

Springboot整合RabbitMQ手动ACK_springboot rabbitmq 手动ack

springboot rabbitmq 手动ack

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务但是只完成了部分突然它挂掉了,会发生什么情况?RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

  1. Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
  2. Channel.basicNack (用于否定确认)
  3. Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

生产者代码

properties

  1. server.port=8081
  2. #rabbitmq服务器ip
  3. spring.rabbitmq.host=localhost
  4. #rabbitmq的端口
  5. spring.rabbitmq.port=5672
  6. #用户名
  7. spring.rabbitmq.username=guest
  8. #密码
  9. spring.rabbitmq.password=guest
  10. #配置虚拟机
  11. spring.rabbitmq.virtual-host=demo
  12. #开启发送确认机制,消息到达交换机后会有回调
  13. spring.rabbitmq.publisher-confirm-type=correlated
  14. #可以确保消息在未被队列接收时返回
  15. spring.rabbitmq.publisher-returns=true
  16. ##发送重试配置
  17. #启用发送重试
  18. #spring.rabbitmq.template.retry.enabled=true
  19. #最大重试次数
  20. #spring.rabbitmq.template.retry.max-attempts=5
  21. #第一次和第二次尝试发布或传递消息之间的间隔
  22. #spring.rabbitmq.template.retry.initial-interval=1000ms
  23. #应用于上一重试间隔的乘数 步长
  24. #spring.rabbitmq.template.retry.multiplier=2
  25. #最大重试时间间隔
  26. #spring.rabbitmq.template.retry.max-interval=10000ms

pom依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <version>2.6.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.projectlombok</groupId>
  8. <artifactId>lombok</artifactId>
  9. <optional>true</optional>
  10. </dependency>
  11. <dependency>
  12. <groupId>com.alibaba</groupId>
  13. <artifactId>fastjson</artifactId>
  14. <version>1.2.78</version>
  15. </dependency>

RabbitConfig

  1. @Configuration
  2. @Slf4j
  3. public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
  4. @Autowired
  5. RabbitTemplate rabbitTemplate;
  6. @PostConstruct
  7. public void initRabbitTemplate(){
  8. rabbitTemplate.setConfirmCallback(this);
  9. rabbitTemplate.setReturnsCallback(this);
  10. }
  11. /**
  12. * durable:是否持久化
  13. * exclusive:是否独享、排外的
  14. * autoDelete:是否自动删除
  15. * @return
  16. */
  17. @Bean
  18. Queue addUserQueue(){
  19. return new Queue(RabbitConstant.QUEUE_ADD_USER,true,false,false);
  20. }
  21. /**
  22. * 消息成功到达交换机触发该方法
  23. * @param correlationData
  24. * @param ack
  25. * @param cause
  26. */
  27. @Override
  28. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  29. if (ack){
  30. //消息成功到达交换机
  31. log.info("{}消息成功到达交换机",correlationData.getId());
  32. }else{
  33. log.error("{}消息未到达交换机,原因:{}",correlationData.getId(),cause);
  34. }
  35. }
  36. /**
  37. * 配置publisher-returns为true 消息未成功到达队列,会触发该方法
  38. * @param returned
  39. */
  40. @Override
  41. public void returnedMessage(ReturnedMessage returned) {
  42. log.error("{}消息未到达队列",returned.toString());
  43. }
  44. }

这里我们直接用直连交换机,【DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上】

再写个常量类专门放队列名,交换机名啥的,写到配置文件也可以

RabbitConstant

  1. public class RabbitConstant {
  2. /**
  3. * 简单消息队列
  4. */
  5. public static final String QUEUE_HELLO_MSG = "hello_world_mq";
  6. /**
  7. * 队列
  8. */
  9. public static final String QUEUE_ADD_USER = "queue.add.user";
  10. }

ProducerServiceImpl

  1. @Service
  2. public class ProducerServiceImpl implements ProducerService {
  3. @Autowired
  4. RabbitTemplate rabbitTemplate;
  5. @Override
  6. public Boolean addUser(User user) {
  7. //这里进行一些操作,然后把用户信息发送到消息队列
  8. String userStr = JSON.toJSONString(user);
  9. rabbitTemplate.convertAndSend(RabbitConstant.QUEUE_ADD_USER, (Object) userStr,new CorrelationData(UUID.randomUUID().toString()));
  10. return true;
  11. }
  12. }

实体类User

  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class User implements Serializable {
  5. private static final long serialVersionUID = 1809655848237434192L;
  6. private Integer id;
  7. private String userName;
  8. private String describe;
  9. }

ProducerController

  1. @RestController
  2. public class ProducerController {
  3. @Autowired
  4. ProducerService producerService;
  5. @PostMapping("/addUser")
  6. public Boolean addUser(@RequestBody User user){
  7. return producerService.addUser(user);
  8. }
  9. }

消费者代码

properties

  1. server.port=8082
  2. #rabbitmq服务器ip
  3. spring.rabbitmq.host=localhost
  4. #rabbitmq的端口
  5. spring.rabbitmq.port=5672
  6. #用户名
  7. spring.rabbitmq.username=guest
  8. #密码
  9. spring.rabbitmq.password=guest
  10. #配置虚拟机
  11. spring.rabbitmq.virtual-host=demo
  12. #设置消费端手动 ack none不确认 auto自动确认 manual手动确认
  13. spring.rabbitmq.listener.simple.acknowledge-mode=manual

pom

pom就不贴了都一样的

ConsumerService

  1. @Service
  2. @Slf4j
  3. public class ConsumerService {
  4. public static final String QUEUE_ADD_USER = "queue.add.user";
  5. @RabbitListener(queues =QUEUE_ADD_USER)
  6. @RabbitHandler
  7. public void addUser(String userStr,Message message, Channel channel){
  8. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  9. try {
  10. //在这里做一些操作
  11. User user = JSONObject.parseObject(userStr,User.class);
  12. log.info(user.toString());
  13. //手动ack 第一个参数是消息的标记,第二个参数代表是false 代表仅仅确认当前消息,为true表示确认之前的所有消息
  14. channel.basicAck(deliveryTag,false);
  15. } catch (Exception e) {
  16. //告诉mq本条消息消费失败
  17. try {
  18. channel.basicNack(deliveryTag,false,true);
  19. } catch (IOException ex) {
  20. ex.printStackTrace();
  21. }
  22. }
  23. }
  24. }

测试

 发送后我们看消费者这边已经拿到了

 来不及截图我又发送了一次,看一下RabbitMQ的控制台

 再修改一下消费端代码,直接除零异常,看是否会出现Nack

 好了我们再发送一次

 而且该消息一直在再投递

下一篇具体讲讲如何处理这种情况

基本概念可以参考此篇博文rabbitmq入门

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/127101
推荐阅读
相关标签
  

闽ICP备14008679号