当前位置:   article > 正文

RabbitMQ 死信队列实现_broker not available; cannot force queue declarati

broker not available; cannot force queue declarations during start: java.io.
  1. // consumer处理成功后,通知broker删除队列中的消息,如果设置multiple=true,表示支持批量确认机制以减少网络流量
  2. channel.basicAck(deliveryTag, multiple);
  3. // 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列,该方法reject后,该消费者还是会消费到该条被reject的消息
  4. channel.basicReject(deliveryTag, requeue);
  5. // 不确认 deliveryTag 对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。
  6. channel.basicNack(deliveryTag, multiple, requeue);
  7. // 是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
  8. channel.basicRecover(false);

搭建项目

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. listener:
  8. type: simple
  9. simple:
  10. default-requeue-rejected: false
  11. acknowledge-mode: manual

  1. @Configuration
  2. public class RabbitMQConfig {
  3. // 正常业务
  4. public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
  5. public static final String NORMAL_QUEUE_A = "normal-queue-a";
  6. public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
  7. // 死信队列
  8. public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
  9. public static final String DEAD_QUEUE_A = "dead-queue-a";
  10. public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
  11. // 声明交换机
  12. @Bean("businessExchange")
  13. public TopicExchange normalExchangeA() {
  14. return new TopicExchange(NORMAL_EXCHANGE_A);
  15. }
  16. @Bean("deadExchange")
  17. public DirectExchange deadExchange() {
  18. return new DirectExchange(DEAD_EXCHANGE_A);
  19. }
  20. // 声明队列
  21. @Bean("businessQueueA")
  22. public Queue businessQueueA() {
  23. HashMap<String, Object> args = new HashMap<>(3);
  24. args.put("x-dead-letter-exchange", DEAD_EXCHANGE_A);
  25. args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A);
  26. return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(args).build();
  27. }
  28. @Bean("deadQueueA")
  29. public Queue deadQueue() {
  30. return QueueBuilder.durable(DEAD_QUEUE_A).build();
  31. }
  32. // 声明绑定关系
  33. @Bean
  34. public Binding bindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") TopicExchange exchange) {
  35. return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
  36. }
  37. @Bean
  38. public Binding bindingDead(@Qualifier("deadQueueA") Queue queue, @Qualifier("deadExchange") DirectExchange exchange) {
  39. return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY_A);
  40. }
  41. }
  1. @Component
  2. public class SmsListener {
  3. @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
  4. public void smsListener(Message message, Channel channel) throws IOException {
  5. String body = new String(message.getBody());
  6. System.out.println("收到消息:" + body);
  7. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  8. if (body.contains("dead")) {
  9. channel.basicNack(deliveryTag, false, false);
  10. }
  11. channel.basicAck(deliveryTag, false);
  12. }
  13. }
  1. @Component
  2. public class DeadListener {
  3. @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_A)
  4. public void deadListener(Message message, Channel channel) throws IOException {
  5. String body = new String(message.getBody());
  6. System.out.println("dead listener: " + body);
  7. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  8. }
  9. }
  1. @RestController
  2. public class HelloController {
  3. @Resource
  4. private RabbitTemplate rabbitTemplate;
  5. @GetMapping("/hello")
  6. public Boolean hello(String msg) {
  7. rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_A, RabbitMQConfig.NORMAL_ROUTING_KEY_A, msg);
  8. return true;
  9. }
  10. }

使用注解

  1. @Configuration
  2. public class RabbitMQConfig {
  3. // 正常业务
  4. public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
  5. public static final String NORMAL_QUEUE_A = "normal-queue-a";
  6. public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
  7. // 死信队列
  8. public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
  9. public static final String DEAD_QUEUE_A = "dead-queue-a";
  10. public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
  11. // 声明交换机
  12. @Bean("businessExchange")
  13. public TopicExchange normalExchangeA() {
  14. return new TopicExchange(NORMAL_EXCHANGE_A);
  15. }
  16. // 声明队列
  17. @Bean()
  18. public Queue businessQueueA() {
  19. HashMap<String, Object> args = new HashMap<>(2);
  20. args.put("x-dead-letter-exchange", DEAD_EXCHANGE_A);
  21. args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A);
  22. return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(args).build();
  23. }
  24. // 声明绑定关系
  25. @Bean
  26. public Binding bindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") TopicExchange exchange) {
  27. return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
  28. }
  29. }

