当前位置:   article > 正文

RabbitMQ之消费者并发消费

RabbitMQ之消费者并发消费

为什么要引入消费者的并发消费?
当生产者的推送速度是远远超过消费者的能力的,可以提高消费者的消费速度。比如在java中我们可以启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度,在mq中也可以通过设置配置。
@RabbitListener 注解中,有 concurrency 属性,它可以指定并发消费的线程数。
下面演示消费者并发消费实现
配置文件yml

spring:
  rabbitmq:
  	#host为一般模式 若集群模式 将key换成addresses的形式
    host: 192.168.9.104
    port: 5672
    #账号密码自行替换
    username: admin
    password: admin
    listener:
      # 选择的 ListenerContainer 的类型。默认为 simple 类型
      type: simple
      simple:
        # 每个 @ListenerContainer 的并发消费的线程数
        concurrency: 2
        # 每个 @ListenerContainer 允许的并发消费的线程数
        max-concurrency: 10 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

rabbitmq.listener.type 的枚举值可以参考 ContainerType

SIMPLE 对应 SimpleMessageListenerContainer 消息监听器容器。它一共有两类线程:

Consumer 线程,负责从 RabbitMQ Broker 获取 Queue 中的消息,存储到内存中的 BlockingQueue 阻塞队列中。
Listener 线程,负责从内存中的 BlockingQueue 获取消息,进行消费逻辑

==================================》配置类代码
@Configuration
public class DirectExchangeConfiguration {
    /**
     * 创建一个 Queue
     *
     * @return Queue
     */
    @Bean
    public Queue queue08() {
        // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除
        return new Queue(
                Message08.QUEUE,
                true,
                false,
                false);
    }

    /**
     * 创建 Direct Exchange
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange exchange08() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message08.EXCHANGE,
                true,
                false);
    }

    /**
     * 创建 Binding
     * Exchange:Message08.EXCHANGE
     * Routing key:Message08.ROUTING_KEY
     * Queue:Message08.QUEUE
     *
     * @return Binding
     */
    @Bean
    public Binding binding08() {
        return BindingBuilder
                .bind(queue08()).to(exchange08())
                .with(Message08.ROUTING_KEY);
    }
========================》生产者代码
@Component
public class Producer08 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void syncSend(String id, String routingKey) {
        // 创建 Message08 消息
        Message08 message = new Message08();
        message.setId(id);
        // 同步发送消息
        rabbitTemplate.convertAndSend(Message08.EXCHANGE, routingKey, message);
    }
}
===========================》消费者代码
@Component
// 开启并发消费
@RabbitListener(queues = Message08.QUEUE, concurrency = "2")
@Slf4j
public class Consumer08 {

    @RabbitHandler
    public void onMessage(Message08 message) throws InterruptedException {
        log.info("[{}][Consumer08 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
        // 模拟消费耗时,为了让并发消费效果更好的展示
        TimeUnit.SECONDS.sleep(1);
    }
}
@Resource
    Producer08 producer08;

    @Test
    void mock() throws InterruptedException {
        TimeUnit.SECONDS.sleep(20);
    }

    @SneakyThrows
    @Test
    void syncSend() {
        // 循环发送十个,观察消费者情况
        for (int i = 0; i < 10; i++) {
            String id = UUID.randomUUID().toString();
            producer08.syncSend(id, Message08.ROUTING_KEY);
        }
        log.info("[{}][test producer08 syncSend] 发送成功", LocalDateTime.now());
        // 这里多睡一会,确保消息全部消费完成
        TimeUnit.SECONDS.sleep(10);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/539727
推荐阅读
相关标签
  

闽ICP备14008679号