当前位置:   article > 正文

线程池解决RabbitMQ消息堆积

线程池解决RabbitMQ消息堆积

引言

并发量非常大,服务器处理不够及时导致消息堆积的问题是很常见的。上篇文章简单的描述演示了一种解决方案就是增加消费者,提高消息处理速度,使用的是AMQP提供的监听器工厂SimpleRabbitListenerContainerFactory,本篇文章演示另一种方式,在消费者中开启线程池加快消息处理速度。

线程池ThreadPoolExecutor

Java.util提供的ThreadPoolExecutor有七个核心参数

  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数=核心线程数+备用线程数
  • keepAlivetime 备用线程的生存时间
  • unit 备用线程的生存时间
  • workWueue 阻塞队列
  • threadFactory 线程工厂,定义线程对象的创建
  • handler 拒绝策略 当最大线程数达到并且阻塞队列也满时,会触发拒绝策略。

线程池的种类有四种

固定线程数的线程池:newFixedThreadPool(int Threads)

特点:

  • 核心线程数与最大线程数一样,不存在备用线程
  • 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE

单线程化的线程池:newSingleThreadExecutor()

只使用为一的工作线程来执行任务,保证所有任务按照指定顺序执行FIFO

特点:

  • 核心线程数和最大线程数都是1
  • 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE

可缓存线程池:newCachedThreadPool()

特点:

  • 核心线程数为0
  • 最大线程数是Integer.MAX_VALUE
  • 阻塞队列为SynchronousQueue:不存储元素的阻塞队列,每个插入操作都需要等待一个移出操作

具有延迟和周期性执行:newScheduledThreadPoolExecutor(int corePoolSize)

可以执行延迟任务,支持定时以及周期性操作。

环境配置(参考上篇文章)

定义线程池

使用Spring提供的ThreadPoolTaskExecutor

  1. package cn.itcast.mq.thread;
  2. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  4. import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.scheduling.annotation.EnableAsync;
  8. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  9. import java.util.concurrent.*;
  10. @Configuration
  11. @EnableAsync
  12. public class ThreadPoolConfig {
  13. /*定义线程池,提供多个线程消费消息
  14. */
  15. @Bean("pooltoconsumer")
  16. public ThreadPoolTaskExecutor pooltoconsumer()
  17. {
  18. //
  19. ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  20. threadPoolTaskExecutor.setCorePoolSize(8);
  21. threadPoolTaskExecutor.setMaxPoolSize(20);
  22. threadPoolTaskExecutor.setQueueCapacity(100);
  23. threadPoolTaskExecutor.setKeepAliveSeconds(3);
  24. threadPoolTaskExecutor.setThreadNamePrefix("开启多个线程消费消息----");
  25. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
  26. threadPoolTaskExecutor.initialize();
  27. return threadPoolTaskExecutor;
  28. }
  29. }

 修改MQ监听器

  1. @Async("pooltoconsumer")
  2. @RabbitListener(bindings = @QueueBinding(value = @Queue(name="demo.queue2"),exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),key = {"川菜"}) )
  3. public void chuancai(String msg)
  4. {
  5. //拆分消息
  6. String[] split = msg.split(":");
  7. order order = new order(split[0], split[1]);
  8. System.out.println(order.toString());
  9. //保存MYSQL
  10. orderService.save(order);
  11. //测试是否多个消费者
  12. System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" +"----"+"川菜"+"----"+split[1]);
  13. }

使用JMeter压测,查看结果,成功!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/546330
推荐阅读
相关标签
  

闽ICP备14008679号