赞
踩
为什么要用消费端批量消费?
在一些业务场景下,我们希望使用 Consumer 批量消费消息,提高消费速度。可以通过对 SimpleRabbitListenerContainerFactory 进行配置实现批量消费能力
==========================>配置类 @Configuration public class ConsumerConfiguration { @Resource ConnectionFactory connectionFactory; @Resource SimpleRabbitListenerContainerFactoryConfigurer configurer; /** * 配置一个批量消费的 SimpleRabbitListenerContainerFactory */ @Bean(name = "consumer10BatchContainerFactory") public SimpleRabbitListenerContainerFactory consumer10BatchContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); // 这里是重点 配置消费者的监听器是批量消费消息的类型 factory.setBatchListener(true); // 一批十个 factory.setBatchSize(1000); // 等待时间 毫秒 , 这里其实是单个消息的等待时间 指的是单个消息的等待时间 // 也就是说极端情况下,你会等待 BatchSize * ReceiveTimeout 的时间才会收到消息 factory.setReceiveTimeout(10 * 1000L); factory.setConsumerBatchEnabled(true); return factory; } } ====================》生产者 @Component public class Producer10 { @Resource RabbitTemplate rabbitTemplate; public void sendSingle(String id, String routingKey) { Message10 message = new Message10(); message.setId(id); rabbitTemplate.convertAndSend(Message10.EXCHANGE, routingKey, message); } } ================================》消费者 @RabbitListener(queues = Message10.QUEUE, containerFactory = "consumer10BatchContainerFactory") @Component @Slf4j public class Consumer10 { /** * 批量消费 * * @param message 一批消息 */ @RabbitHandler public void onMessage(List<Message10> message) { log.info("[{}][Consumer10 批量][线程编号:{}][消息个数:{}][消息内容:{}]" , LocalDateTime.now() , Thread.currentThread().getId() , message.size() , message); } /** * 单个消费 * * @param message 一个消息 */ @RabbitHandler public void onMessage(Message10 message) { log.info("[{}][Consumer10 单个][线程编号:{}][消息内容:{}]" , LocalDateTime.now() , Thread.currentThread().getId() , message); } } ==================================》测试类 @Test void sendSingle() throws InterruptedException { // 假设 一秒一个,发送 1000 个,观察消费者的情况 for (int i = 0; i < 15; i++) { TimeUnit.SECONDS.sleep(1); String id = UUID.randomUUID().toString(); producer10.sendSingle(id, Message10.ROUTING_KEY); if (i == 9) { log.info("[{}][test producer10 sendSingle] 发送成功10个", LocalDateTime.now()); } } log.info("[{}][test producer10 sendSingle] 发送成功", LocalDateTime.now()); TimeUnit.SECONDS.sleep(20); } }
以上的是RabbitMQ之消费者批量消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。