赞
踩
运用多线程只有一个目的,就是为了更好的利用cpu的资源。当然, 大部分多线程代码都可以用单线程实现,但是现实中多数的高并发场景(抢票、改名等)都需要通过多线程编程来进行模拟
尽管可以通过new Thread创建多个线程,但通过该方法创建线程时可能有以下问题
相比较之下,通过线程池创建多个线程更利于管理和操作,也能提高线程和系统资源的利用率
较为常用的有两大种方法,这两种方法都是通过创建 ExecutorService 接口类(实现了 ExecutorService 接口)的对象实现
通过 ThreadPoolExecutor 实现线程池最大的优势就是自定义线程池,ThreadPoolExecutor 继承于 AbstractExecutorService,其构造方法有四种重构
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,//前五项为基础参数
ThreadFactory threadFactory,//后两项通过排列组合构成三种重构
RejectedExecutionHandler handler)
其中 corePoolSize 为线程池的基本大小,可以理解为*“可以同时进行的最大线程数”*,maximumPoolSize 线程池允许的最大线程数,keepAliveTime 线程最长存活时间,TimeUnit unit 设置存活时间单位
TimeUnit.DAYS //天 TimeUnit.HOURS //小时 TimeUnit.MINUTES //分钟 TimeUnit.SECONDS //秒 TimeUnit.MILLISECONDS //毫秒
- 1
- 2
- 3
- 4
- 5
BlockingQueue 阻塞队列,常用的是 LinkedBlockingQueue<>(int len),一种基于链表实现的可选容量的阻塞队列,如果在初始化时没有指定容量,那么将会默认使用 int 的最大值作为容量
ThreadFactory 线程工厂,主要用于创建线程
RejectedExecutionHandler 当拒绝处理任务时的策略,有以下选项
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
- 1
- 2
- 3
- 4
Executors.newFixThreadPool(int nThreads) :创建一个固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
线程池无界队列中的线程可以被重用(并在需要时使用提供的ThreadFactory创建新线程),如果所有线程都在活动中时提交了额外任务,那么任务将在队列中等待直到有可用线程。如果某一线程在执行任务过程中失败而被终止,那么在执行后续任务时将用新的线程代替,池中线程将一直存在直到其被显式指定为 shutdown
Executors.newSingleThreadExecutors() :创建一个单线程的执行程序
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
该执行程序在无界队列上操作单一线程(并在需要时使用提供的ThreadFactory创建新线程),如果某一线程在执行任务过程中失败而被终止,那么在执行后续任务时将用新的线程代替。线程将被保证按顺序进行,而且在给定时间内的活动线程不会大于一个。该方法保证返回的执行程序不会被配置去使用其他的线程
Executors.newCachedThreadPool() :创建一个可根据需要创建新线程的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
该线程池可以按需创建新的线程(并在需要时使用提供的ThreadFactory创建新线程),但如果之前创建的线程可用,将优先使用之前已创建的线程,如果没有可用的现有线程,则创建一个新线程并将其加入至池中。60 s 内未被使用的线程将被终结并从缓存中删除
Executors.newSingleThreadScheduledExecutor() :创建一个可以延时的单线程执行程序
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
该执行程序可以延迟至给定时间后或定期执行(并在需要时使用提供的ThreadFactory创建新线程),如果处于任务中的单线程由于任务失败而终止,那么在执行后续任务时将用新的线程代替。线程将被保证按顺序进行,而且在给定时间内的活动线程不会大于一个。该方法保证返回的执行程序不会被配置去使用其他的线程
为什么不推荐使用 Executors 创建线程池
在《阿里巴巴java开发手册》中明确禁止不允许使用 Executors 创建线程池
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
为加深对多线程的理解,尝试写了一个多线程抢票系统,该系统需满足以下要求:
package tickets; import java.util.Random; import java.util.concurrent.CountDownLatch; class Client extends Thread{ private int num;//剩余票数 private int sold_num = 0;//已卖出票数 private CountDownLatch latch;//---------1 public Client(int tickets_left, CountDownLatch latch) { this.latch = latch; this.num = tickets_left; } @Override public synchronized void run() {//---------2 Random rand = new Random(); int buy_in = rand.nextInt(10) + 1; if(num >= buy_in) { num -= buy_in; sold_num += buy_in; System.out.println(Thread.currentThread().getName() + " thread buys in " + buy_in + " ticket(s)"); System.out.println(num + " tickets left"); } else { System.out.println("Tickets sold out"); } latch.countDown(); } public int sold() { return sold_num; } }
package tickets; import java.util.concurrent.*; public class server { private static ExecutorService pool; private static int TICKETS = 5000; private static CountDownLatch latch; public static void main(String[] args){ pool = new ThreadPoolExecutor(50, 1000, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));//---------3 latch = new CountDownLatch(1000); Client task = new Client(TICKETS, latch); for(int i = 0; i < 1000; i++){ pool.execute(task); } try { latch.await(); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println(task.sold() + " tickets sold"); pool.shutdown(); } }
pool-1-thread-1 thread buys in 8 ticket(s)
4992 tickets left
pool-1-thread-50 thread buys in 3 ticket(s)
4989 tickets left
pool-1-thread-49 thread buys in 7 ticket(s)
4982 tickets left
......
pool-1-thread-36 thread buys in 7 ticket(s)
4 tickets left
pool-1-thread-36 thread buys in 4 ticket(s)
0 tickets left
Tickets sold out
......
5000 tickets sold
CountDownLatch
可以理解为线程间的计数器,将 CountDownLatch 的计数器初始化为 new CountDownLatch(n),每当一个线程结束自己的任务计数器就会减一,当计数器的值为0时,在 CountDownLatch 上等待的线程便可以继续执行
其缺点是,CountDownLatch 在构造方法中只能初始化一次,之后没有任何方法能设置它的值,在计数器归零后不能被再次使用
在本例中,Client 类初始化了一个 CountDownLatch 计数器,每一个线程执行完任务后,latch 调用 latch.countDown() 方法使计数器减一,同时在 server 类中也构造了一个计数器,并将其初始化为1000(即假设的顾客人数),在后面调用 latch.await() 方法使其等待直到计数器减为0,即所有顾客完成购票后,再打印总的购票信息,完成抢票
synchronized
Java中的一种同步锁。Java中每一个对象都可以作为锁,这是 synchronized 实现同步的基础。由于在并发编程中存在线程安全问题(有共享数据,多线程同时操作共享数据等),同步锁的存在可以防止多个线程同时对某个共享数据进行读写,确保当一个线程调用该上锁方法时,其他线程必须处于阻塞中
synchronized 的修饰对象有以下几种:
在本例中,因为在创建线程池时设置了 corePoolSize 为50,意味着同步进行的线程最多可以有50个,所以需要将重载的 run 方法加上 synchronized 修饰,限制了同一时间段只有一个线程能执行 run 方法,即实现排队售票。
当然,最简单的解决方法就是我们可以将线程池中的 corePoolSize 设置为1,即直接在初始化线程池时设置最大并发数量为1,就可以不用给 run 加锁
参数详见创建线程池的第一大方法
更多文章详见个人网站ouz2hou.top,最新文章将第一时间在个人网站发表
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。