当前位置:   article > 正文

springboot整合rabbitmq 消费者Consumer 手动进行ack确认_rabbitmq中的basicconsume方法的使用,手动确定ack

rabbitmq中的basicconsume方法的使用,手动确定ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息

代码演示:用队列中的消息做除法运算,如果是整数,就会手动提交ack,如果是随便输入的文字,就会触发 basicNack


1 修改配置文件

  1. server:
  2. port: 8021
  3. spring:
  4. #给项目来个名字
  5. application:
  6. name: rabbitmq-test
  7. #配置rabbitMq 服务器
  8. rabbitmq:
  9. host: 127.0.0.1
  10. port: 5672
  11. username: need
  12. password: 123456
  13. #虚拟host 可以不设置,使用server默认host
  14. virtual-host: /testhost
  15. #ack 确认方式
  16. listener:
  17. simple:
  18. acknowledge-mode: manual
  19. direct:
  20. acknowledge-mode: manual

相关的api解释

channel basicAck(long deliveryTag, boolean multiple);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

void basicNack(long deliveryTag, boolean multiple, boolean requeue);
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列

channel.basicReject(long deliveryTag, boolean requeue);
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息


2 消费者代码示例,有些是使用实现接口的方式 ,那是原生api的用法

  1. import com.rabbitmq.client.Channel;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.io.IOException;
  6. @Component
  7. public class DirectReceiver_1 {
  8. @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
  9. public void process(Message message, Channel channel) throws IOException {
  10. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  11. //
  12. //@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag
  13. try {
  14. String msgbody = new String(message.getBody());
  15. //1.接收转换消息
  16. System.out.println("DirectReceiver消费者 1 收到消息 : " +msgbody+" 编号: "+deliveryTag);
  17. //2. 处理业务逻辑
  18. System.out.println("处理业务逻辑...");
  19. //模拟出现错误
  20. System.out.println(500/Double.valueOf(msgbody));
  21. //3. 手动签收
  22. channel.basicAck(deliveryTag,true);
  23. } catch (Exception e) {
  24. try {
  25. Thread.sleep(3000);
  26. } catch (InterruptedException interruptedException) {
  27. interruptedException.printStackTrace();
  28. }
  29. //e.printStackTrace();
  30. //4.拒绝签收
  31. /*
  32. 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
  33. */
  34. channel.basicNack(deliveryTag,true,true);
  35. //channel.basicReject(deliveryTag,true);
  36. }
  37. }
  38. }

3 随便输入的消息,如图,未确认ack

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/127118
推荐阅读
相关标签
  

闽ICP备14008679号