当前位置:   article > 正文

JavaEE初阶Day 11:多线程(9)

JavaEE初阶Day 11:多线程(9)

Day 11:多线程(9)

生产者消费者模型

1. 阻塞队列实现

package thread;

class MyBlockingQueue {
    private String[] elems = null;
    //[head, tail)
    //head位置指向的是第一个元素,tail指向的是最后一个元素的下一个元素
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0;

    public MyBlockingQueue(int capacity){
        elems = new String[capacity];
    }

    void put(String elem) throws InterruptedException {
        synchronized (this) {
            while (size >= elems.length){
                //队列满了,进行队列阻塞
                this.wait();
            }
            //把新的元素放到tail所在的位置上
            elems[tail] = elem;
            tail++;
            if (tail >= elems.length) {
                //到达末尾,就回到开头
                tail = 0;
            }
            //更新size的值
            size++;


            //唤醒下面 take 阻塞的wait
            this.notify();
        }


    }

    String take() throws InterruptedException {
        synchronized (this) {
            while (size == 0) {
                //队列空了,进行阻塞
                this.wait();
            }
            //取出 head 指向的元素
            String result = elems[head];
            head++;
            if (head >= elems.length) {
                head = 0;
            }

            size--;
            //take 成功一个元素,就唤醒上面put中的wait操作
            this.notify();
            return result;
        }
    }
}

