当前位置:   article > 正文

RabbitMQ消费端并发和限流设置_rabbitmq消费者并发

rabbitmq消费者并发

并发

默认情况一下,一个listener对应一个consumer,如果想对应多个,有两种方式。

  1. 在消费端,配置prefetch和concurrency参数便可以实现消费端MQ并发处理消息

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        listener:
          simple:
    #        acknowledge-mode: manual  # 手动确定(默认自动确认)
            concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
            max-concurrency: 10 # 消费端的监听最大个数
            prefetch: 10
        connection-timeout: 15000   # 超时时间
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这个是个全局配置,应用里的任何队列对应的listener都至少有5个consumer,但是最好别这么做,因为一般情况下,一个listener对应一个consumer是够用的。只是针对部分场景,才需要一对多

  2. 在注解中配置该参数

    在RabbitListeners注解上加上concurrency属性

    会覆盖配置文件中的参数

    直接在@RabbitListener上配置

    @Component
    public class SpringBootMsqConsumer {
        @RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10")
        public void receive(Message message) {
            System.out.println("receive message:" + new String(message.getBody()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    //@RabbitListener(queues = {"kinson"}, concurrency =  "2")
    @RabbitListener(queues = {"kinson"}, concurency="min-max") // 最小并发数是5,最大并发是10
    public void receiver(Message msg, Channel channel) throws InterruptedException {
    
    //        Thread.sleep(10000);
        byte[] messageBytes = msg.getBody();
    
        if (messageBytes != null && messageBytes.length > 0) {
            //打印数据
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【消3】:{}", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    @RabbitListener(
             bindings = @QueueBinding(
                     value = @Queue(value = "myQueue1"),
                     exchange = @Exchange(value = "myExchange1"),
                     key = "routingKey1"
             ),
             concurrency =  "10"
     )
     public void process1(Message message) throws Exception {
         // 执行程序
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    利用@RabbitListener中的concurrency属性进行指定就行,例如上面的concurrency = “5-10”

    就表示最小5个,最大10个consumer。启动消费端应用后,找到spring-boot-direct-queue这个队列的consumer,会发现有5个。
    在这里插入图片描述
    这5个消费者都可以从spring-boot-direct-queue这个队列中获取消息,加快队列中消息的消费速度,提高吞吐量。

限流

我们经过压测,来判断consumer的消费能力,如果单位时间内,consumer到达的消息太多,也可能把消费者压垮。
得到压测数据后,可以在@RabbitListener中配置prefetch count。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringBootMsqConsumer {
    @RabbitListener(queues = "spring-boot-direct-queue",concurrency = "5-10",containerFactory = "mqConsumerlistenerContainer")
    public void receive(Message message) {
        System.out.println("receive message:" + new String(message.getBody()));
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean(name = "mqConsumerlistenerContainer")
    public SimpleRabbitListenerContainerFactory mqConsumerlistenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(50);
        return factory;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

factory.setPrefetchCount(50);,就是用于设置prefetch count的,启动后,会在spring-boot-direct-queue队列的consumer中体现出来。
在这里插入图片描述
配置成功后,consumer单位时间内接收到消息就是50条。


@Autowired
CachingConnectionFactory cachingConnectionFactory;

@Bean(name="limitContainerFactory")
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
    SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cachingConnectionFactory);
    factory.setConcurrentConsumers(60); // 并发消费者数量
    factory.setPrefetchCount(3);  // 每次最多拿3个,等这三个处理完之后,再去队列中拿第二批,起到了限流的作用
    return factory;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

@Bean("limitContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setPrefetchCount(2); //每个消费者预取数量
    factory.setConcurrentConsumers(60); //并发消费者数量
    configurer.configure(factory, connectionFactory);
    return factory;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

@RabbitHandler
@RabbitListener(queues = QueueContent.MESSAGE_QUEUE_NAME,containerFactory = "limitContainerFactory")
public void handler(String msg,Channel channel, Message message) throws IOException {
	...... 
}
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/386674
推荐阅读
相关标签
  

闽ICP备14008679号