死信队列使用注解实现

  1. @Component
  2. public class DeadListener {
  3. @RabbitListener(bindings = @QueueBinding(
  4. value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A),
  5. exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, type = ExchangeTypes.DIRECT),
  6. key = RabbitMQConfig.DEAD_ROUTING_KEY_A
  7. ))
  8. public void deadListener(Message message, Channel channel) throws IOException {
  9. String body = new String(message.getBody());
  10. System.out.println("死信队列消费消息: " + body);
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  12. }
  13. }
  1. @Component
  2. public class SmsListener {
  3. @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
  4. // @RabbitListener(bindings = @QueueBinding(
  5. // value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
  6. // exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
  7. // key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
  8. // ))
  9. public void smsListener(Message message, Channel channel) throws IOException {
  10. String body = new String(message.getBody());
  11. System.out.println("正常消费消息:" + body);
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. if (body.contains("dead")) {
  14. channel.basicNack(deliveryTag, false, false);
  15. // return;
  16. }
  17. channel.basicAck(deliveryTag, false);
  18. }
  19. }

报错:

  1. Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
  2. // 由于程序编写不严谨,在 basicNack 执行后没有退出方法,导致最后还执行了 basicAck,出现了上述错误
  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
  3. exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
  4. key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
  5. ))
  6. public void smsListener(Message message, Channel channel) throws IOException {
  7. String body = new String(message.getBody());
  8. System.out.println("正常消费消息:" + body);
  9. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  10. if (body.contains("dead")) {
  11. channel.basicNack(deliveryTag, false, false);
  12. return;
  13. }
  14. channel.basicAck(deliveryTag, false);
  15. }
  1. // 问题二: 控制台报错,但是也能正常消费mq消息,这里与第一种唯一的区别是在于 @RabbitListener, 我的推测是 自定义 bean 和注解生成的 bean 重复导致,看能不能使用注解绑定死信队列
  2. 2023-04-23 22:03:25.630 ERROR 8580 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'normal-queue-a' in vhost '/': received none but current is the value 'dead-exchange-a' of type 'longstr', class-id=50, method-id=10)
  3. Broker not available; cannot force queue declarations during start: java.io.IOException
  1. @Target({})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. public @interface Exchange {
  4. String TRUE = "true";
  5. String FALSE = "false";
  6. @AliasFor("name")
  7. String value() default "";
  8. @AliasFor("value")
  9. String name() default "";
  10. String type() default "direct";
  11. String durable() default "true";
  12. String autoDelete() default "false";
  13. String internal() default "false";
  14. String ignoreDeclarationExceptions() default "false";
  15. String delayed() default "false";
  16. Argument[] arguments() default {};
  17. String declare() default "true";
  18. String[] admins() default {};
  19. }
  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
  3. exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC, arguments = {
  4. @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
  5. @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
  6. }),
  7. key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
  8. ))
  9. public void smsListener(Message message, Channel channel) throws IOException {
  10. String body = new String(message.getBody());
  11. System.out.println("正常消费消息:" + body);
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. if (body.contains("dead")) {
  14. channel.basicNack(deliveryTag, false, false);
  15. return;
  16. }
  17. channel.basicAck(deliveryTag, false);
  18. }

可以使用注解的方式来绑定 死信队列,但是还是会报上面的错误,继续修改 参数试试

java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow

但是使用注解绑定的话好像又不生效了,问题原因,tmd将死信参数绑到交换机上了,c

修改代码

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
  3. @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
  4. @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
  5. }),
  6. exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
  7. key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
  8. ))
  9. public void smsListener(Message message, Channel channel) throws IOException {
  10. String body = new String(message.getBody());
  11. System.out.println("正常消费消息:" + body);
  12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  13. if (body.contains("dead")) {
  14. channel.basicNack(deliveryTag, false, false);
  15. return;
  16. }
  17. channel.basicAck(deliveryTag, false);
  18. }

 至于问题二是由于队列和交换机默认持久化,这样就导第二次启动项目时重复

Springboot纯注解版的RabbitMq 死信队列_注解声明私信队列_lopo呀的博客-CSDN博客

