当前位置:   article > 正文

SpringCloudStream——RabbitMQ 手动ACK,Channel 参数为空?_springcloud stream rabbit ack

springcloud stream rabbit ack

问题描述

使用SpringCloudStream 集成RabbitMQ的过程中,一直无法使用手动ACK功能。
在这里插入图片描述
SpringCloud版本:Hoxton.RELEASE
SpringBoot 版本:2.2.1.RELEASE
SpringCloudStream 版本 :3.0.0.RELEASE
MQ 配置文件:

spring:
  cloud:
    stream:
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

原因分析:

配置问题,导致无法绑定对应的channel

在这里插入图片描述

解决方案:

配置文件:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          greetings-in:
            consumer:
              acknowledge-mode: manual
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
          consumer:
            acknowledge-mode: manual # manual手动确认 ,auto 自动确认
        greetings-out:
          destination: greetings
          contentType: application/json
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

消费者:

@Slf4j
@EnableBinding(GreetingsStreams.class)
public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(Message<Greetings> entityMessage) {
        Channel channel = entityMessage.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
        Long deliveryTag = entityMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
        try {
            log.info("Received message: {} ,channel :{} , deliveryTag: {} ", entityMessage.getPayload(), channel, deliveryTag);
            Thread.sleep(10000);
            Random rand = new Random();
            int i = rand.nextInt(10);
            log.info("产生的随机数是:{}", i);
            if (i > 5) {
                /**消息确认**/
                channel.basicAck(deliveryTag, false);
            } else {
                /**重回队列**/
                channel.basicReject(deliveryTag, true);
            }
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

basicReject / basicNack / basicRecover区别

channel.basicReject(deliveryTag, true);
basic.reject方法拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
该方法reject后,该消费者还是会消费到该条被reject的消息。

channel.basicNack(deliveryTag, false, true);
basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue
,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

channel.basicRecover(true);
basic.recover是否恢复消息到队列,参数是是否requeue,true则重新入队列,
并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。false则消息会重新被投递给自己。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/127076
推荐阅读
相关标签
  

闽ICP备14008679号