赞
踩
首先还是看一下项目目录:
首先是在demo包下创建一个config层,主要是对于线程池管理创建等,然后创建两个类,一个是BusinessThread类及TestThreadPoolManager类,首先是BusinessThread类:
@Component @Scope("prototype")//spring 多例 public class BusinessThread implements Runnable{ private static Logger logger = LoggerFactory.getLogger(BusinessThread.class); private String acceptStr; public BusinessThread(String acceptStr) { this.acceptStr = acceptStr; } public String getAcceptStr() { return acceptStr; } public void setAcceptStr(String acceptStr) { this.acceptStr = acceptStr; } @Override public void run() { //业务操作 logger.info("多线程已经处理订单插入系统,订单号:"+acceptStr); //线程阻塞 /*try { Thread.sleep(1000); System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr); } catch (InterruptedException e) { e.printStackTrace(); }*/ } }
@Component:把普通pojo实例化到spring容器中
@Scope("prototype")//多实例,IOC容器启动创建的时候,并不会创建对象放在容器在容器当中,当你需要的时候,需要从容器当中取该对象的时候,就会创建。
@Scope("singleton")//单实例 IOC容器启动的时候就会调用方法创建对象,以后每次获取都是从容器当中拿同一个对象(map当中)。
@Scope("request")//同一个请求创建一个实例
@Scope("session")//同一个session创建一个实例
相当于创建一个多实例的线程,Runnable是多线程创建方式之一
Java中线程的创建有两种方式:
1. 通过继承Thread类,重写Thread的run()方法,将线程运行的逻辑放在其中
2. 通过实现Runnable接口,实例化Thread类
@Component public class TestThreadPoolManager implements BeanFactoryAware { private static Logger logger = LoggerFactory.getLogger(TestThreadPoolManager.class); //用于从IOC里取对象 private BeanFactory factory; //线程池维护的最少数量 private final static int CORE_POOL_SIZE = 2; //线程池维护线程的最大数值 private final static int MAX_POOL_SIZE = 10; //线程池维护线程所允许的空闲时间 private final static int KEEP_ALIVE_TIME = 0; //线程池所使用的缓冲队列的大小 private final static int WORK_QUEUE_SIZE = 50; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException{ factory = beanFactory; } /** * 用于储存在队列中的订单,防止重复提交,在真实场景中,可用redis代替 验证重复 */ Map<String,Object>cacheMap = new ConcurrentHashMap<>(); /** * 订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列,无界队列 */ Queue<Object>msgQueue = new LinkedBlockingQueue<>(); /** * 当线程池的容量满了,执行下面代码,将订单存入到缓冲队列 */ final RejectedExecutionHandler handler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //订单加入到缓冲队列 msgQueue.offer(((BusinessThread) r).getAcceptStr()); logger.info("系统任务太繁忙,将订单交给调度线程池处理,订单号:" + ((BusinessThread) r).getAcceptStr()); } }; //创建线程池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS,new ArrayBlockingQueue<>(WORK_QUEUE_SIZE),this.handler); /**将任务加入订单线程池*/ public void addOrders(String orderId){ logger.info("此订单准备添加到线程池,订单号: " + orderId); //验证当前进入的订单是否存在 if (cacheMap.get(orderId) == null){ cacheMap.put(orderId,new Object()); BusinessThread businessThread = new BusinessThread(orderId); threadPool.execute(businessThread); } } /** * 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。 */ final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5); /** * 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池 */ final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { //判断缓冲队列是否存在记录 if (!msgQueue.isEmpty()){ //当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池 if (threadPool.getQueue().size()<WORK_QUEUE_SIZE){ String orderId = (String)msgQueue.poll(); BusinessThread businessThread = new BusinessThread(orderId); threadPool.execute(businessThread); logger.info("\"(调度线程池)缓冲队列出现订单业务,重新添加到线程池,订单号:"+orderId); } } } },0,1,TimeUnit.SECONDS); /**获取消息缓冲队列*/ public Queue<Object> getMsgQueue(){ return msgQueue; } /**终止订单线程池+调度线程池*/ public void shutdown(){ logger.info("终止订单线程池+调度线程池:"+scheduledFuture.cancel(false)); scheduler.shutdown(); threadPool.shutdown(); } }
实现 BeanFactoηAware 接口的 bean 可以直接访问 Spring 容器,被容器创建以后,它会拥有一个指向 Spring 容器的引用,可以利用该bean根据传入参数动态获取被spring工厂加载的bean
ConcurrentHashMap:专门用于解决并发问题,关于hashMap和ConcurrentHashMap的知识可以在以下链接查看:
hashMap && ConcurrentHashMap
使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
LinkedBlockingQueue:为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
RejectedExecutionHandler:线程池的拒绝策略,如果线程池满了,就把线程放到上面提高的缓冲队列中
new ThreadPoolExecutor:创建一个线程池,设置对应参数,
订单进入之后,检查是否存在,若存在则不处理,不存在的话就开启一个线程,放入线程池,使用一个定时任务检查缓冲队列及线程池状态,满足条件则将线程放入线程池
@RestController public class TestController { private static Logger logger = LoggerFactory.getLogger(TestController.class); @Autowired TestThreadPoolManager testThreadPoolManager; /** * 测试模拟下单请求 入口 * @param id * @return */ @GetMapping("/start/{id}") public String start(@PathVariable Long id) { //模拟的随机数 String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString(); testThreadPoolManager.addOrders(orderNo); return "Test ThreadPoolExecutor start"; } /** * 停止服务 * @param id * @return */ @GetMapping("/end/{id}") public String end(@PathVariable Long id) { testThreadPoolManager.shutdown(); Queue q = testThreadPoolManager.getMsgQueue(); logger.info("关闭了线程服务,还有未处理的信息条数:" + q.size()); return "Test ThreadPoolExecutor start"; } }
测试的话需要进行模拟高并发测试,可以通过jmeter压测进行,大家可以自行下载,解压之后进入bin目录下,双击jar包就可以了:
启动springboot服务,开启压测,可以看到日志
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。