public class Demo31 {
    public static void main(String[] args) {
        MyBlockingQueue queue = new MyBlockingQueue(1000);

        Thread t1 = new Thread(() -> {
            try {
                int count = 1;
                while (true) {
                    queue.put(count + "");
                    System.out.println("生产" + count);
                    count++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread t2 = new Thread(() -> {
            try {
                while (true){
                    String result = queue.take();
                    System.out.println("消费" + result);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });

        t1.start();
        t2.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • wait不仅仅会被notify/notifyAll唤醒,也可能被其他的操作唤醒,比如interrupt
  • 上述代码加锁后,采用while本质上是当wait被唤醒之后,再次确认条件是否满足,是否能往下执行
  • 如果采用if进行条件判断,而不是while,那么一旦wait被唤醒了,此时逻辑就会往下走,不管条件判断是否成立,当前这个代码,理论上不会出现这个情况,即便是被interrupt唤醒,也是方法直接结束
  • 但是按照文档的建议,使用while的方法,是更稳妥的做法,被唤醒之后,再次确认一下,确认一下条件是否成立
  • 最好的做法就是只要使用wait的时候,都搭配while判定条件的方式

线程池

并发编程,使用多线程就可以了,线程比进程更加轻量,在频繁创建和销毁的时候,更有优势,但是随着时代的发展,对于“频繁”有个新的定义,现有的要求下,频繁创建销毁线程,开销也变得越来越明显了

如何进行优化

  • 线程池
  • 协程(纤程)

后续高版本Java中引入的虚拟线程,本质上就是协程;Go语言的主打卖点,就是使用协程处理并发编程

为什么引入线程池/协程之后能够提升效率,最关键的要点是

  • 直接创建/销毁线程,是需要用户态+内核态配合完成的工作,直接调用api创建线程和销毁线程,这个过程需要内核完成,内核完成工作很多时候是不太可控
  • 线程池/协程,创建和销毁,只通过用户态即可,不需要内核态的配合,使用线程池,提前把线程都创建好,放到用户态代码中写的数据结构里面,后面使用的时候,随时从池子里去取,用完了放回池子里去,这个过程完全是用户态代码,不需要和内核进行交互

简单来讲,就是一次性从系统申请出10个线程,可以用10个变量表示,也可以用一个长度为10的元素的数组来表示,也可以用hash等表示

协程本质上也是纯用户态操作,规避内核操作,不是在内核里把线程提前创建好,而是用一个内核的线程来表示多个协程(纯用户态,进行协程之间的调度)

常量池、数据库连接池、线程池、进程池、内存池的思想都是一样的

1. 标准库线程池(ThreadPoolExecutor)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
1.1 corePoolSize & maximumPoolSize
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数

标准库的线程池把线程分为两类

(1)核心线程:corePoolSize

(2)非核心线程:maximumPoolSize = 核心线程数 + 非核心线程数

动态扩展

  • 一个线程池,刚被创建出来的时候,里面就包含核心线程数这么多的线程
  • 假设此时线程池里包含4个线程,线程池会提供一个submit方法,往里面添加任务,每个任务都是一个Runnable对象
  • 如果当前添加的任务比较少,4个线程就足以能够处理,就只有4个线程在工作了
  • 如果添加的任务比较多,4个线程处理不过来,有很多的任务在排队等待执行,这个时候线程就会自动创建出新的线程,来支撑更多的任务
  • 创建出来的线程总数,不能超过最大线程数
  • 过了一段时间之后,任务没那么多了,线程清闲了,部分线程就会被释放掉(回收了),回收只是把非核心线程回收掉,至少保证线程池中线程数目不少于核心线程数

既可以保证任务多的时候的效率,也能保证任务少的时候,系统开销小

实际开发中,线程数应该设置成多少合适

不仅和电脑配置有关,更重要的是,和程序的实际特点有关系

极端一点,可以将程序分成两个大类

  • CPU密集型程序:代码完成的逻辑,都是要通过CPU来完成的,此时线程数目,不应该超过CPU逻辑核心数,例如代码逻辑都是在进行算数运算、条件判定、循环判定和函数调用
  • IO密集型程序:代码大部分时间在等待IO,等待IO是不消耗CPU的,不参与调度,不仅仅是标准输入输出,也可能是sleep、操作硬盘、操作网络等,此时瓶颈不在CPU上,每个线程消耗CPU只有一点点,更多的是考虑其他方面,比如网络程序,要考虑网卡带宽的瓶颈

上述两类都太理想化了,实际开发中,一个程序中既包含CPU操作,也包含IO操作,介于CPU密集和IO密集两者之间

最终的答案是:根据实验的方式,找到一个合适的值来设置线程数,对程序进行性能测试,测试过程中,设定不同的线程池数值,最终根据实际程序的响应速度和系统开销综合权衡,找到一个你觉得最合适的值

1.2 keepAliveTime & unit
  • keepAliveTime:非核心线程,允许空闲的最大时间,在线程池不忙的时候回收掉
  • unit:上述时间的单位,比如秒、分钟、小时、毫秒等

例如,设定保留时间3s,3s之内,非核心线程没有任务执行了,此时就可以回收了

1.3 BlockingQueue workQueue

线程池的任务队列,线程池会提供submit方法,让其他线程把任务提交给线程池

线程池内部需要有一个队列这样的数据结构,把要执行的任务保存起来,后续线程池内部的工作线程,就会消费这个队列,从而来完成具体的任务执行

1.4 ThreadFactory threadFactory

标准库中提供的用来创建线程的工厂类,这个线程工厂主要是为了批量的给要创建的线程设置一些属性,在工厂方法中,把线程的属性提前初始化好了

工厂模式也是一种设计模式,主要解决的是:基于构造方法创建对象存在的问题

例子:创建一个类来表示一个点

class Point {
	public Point(double x, double y) {...} //笛卡尔坐标系
	public Point(double r, double a) {...} //极坐标系
}
  • 1
  • 2
  • 3
  • 4

上述代码会出现编译报错,这两个版本的构造方法不能构成重载,此时无法通过构造方法来表示不同的构造点的方式了,本质上因为构造方法名字是固定的,无法搞成不同的方法名字,要想提供不同的版本,就只能想办法在参数上做出区分

工厂模式的核心思路是:不再使用构造方法创建对象,而是给构造方法包装一层

package thread;

class Point {

}

class PointBuilder {
    public static Point makePointByXY(double x, double y) {
        Point p = new Point();
        p.setX(x);
        p.setY(y);
        return p;
    }

    public static Point makePointByRA(double r, double a) {
        Point p = new Point();
        p.setR(r);
        p.setA(a);
        return p;
    }
}

public class Demo33 {
    public static void main(String[] args) {
        Point p = PointBuilder.makePointByXY(10, 20);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
1.5 RejectedExecutionHandler handler

面试官问:线程池的参数都是什么意思,本质是在考拒绝策略

拒绝策略:是一个枚举类型,采用哪种拒绝策略

例如:如果当前任务队列满了,仍然要继续添加任务怎么办,下列四种拒绝策略

  • ThreadPoolExecutor.AbortPolicy:直接抛出异常
  • ThreadPoolExecutor.CallerRunsPolicy:谁负责添加任务,谁负责执行,线程池本身不管了
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃掉最老的任务,让新的任务去队列中排队
  • ThreadPoolExecutor.DiscardPolicy:丢弃最新的任务,按照原有的节奏来执行
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/433661
推荐阅读
相关标签
  

闽ICP备14008679号