赞
踩
为什么要引入消费者的并发消费?
当生产者的推送速度是远远超过消费者的能力的,可以提高消费者的消费速度。比如在java中我们可以启动多个 JVM 进程,实现多进程的并发消费,从而加速消费的速度,在mq中也可以通过设置配置。
@RabbitListener 注解中,有 concurrency 属性,它可以指定并发消费的线程数。
下面演示消费者并发消费实现
配置文件yml
spring: rabbitmq: #host为一般模式 若集群模式 将key换成addresses的形式 host: 192.168.9.104 port: 5672 #账号密码自行替换 username: admin password: admin listener: # 选择的 ListenerContainer 的类型。默认为 simple 类型 type: simple simple: # 每个 @ListenerContainer 的并发消费的线程数 concurrency: 2 # 每个 @ListenerContainer 允许的并发消费的线程数 max-concurrency: 10
rabbitmq.listener.type 的枚举值可以参考 ContainerType
SIMPLE 对应 SimpleMessageListenerContainer 消息监听器容器。它一共有两类线程:
Consumer 线程,负责从 RabbitMQ Broker 获取 Queue 中的消息,存储到内存中的 BlockingQueue 阻塞队列中。
Listener 线程,负责从内存中的 BlockingQueue 获取消息,进行消费逻辑
==================================》配置类代码 @Configuration public class DirectExchangeConfiguration { /** * 创建一个 Queue * * @return Queue */ @Bean public Queue queue08() { // Queue:名字 | durable: 是否持久化 | exclusive: 是否排它 | autoDelete: 是否自动删除 return new Queue( Message08.QUEUE, true, false, false); } /** * 创建 Direct Exchange * * @return DirectExchange */ @Bean public DirectExchange exchange08() { // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它 return new DirectExchange(Message08.EXCHANGE, true, false); } /** * 创建 Binding * Exchange:Message08.EXCHANGE * Routing key:Message08.ROUTING_KEY * Queue:Message08.QUEUE * * @return Binding */ @Bean public Binding binding08() { return BindingBuilder .bind(queue08()).to(exchange08()) .with(Message08.ROUTING_KEY); } ========================》生产者代码 @Component public class Producer08 { @Resource private RabbitTemplate rabbitTemplate; public void syncSend(String id, String routingKey) { // 创建 Message08 消息 Message08 message = new Message08(); message.setId(id); // 同步发送消息 rabbitTemplate.convertAndSend(Message08.EXCHANGE, routingKey, message); } } ===========================》消费者代码 @Component // 开启并发消费 @RabbitListener(queues = Message08.QUEUE, concurrency = "2") @Slf4j public class Consumer08 { @RabbitHandler public void onMessage(Message08 message) throws InterruptedException { log.info("[{}][Consumer08 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message); // 模拟消费耗时,为了让并发消费效果更好的展示 TimeUnit.SECONDS.sleep(1); } } @Resource Producer08 producer08; @Test void mock() throws InterruptedException { TimeUnit.SECONDS.sleep(20); } @SneakyThrows @Test void syncSend() { // 循环发送十个,观察消费者情况 for (int i = 0; i < 10; i++) { String id = UUID.randomUUID().toString(); producer08.syncSend(id, Message08.ROUTING_KEY); } log.info("[{}][test producer08 syncSend] 发送成功", LocalDateTime.now()); // 这里多睡一会,确保消息全部消费完成 TimeUnit.SECONDS.sleep(10); }
以上的是消费者并发消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。