当前位置:   article > 正文

SpringBoot项目实战演练(三)_springboot 使用 executors.newscheduledthreadpool

springboot 使用 executors.newscheduledthreadpool

SpringBoot多线程模拟高并发

首先还是看一下项目目录:
在这里插入图片描述

项目架构

首先是在demo包下创建一个config层,主要是对于线程池管理创建等,然后创建两个类,一个是BusinessThread类及TestThreadPoolManager类,首先是BusinessThread类:

@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{
    private static Logger logger = LoggerFactory.getLogger(BusinessThread.class);
    private String acceptStr;

    public BusinessThread(String acceptStr) {
        this.acceptStr = acceptStr;
    }

    public String getAcceptStr() {
        return acceptStr;
    }

    public void setAcceptStr(String acceptStr) {
        this.acceptStr = acceptStr;
    }

    @Override
    public void run() {
        //业务操作
        logger.info("多线程已经处理订单插入系统,订单号:"+acceptStr);

        //线程阻塞
        /*try {
            Thread.sleep(1000);
            System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

@Component:把普通pojo实例化到spring容器中

@Scope("prototype")//多实例,IOC容器启动创建的时候,并不会创建对象放在容器在容器当中,当你需要的时候,需要从容器当中取该对象的时候,就会创建。
@Scope("singleton")//单实例 IOC容器启动的时候就会调用方法创建对象,以后每次获取都是从容器当中拿同一个对象(map当中)。
@Scope("request")//同一个请求创建一个实例
@Scope("session")//同一个session创建一个实例
  • 1
  • 2
  • 3
  • 4

相当于创建一个多实例的线程,Runnable是多线程创建方式之一
Java中线程的创建有两种方式:
1. 通过继承Thread类,重写Thread的run()方法,将线程运行的逻辑放在其中
2. 通过实现Runnable接口,实例化Thread类

@Component
public class TestThreadPoolManager implements BeanFactoryAware {
    private static Logger logger = LoggerFactory.getLogger(TestThreadPoolManager.class);
    //用于从IOC里取对象
    private BeanFactory factory;
    //线程池维护的最少数量
    private final static int CORE_POOL_SIZE = 2;
    //线程池维护线程的最大数值
    private final static int MAX_POOL_SIZE = 10;
    //线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;
    //线程池所使用的缓冲队列的大小
    private final static int WORK_QUEUE_SIZE = 50;
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException{
        factory = beanFactory;
    }
    /**
     * 用于储存在队列中的订单,防止重复提交,在真实场景中,可用redis代替 验证重复
     */
    Map<String,Object>cacheMap = new ConcurrentHashMap<>();

    /**
     * 订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列,无界队列
     */
    Queue<Object>msgQueue = new LinkedBlockingQueue<>();
    /**
     * 当线程池的容量满了,执行下面代码,将订单存入到缓冲队列
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //订单加入到缓冲队列
            msgQueue.offer(((BusinessThread) r).getAcceptStr());
            logger.info("系统任务太繁忙,将订单交给调度线程池处理,订单号:" + ((BusinessThread) r).getAcceptStr());
        }
    };
    //创建线程池
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS,new ArrayBlockingQueue<>(WORK_QUEUE_SIZE),this.handler);

    /**将任务加入订单线程池*/
    public void addOrders(String orderId){
        logger.info("此订单准备添加到线程池,订单号: " + orderId);
        //验证当前进入的订单是否存在
        if (cacheMap.get(orderId) == null){
            cacheMap.put(orderId,new Object());
            BusinessThread businessThread = new BusinessThread(orderId);
            threadPool.execute(businessThread);
        }
    }
    /**
     * 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。
     */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
    /**
     * 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池
     */
    final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //判断缓冲队列是否存在记录
            if (!msgQueue.isEmpty()){
                //当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池
                if (threadPool.getQueue().size()<WORK_QUEUE_SIZE){
                    String orderId = (String)msgQueue.poll();
                    BusinessThread businessThread = new BusinessThread(orderId);
                    threadPool.execute(businessThread);
                    logger.info("\"(调度线程池)缓冲队列出现订单业务,重新添加到线程池,订单号:"+orderId);
                }
            }
        }
    },0,1,TimeUnit.SECONDS);
    /**获取消息缓冲队列*/
    public Queue<Object> getMsgQueue(){
        return msgQueue;
    }
    /**终止订单线程池+调度线程池*/
    public void shutdown(){
        logger.info("终止订单线程池+调度线程池:"+scheduledFuture.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

实现 BeanFactoηAware 接口的 bean 可以直接访问 Spring 容器,被容器创建以后,它会拥有一个指向 Spring 容器的引用,可以利用该bean根据传入参数动态获取被spring工厂加载的bean
ConcurrentHashMap:专门用于解决并发问题,关于hashMap和ConcurrentHashMap的知识可以在以下链接查看:
hashMap && ConcurrentHashMap
使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

LinkedBlockingQueue:为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。

RejectedExecutionHandler:线程池的拒绝策略,如果线程池满了,就把线程放到上面提高的缓冲队列中

new ThreadPoolExecutor:创建一个线程池,设置对应参数,

订单进入之后,检查是否存在,若存在则不处理,不存在的话就开启一个线程,放入线程池,使用一个定时任务检查缓冲队列及线程池状态,满足条件则将线程放入线程池

测试

@RestController
public class TestController {
    private static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Autowired
    TestThreadPoolManager testThreadPoolManager;

    /**
     * 测试模拟下单请求 入口
     * @param id
     * @return
     */
    @GetMapping("/start/{id}")
    public String start(@PathVariable Long id) {
        //模拟的随机数
        String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();

        testThreadPoolManager.addOrders(orderNo);

        return "Test ThreadPoolExecutor start";
    }

    /**
     * 停止服务
     * @param id
     * @return
     */
    @GetMapping("/end/{id}")
    public String end(@PathVariable Long id) {

        testThreadPoolManager.shutdown();

        Queue q = testThreadPoolManager.getMsgQueue();
        logger.info("关闭了线程服务,还有未处理的信息条数:" + q.size());
        return "Test ThreadPoolExecutor start";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

测试的话需要进行模拟高并发测试,可以通过jmeter压测进行,大家可以自行下载,解压之后进入bin目录下,双击jar包就可以了:
启动springboot服务,开启压测,可以看到日志

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/article/detail/52477
推荐阅读
相关标签
  

闽ICP备14008679号