全注解版

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. listener:
  8. type: simple
  9. simple:
  10. default-requeue-rejected: false
  11. acknowledge-mode: manual
  1. // 正常业务
  2. public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
  3. public static final String NORMAL_QUEUE_A = "normal-queue-a";
  4. public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
  5. // 死信队列
  6. public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
  7. public static final String DEAD_QUEUE_A = "dead-queue-a";
  8. public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
  1. @Component
  2. public class SmsListener {
  3. @RabbitListener(bindings = @QueueBinding(
  4. value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
  5. @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
  6. @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
  7. }),
  8. exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
  9. key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
  10. ))
  11. public void smsListener(Message message, Channel channel) throws IOException {
  12. String body = new String(message.getBody());
  13. System.out.println("正常消费消息:" + body);
  14. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  15. if (body.contains("dead")) {
  16. channel.basicNack(deliveryTag, false, false);
  17. return;
  18. }
  19. channel.basicAck(deliveryTag, false);
  20. }
  21. }
  1. @Component
  2. public class DeadListener {
  3. @RabbitListener(bindings = @QueueBinding(
  4. value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A, durable = "false"),
  5. exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, durable = "false"),
  6. key = RabbitMQConfig.DEAD_ROUTING_KEY_A
  7. ))
  8. public void deadListener(Message message, Channel channel) throws IOException {
  9. String body = new String(message.getBody());
  10. System.out.println("死信队列消费消息: " + body);
  11. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  12. }
  13. }
  1. @GetMapping("/hello")
  2. public Boolean hello(String msg) {
  3. System.out.println("发送消息:" + msg);
  4. rabbitTemplate.convertAndSend(RabbitMQConfig.NORMAL_EXCHANGE_A, RabbitMQConfig.NORMAL_ROUTING_KEY_A, msg);
  5. return true;
  6. }
  1. // conslog
  2. 发送消息:dead
  3. 正常消费消息:dead
  4. 死信队列消费消息: dead

明天在研究下回调啥的

springboot整合rabbitMQ confirm 确认模式 return 退回模式_weixin_44318244的博客-CSDN博客

回调

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式

  • return 退回模式

rabbitmq 整个消息投递的路径为:producer—>rabbitmq broker—>exchange—>queue—>consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

  • 消息从 exchange–>queue 投递失败则会返回一个 returnCallback

我们将利用这两个 callback 控制消息的可靠性投递

消息的可靠投递小结 ➢ 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式。 ➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

➢ 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。 ➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

确认模式

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. username: guest
  6. password: guest
  7. listener:
  8. type: simple
  9. simple:
  10. default-requeue-rejected: false
  11. acknowledge-mode: manual
  12. publisher-confirm-type: correlated # 发布确认属性配置
  13. publisher-returns: true # 开启 退回模式
  1. public enum ConfirmType {
  2. /**
  3. * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
  4. * within scoped operations.
  5. SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会 关闭channel,则接下来无法发送消息到broker;
  6. */
  7. SIMPLE,
  8. /**
  9. * Use with {@code CorrelationData} to correlate confirmations with sent 发布消息成功到交换器后会触发回调方法
  10. * messsages.
  11. */
  12. CORRELATED,
  13. /**
  14. * Publisher confirms are disabled (default).
  15. */
  16. NONE
  17. }
  1. @Configuration
  2. public class PublisherConfirmHandler implements RabbitTemplate.ConfirmCallback {
  3. @Override
  4. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  5. if (ack){
  6. System.out.println("发送消息到交换机成功!MessageId: " + correlationData.getId());
  7. }else {
  8. System.out.println("发送消息到交换机失败!MessageId: " + correlationData.getId() + ", 退回原因:" + cause);
  9. }
  10. }
  11. }
  1. @Resource
  2. private PublisherConfirmHandler publisherConfirmHandler;
  3. rabbitTemplate.setConfirmCallback(publisherConfirmHandler);

回退模式  

  1. @Configuration
  2. public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {
  3. @Override
  4. public void returnedMessage(ReturnedMessage returned) {
  5. System.out.println("return 执行了!" + returned);
  6. }
  7. }

 

  1. //
  2. rabbitTemplate.setMandatory(true);
  3. rabbitTemplate.setReturnsCallback(returnsCallbackHandler);

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/1018416
推荐阅读
相关标签
  

闽ICP备14008679号