赞
踩
- // After the consumer has processed a message successfully, tell the broker to remove it from the queue. multiple=true enables batch acknowledgement to reduce network traffic.
- channel.basicAck(deliveryTag, multiple);
-
- // Reject the message identified by deliveryTag. The second argument is requeue: true puts the message back on the queue (in which case this consumer may receive the rejected message again); false discards it or sends it to the dead-letter queue.
- channel.basicReject(deliveryTag, requeue);
-
- // Negatively acknowledge the message identified by deliveryTag. The second argument says whether this applies to multiple messages, the third is requeue. Unlike basic.reject it supports multiple messages, so it can nack every message this consumer has received but not yet acked. With requeue=true the nacked messages may also be delivered to this same consumer again.
- channel.basicNack(deliveryTag, multiple, requeue);
-
- // Ask the broker to redeliver unacknowledged messages. The argument is requeue: true requeues them and tries to deliver them to other consumers rather than to this one again; false redelivers them to this same consumer.
- channel.basicRecover(false);
- # RabbitMQ connection and listener settings (nesting restored).
- spring:
-   rabbitmq:
-     host: 127.0.0.1
-     port: 5672
-     username: guest
-     password: guest
-     listener:
-       type: simple
-       simple:
-         # Do not requeue rejected messages by default, so they can be dead-lettered.
-         default-requeue-rejected: false
-         # Listener methods acknowledge explicitly through the Channel.
-         acknowledge-mode: manual
- /**
-  * Declares the business exchange/queue and the dead-letter exchange/queue as beans,
-  * together with the bindings between them.
-  */
- @Configuration
- public class RabbitMQConfig {
-
-     // Normal business exchange, queue and routing key
-     public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
-     public static final String NORMAL_QUEUE_A = "normal-queue-a";
-     public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
-
-     // Dead-letter exchange, queue and routing key
-     public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
-     public static final String DEAD_QUEUE_A = "dead-queue-a";
-     public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
-
-     /** Topic exchange that receives the business messages. */
-     @Bean("businessExchange")
-     public TopicExchange normalExchangeA() {
-         return new TopicExchange(NORMAL_EXCHANGE_A);
-     }
-
-     /** Direct exchange named by the business queue's x-dead-letter-exchange argument. */
-     @Bean("deadExchange")
-     public DirectExchange deadExchange() {
-         return new DirectExchange(DEAD_EXCHANGE_A);
-     }
-
-     /** Durable business queue carrying the dead-letter exchange and routing-key arguments. */
-     @Bean("businessQueueA")
-     public Queue businessQueueA() {
-         return QueueBuilder.durable(NORMAL_QUEUE_A)
-                 .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_A)
-                 .withArgument("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A)
-                 .build();
-     }
-
-     /** Durable dead-letter queue. */
-     @Bean("deadQueueA")
-     public Queue deadQueue() {
-         return QueueBuilder.durable(DEAD_QUEUE_A).build();
-     }
-
-     /** Binds the business queue to the business exchange with the normal routing key. */
-     @Bean
-     public Binding bindingA(@Qualifier("businessQueueA") Queue queue,
-                             @Qualifier("businessExchange") TopicExchange exchange) {
-         return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
-     }
-
-     /** Binds the dead-letter queue to the dead-letter exchange with the dead routing key. */
-     @Bean
-     public Binding bindingDead(@Qualifier("deadQueueA") Queue queue,
-                                @Qualifier("deadExchange") DirectExchange exchange) {
-         return BindingBuilder.bind(queue).to(exchange).with(DEAD_ROUTING_KEY_A);
-     }
- }
- /**
-  * Consumes messages from the normal business queue and settles each delivery manually.
-  */
- @Component
- public class SmsListener {
-
-     /**
-      * Prints the message body, then either negatively acknowledges the delivery
-      * (body contains "dead") or acknowledges it.
-      *
-      * @param message the delivered message
-      * @param channel the channel used to ack/nack the delivery
-      * @throws IOException if the ack/nack call on the channel fails
-      */
-     @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
-     public void smsListener(Message message, Channel channel) throws IOException {
-         String body = new String(message.getBody());
-
-         System.out.println("收到消息:" + body);
-         long deliveryTag = message.getMessageProperties().getDeliveryTag();
-         if (body.contains("dead")) {
-             // multiple=false, requeue=false: the message is not put back on this queue.
-             channel.basicNack(deliveryTag, false, false);
-             // A delivery tag may be settled only once. Falling through to basicAck would make
-             // the broker close the channel with 406 PRECONDITION_FAILED (unknown delivery tag).
-             return;
-         }
-         channel.basicAck(deliveryTag, false);
-     }
- }
- /**
-  * Consumes messages from the dead-letter queue.
-  */
- @Component
- public class DeadListener {
-
-     /**
-      * Prints the dead-lettered message body and acknowledges the delivery.
-      *
-      * @param message the delivered message
-      * @param channel the channel used to acknowledge the delivery
-      * @throws IOException if the acknowledgement fails
-      */
-     @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE_A)
-     public void deadListener(Message message, Channel channel) throws IOException {
-         final byte[] payload = message.getBody();
-         System.out.println("dead listener: " + new String(payload));
-
-         final long tag = message.getMessageProperties().getDeliveryTag();
-         channel.basicAck(tag, false);
-     }
- }
- /**
-  * HTTP entry point used to publish test messages to the business exchange.
-  */
- @RestController
- public class HelloController {
-
-     @Resource
-     private RabbitTemplate rabbitTemplate;
-
-     /**
-      * Publishes {@code msg} to the normal exchange with the normal routing key.
-      *
-      * @param msg the message payload taken from the request
-      * @return always {@code true}
-      */
-     @GetMapping("/hello")
-     public Boolean hello(String msg) {
-         final String exchange = RabbitMQConfig.NORMAL_EXCHANGE_A;
-         final String routingKey = RabbitMQConfig.NORMAL_ROUTING_KEY_A;
-         rabbitTemplate.convertAndSend(exchange, routingKey, msg);
-         return Boolean.TRUE;
-     }
-
- }
- /**
-  * Declares only the business exchange, the business queue (with dead-letter arguments)
-  * and their binding as beans; this version declares no dead-letter exchange or queue beans.
-  */
- @Configuration
- public class RabbitMQConfig {
-
-     // Normal business exchange, queue and routing key
-     public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
-     public static final String NORMAL_QUEUE_A = "normal-queue-a";
-     public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
-
-     // Dead-letter exchange, queue and routing key
-     public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
-     public static final String DEAD_QUEUE_A = "dead-queue-a";
-     public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
-
-     /** Topic exchange that receives the business messages. */
-     @Bean("businessExchange")
-     public TopicExchange normalExchangeA() {
-         return new TopicExchange(NORMAL_EXCHANGE_A);
-     }
-
-     /** Durable business queue carrying the dead-letter exchange and routing-key arguments. */
-     @Bean
-     public Queue businessQueueA() {
-         return QueueBuilder.durable(NORMAL_QUEUE_A)
-                 .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE_A)
-                 .withArgument("x-dead-letter-routing-key", DEAD_ROUTING_KEY_A)
-                 .build();
-     }
-
-     /** Binds the business queue to the business exchange with the normal routing key. */
-     @Bean
-     public Binding bindingA(@Qualifier("businessQueueA") Queue queue,
-                             @Qualifier("businessExchange") TopicExchange exchange) {
-         return BindingBuilder.bind(queue).to(exchange).with(NORMAL_ROUTING_KEY_A);
-     }
-
- }
死信队列使用注解实现
- /**
-  * Consumes messages from the dead-letter queue; the queue, the direct exchange and
-  * their binding are described by the listener annotation itself.
-  */
- @Component
- public class DeadListener {
-
-     /**
-      * Prints the dead-lettered message body and acknowledges the delivery.
-      *
-      * @param message the delivered message
-      * @param channel the channel used to acknowledge the delivery
-      * @throws IOException if the acknowledgement fails
-      */
-     @RabbitListener(bindings = @QueueBinding(
-             value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A),
-             exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, type = ExchangeTypes.DIRECT),
-             key = RabbitMQConfig.DEAD_ROUTING_KEY_A
-     ))
-     public void deadListener(Message message, Channel channel) throws IOException {
-         final String text = new String(message.getBody());
-         System.out.println("死信队列消费消息: " + text);
-
-         final long tag = message.getMessageProperties().getDeliveryTag();
-         channel.basicAck(tag, false);
-     }
- }
- // Listener for the normal business queue, shown here in its faulty form:
- // after basicNack the method does not return, so basicAck is also called
- // for the same delivery tag.
- @Component
- public class SmsListener {
-
- @RabbitListener(queues = RabbitMQConfig.NORMAL_QUEUE_A)
- // @RabbitListener(bindings = @QueueBinding(
- // value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
- // exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
- // key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
- // ))
- public void smsListener(Message message, Channel channel) throws IOException {
- String body = new String(message.getBody());
-
- System.out.println("正常消费消息:" + body);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- if (body.contains("dead")) {
- // multiple=false, requeue=false
- channel.basicNack(deliveryTag, false, false);
- // NOTE(review): the return is commented out, so execution falls through to basicAck.
- // return;
- }
- // Runs even when the delivery was already nacked above.
- channel.basicAck(deliveryTag, false);
- }
- }
报错:
- Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
-
- // 由于程序编写不严谨,在 basicNack 执行后没有退出方法,导致最后还执行了 basicAck,出现了上述错误
- /**
-  * Prints the message body, then nacks the delivery without requeue when the body
-  * contains "dead" and returns immediately; every other delivery is acknowledged.
-  *
-  * @param message the delivered message
-  * @param channel the channel used to ack/nack the delivery
-  * @throws IOException if the ack/nack call on the channel fails
-  */
- @RabbitListener(bindings = @QueueBinding(
-         value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
-         exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC),
-         key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
- ))
- public void smsListener(Message message, Channel channel) throws IOException {
-     final String text = new String(message.getBody());
-     System.out.println("正常消费消息:" + text);
-
-     final long tag = message.getMessageProperties().getDeliveryTag();
-     final boolean deadLetter = text.contains("dead");
-     if (deadLetter) {
-         channel.basicNack(tag, false, false);
-         // Leave here so the same delivery tag is not acknowledged a second time.
-         return;
-     }
-     channel.basicAck(tag, false);
- }
- // 问题二: 控制台报错,但是也能正常消费mq消息,这里与第一种唯一的区别是在于 @RabbitListener, 我的推测是 自定义 bean 和注解生成的 bean 重复导致,看能不能使用注解绑定死信队列
- 2023-04-23 22:03:25.630 ERROR 8580 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'normal-queue-a' in vhost '/': received none but current is the value 'dead-exchange-a' of type 'longstr', class-id=50, method-id=10)
- Broker not available; cannot force queue declarations during start: java.io.IOException
- /**
-  * Annotation describing an exchange. The empty {@code @Target} means it cannot be
-  * placed on a program element directly; it can only appear as a member value of
-  * another annotation. All boolean-like options are declared as strings.
-  */
- @Target({})
- @Retention(RetentionPolicy.RUNTIME)
- public @interface Exchange {
- // String constants for the boolean-like attributes below.
- String TRUE = "true";
- String FALSE = "false";
-
- // Alias of name(); empty by default.
- @AliasFor("name")
- String value() default "";
-
- // Alias of value(); empty by default.
- @AliasFor("value")
- String name() default "";
-
- // Exchange type; defaults to "direct".
- String type() default "direct";
-
- // Defaults to "true".
- String durable() default "true";
-
- // Defaults to "false".
- String autoDelete() default "false";
-
- // Defaults to "false".
- String internal() default "false";
-
- // Defaults to "false".
- String ignoreDeclarationExceptions() default "false";
-
- // Defaults to "false".
- String delayed() default "false";
-
- // Arguments attached to the exchange declaration; none by default.
- Argument[] arguments() default {};
-
- // Defaults to "true".
- String declare() default "true";
-
- // Empty by default.
- String[] admins() default {};
- }
- // Listener for the normal queue, declared through annotations.
- // NOTE(review): the x-dead-letter-exchange / x-dead-letter-routing-key arguments are
- // attached to @Exchange in this version, while @Queue is declared without any arguments.
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A),
- exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, type = ExchangeTypes.TOPIC, arguments = {
- @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
- @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
- }),
- key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
- ))
- public void smsListener(Message message, Channel channel) throws IOException {
- String body = new String(message.getBody());
-
- System.out.println("正常消费消息:" + body);
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- if (body.contains("dead")) {
- // multiple=false, requeue=false
- channel.basicNack(deliveryTag, false, false);
- // Return so the nacked delivery is not acknowledged as well.
- return;
- }
- channel.basicAck(deliveryTag, false);
- }
可以使用注解的方式来绑定 死信队列,但是还是会报上面的错误,继续修改 参数试试
java - How to set x-dead-letter-exchange in Rabbit? - Stack Overflow
但是使用注解绑定的话好像又不生效了。问题原因:把死信参数(x-dead-letter-exchange、x-dead-letter-routing-key)错误地绑到了交换机(@Exchange)上,而它们应该声明在队列(@Queue)上。
修改代码
- /**
-  * Listener for the normal queue. The queue is declared non-durable with the
-  * dead-letter exchange and routing key as queue arguments; the topic exchange
-  * is declared non-durable as well.
-  *
-  * @param message the delivered message
-  * @param channel the channel used to ack/nack the delivery
-  * @throws IOException if the ack/nack call on the channel fails
-  */
- @RabbitListener(bindings = @QueueBinding(
-         value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
-                 @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
-                 @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
-         }),
-         exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
-         key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
- ))
- public void smsListener(Message message, Channel channel) throws IOException {
-     final byte[] payload = message.getBody();
-     final String content = new String(payload);
-     System.out.println("正常消费消息:" + content);
-
-     final long tag = message.getMessageProperties().getDeliveryTag();
-     if (content.contains("dead")) {
-         // multiple=false, requeue=false
-         channel.basicNack(tag, false, false);
-         return;
-     }
-     channel.basicAck(tag, false);
- }
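至于问题二:从报错信息可以看出,broker 上已存在的队列 normal-queue-a 带有 x-dead-letter-exchange 参数,而注解重新声明该队列时没有带这个参数(received none),两次声明的参数不一致,于是触发 PRECONDITION_FAILED。又因为队列和交换机默认是持久化的,旧的声明会一直保留在 broker 上,这样就导致再次启动项目时重复声明而发生冲突。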
Springboot纯注解版的RabbitMq 死信队列_注解声明私信队列_lopo呀的博客-CSDN博客
- # RabbitMQ connection and listener settings (nesting restored).
- spring:
-   rabbitmq:
-     host: 127.0.0.1
-     port: 5672
-     username: guest
-     password: guest
-     listener:
-       type: simple
-       simple:
-         # Do not requeue rejected messages by default, so they can be dead-lettered.
-         default-requeue-rejected: false
-         # Listener methods acknowledge explicitly through the Channel.
-         acknowledge-mode: manual
- // Normal business: exchange, queue and routing key names
- public static final String NORMAL_EXCHANGE_A = "normal-exchange-a";
-
- public static final String NORMAL_QUEUE_A = "normal-queue-a";
-
- public static final String NORMAL_ROUTING_KEY_A = "normal-routing-key-a";
-
-
- // Dead letter: exchange, queue and routing key names
- public static final String DEAD_EXCHANGE_A = "dead-exchange-a";
-
- public static final String DEAD_QUEUE_A = "dead-queue-a";
-
- public static final String DEAD_ROUTING_KEY_A = "dead-routing-key-a";
- /**
-  * Consumes the normal business queue, which is declared entirely through annotations
-  * with its dead-letter exchange and routing key given as queue arguments.
-  */
- @Component
- public class SmsListener {
-
-     /**
-      * Prints the message body; a body containing "dead" is nacked without requeue,
-      * anything else is acknowledged.
-      *
-      * @param message the delivered message
-      * @param channel the channel used to ack/nack the delivery
-      * @throws IOException if the ack/nack call on the channel fails
-      */
-     @RabbitListener(bindings = @QueueBinding(
-             value = @Queue(value = RabbitMQConfig.NORMAL_QUEUE_A, durable = "false", arguments = {
-                     @Argument(name = "x-dead-letter-exchange", value = RabbitMQConfig.DEAD_EXCHANGE_A),
-                     @Argument(name = "x-dead-letter-routing-key", value = RabbitMQConfig.DEAD_ROUTING_KEY_A)
-             }),
-             exchange = @Exchange(value = RabbitMQConfig.NORMAL_EXCHANGE_A, durable = "false", type = ExchangeTypes.TOPIC),
-             key = RabbitMQConfig.NORMAL_ROUTING_KEY_A
-     ))
-     public void smsListener(Message message, Channel channel) throws IOException {
-         final String text = new String(message.getBody());
-         System.out.println("正常消费消息:" + text);
-
-         final long tag = message.getMessageProperties().getDeliveryTag();
-         final boolean deadLetter = text.contains("dead");
-         if (deadLetter) {
-             channel.basicNack(tag, false, false);
-             // Stop here: the delivery has already been settled.
-             return;
-         }
-         channel.basicAck(tag, false);
-     }
- }
- /**
-  * Consumes the dead-letter queue; queue and exchange are declared non-durable
-  * through the listener annotation.
-  */
- @Component
- public class DeadListener {
-
-     /**
-      * Prints the dead-lettered message body and acknowledges the delivery.
-      *
-      * @param message the delivered message
-      * @param channel the channel used to acknowledge the delivery
-      * @throws IOException if the acknowledgement fails
-      */
-     @RabbitListener(bindings = @QueueBinding(
-             value = @Queue(value = RabbitMQConfig.DEAD_QUEUE_A, durable = "false"),
-             exchange = @Exchange(value = RabbitMQConfig.DEAD_EXCHANGE_A, durable = "false"),
-             key = RabbitMQConfig.DEAD_ROUTING_KEY_A
-     ))
-     public void deadListener(Message message, Channel channel) throws IOException {
-         final byte[] payload = message.getBody();
-         System.out.println("死信队列消费消息: " + new String(payload));
-
-         final long tag = message.getMessageProperties().getDeliveryTag();
-         channel.basicAck(tag, false);
-     }
- }
- /**
-  * Logs {@code msg} to the console and publishes it to the normal exchange
-  * with the normal routing key.
-  *
-  * @param msg the message payload taken from the request
-  * @return always {@code true}
-  */
- @GetMapping("/hello")
- public Boolean hello(String msg) {
-     final String logLine = "发送消息:" + msg;
-     System.out.println(logLine);
-
-     final String exchange = RabbitMQConfig.NORMAL_EXCHANGE_A;
-     final String routingKey = RabbitMQConfig.NORMAL_ROUTING_KEY_A;
-     rabbitTemplate.convertAndSend(exchange, routingKey, msg);
-     return Boolean.TRUE;
- }
- // conslog
- 发送消息:dead
- 正常消费消息:dead
- 死信队列消费消息: dead
明天再研究下回调啥的
springboot整合rabbitMQ confirm 确认模式 return 退回模式_weixin_44318244的博客-CSDN博客
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。 RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
confirm 确认模式
return 退回模式
rabbitmq 整个消息投递的路径为:producer—>rabbitmq broker—>exchange—>queue—>consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange–>queue 投递失败则会返回一个 returnCallback
我们将利用这两个 callback 控制消息的可靠性投递
消息的可靠投递小结 ➢ 设置ConnectionFactory的publisher-confirms=“true” 开启 确认模式(本文下面的 Spring Boot 配置中对应的写法是 publisher-confirm-type: correlated)。 ➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
➢ 设置ConnectionFactory的publisher-returns=“true” 开启 退回模式。 ➢ 使用rabbitTemplate.setReturnCallback(本文下面的代码使用的是 setReturnsCallback)设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer,并执行回调函数returnedMessage。
- # RabbitMQ connection, listener and publisher confirm/return settings (nesting restored).
- spring:
-   rabbitmq:
-     host: 127.0.0.1
-     port: 5672
-     username: guest
-     password: guest
-     listener:
-       type: simple
-       simple:
-         # Do not requeue rejected messages by default, so they can be dead-lettered.
-         default-requeue-rejected: false
-         # Listener methods acknowledge explicitly through the Channel.
-         acknowledge-mode: manual
-     # Publisher-side options belong to spring.rabbitmq, not to the listener container.
-     publisher-confirm-type: correlated # enable publisher confirms (correlated mode)
-     publisher-returns: true # enable return mode
- // Publisher confirm modes selectable through the publisher-confirm-type property.
- public enum ConfirmType {
-
- /**
- * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()})
- * within scoped operations.
- * <p>
- * Article author's note (from testing): SIMPLE shows two behaviours. First, like
- * CORRELATED, it triggers the confirm callback. Second, after publishing you can call
- * waitForConfirms or waitForConfirmsOrDie on the rabbitTemplate to wait for the broker's
- * result and decide what to do next; note that when waitForConfirmsOrDie returns false
- * the channel is closed, so no further messages can be sent to the broker on it.
- */
- SIMPLE,
-
- /**
- * Use with {@code CorrelationData} to correlate confirmations with sent
- * messages.
- * <p>
- * Article author's note: the confirm callback is triggered once the message has been
- * published to the exchange.
- */
- CORRELATED,
-
- /**
- * Publisher confirms are disabled (default).
- */
- NONE
-
- }
- @Configuration
- public class PublisherConfirmHandler implements RabbitTemplate.ConfirmCallback {
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (ack){
- System.out.println("发送消息到交换机成功!MessageId: " + correlationData.getId());
- }else {
- System.out.println("发送消息到交换机失败!MessageId: " + correlationData.getId() + ", 退回原因:" + cause);
- }
- }
- }
- // Inject the confirm callback handler.
- @Resource
- private PublisherConfirmHandler publisherConfirmHandler;
-
- // Register the handler on the template so it is called with each publisher confirm.
- rabbitTemplate.setConfirmCallback(publisherConfirmHandler);
- /**
-  * Returns callback that prints every message handed back to the producer.
-  */
- @Configuration
- public class ReturnsCallbackHandler implements RabbitTemplate.ReturnsCallback {
-
-     /**
-      * Prints the returned message.
-      *
-      * @param returned the returned message as passed in by the template
-      */
-     @Override
-     public void returnedMessage(ReturnedMessage returned) {
-         final String line = "return 执行了!" + returned;
-         System.out.println(line);
-     }
- }
- // Set mandatory=true and register the returns callback on the template.
- // NOTE(review): returnsCallbackHandler is presumably an injected ReturnsCallbackHandler — its declaration is not shown here.
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setReturnsCallback(returnsCallbackHandler);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。