赞
踩
应选择清晰的任务边界以及明确的任务执行策略。
一种自然的任务边界选择方式:以独立的客户请求为边界。
程序6-1-2-1 在Web服务器中为每一个请求启动一个新的线程(不要这么做)
class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
}
串行情况下可以提升性能。只要请求的到达速率不超出服务器的请求处理能力。
线程生命周期的开销非常高。 线程创建需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助操作。
资源消耗。 活跃线程会消耗系统资源,尤其是内存。大量空闲的线程会占用许多内存,给垃圾收集器带来压力,而且大量线程在竞争CPU资源时还会产生其他的性能开销。如果有足够多的线程使所有的CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。
稳定性。 在一定范围内,增加线程可以提高系统的吞吐率,但是如果超出了这个范围,再创建更多的线程会降低程序的执行速度。如果过多地创建一个线程,那么整个应用程序将崩溃。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程相当于消费者(执行完这些工作单元)。
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.Executors; class TaskExecutionWebServer { private static final int NTHREADS = 100; private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { @Override public void run() { handleRequest(connection); } }; exec.execute(task); } } }
使用线程池来取代new Thread方式。
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; public class LifeCycleWebServer { private final ExecutorService exec = ...; public void start() throws IOException { ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { @Override public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) { log("task submission rejected", e); } } } } public void stop() { exec.shutdown(); } void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) { stop(); } else { dispatchRequest(req); } } }
推荐使用ScheduleThreadPoolExecutor。
Timer存在的问题:
(1)Timer支持基于绝对时间的调度机制,因此任务的执行对系统时钟变化很敏感。ScheduleThreadPoolExecutor只支持基于相对时间的变化。
(2)Timer执行所有的定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。例如某个周期TimerTask需要每10ms执行一次,而另一个需要40ms,那么这个周期任务或者在40ms之后快速连续调用四次,或者彻底丢失4次调用。
(3)Timer线程并不捕获异常。如果TimerTask抛出异常终止了定时任务,timer不会回复线程执行,而是会错误的认为整个Timer都被取消了。 因此,已被调度但尚未执行的TimerTask将不会再执行,新的任务也不会被调度。
程序清单 6-2-5-1 错误的Timer行为
import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; public class OutOfTime { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Timer timer = new Timer(); timer.schedule(new ThrowTask(), 1); TimeUnit.SECONDS.sleep(1); long endTime = System.currentTimeMillis(); System.out.println(endTime - startTime); timer.schedule(new ThrowTask(), 1); TimeUnit.SECONDS.sleep(5); } static class ThrowTask extends TimerTask { @Override public void run() { throw new RuntimeException(); } } } Exception in thread "Timer-0" java.lang.RuntimeException at executor.OutOfTime$ThrowTask.run(OutOfTime.java:22) at java.base/java.util.TimerThread.mainLoop(Timer.java:566) at java.base/java.util.TimerThread.run(Timer.java:516) 1002 Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled. at java.base/java.util.Timer.sched(Timer.java:409) at java.base/java.util.Timer.schedule(Timer.java:205) at executor.OutOfTime.main(OutOfTime.java:15)
程序一秒钟就结束了,并抛出了异常。
在Java5.0或者更高的JDK中,将很少使用Timer。
import java.util.ArrayList; import java.util.List; public class SingleThreadRender { void renderPage(CharSequence source) { renderText(source); List<ImageData> imageData = new ArrayList<>(); for (ImageInfo imageInfo : scanForImageInfo(source)) { imageData.add(imageInfo.downloadImage()); } for (ImageData data : imageData) { renderImage(data); } } }
程序清单6-3-3-1 使用Future等待图像下载
import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; public class FutureRender { private final ExecutorService executor = ...; void renderPage(CharSequence source) { final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable<List<ImageData>>() { @Override public List<ImageData> call() throws Exception { List<ImageData> result = new ArrayList<>(); for (ImageInfo imageInfo : imageInfos) { result.add(imageInfo.downloadImage()); } return result; } }; Future<List<ImageData>> future = executor.submit(task); renderText(source); try { List<ImageData> imageDataList = future.get(); for (ImageData imageData : imageDataList) { renderImage(imageData); } } catch (InterruptedException e) { // 重新设置线程的中断状态 Thread.currentThread().interrupt(); // 由于不需要结果,因此取消任务 future.cancel(true); } catch (ExecutionException e) { throw launcherThrowable(e.getCause()); } } }
只有大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; public class Render { private final ExecutorService excutor; Render(ExecutorService excutor) { this.excutor = excutor; } void renderPage(CharSequence source) { List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<>(excutor); for (final ImageInfo imageInfo: info) { completionService.submit(new Callable<ImageData>() { @Override public ImageData call() throws Exception { return imageInfo.downloadImage(); } }); } renderText(source); try { for (int t = 0, n = info.size(); t < n; t++) { Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw launcherThrowable(e.getCause()); } } }
Page renderPageWithAd() throws InterruptedException { long endNanos = System.nanoTime() + END_BUDGET; Future<Ad> f = excutor.submit(new FetchAdTask()); // 在等待广告的同时显示页面。 Page page = renderPageBody(); Ad ad; try { long timeLeft = endNanos - System.nanoTime(); ad = f.get(timeLeft, TimeUnit.NANOSECONDS); } catch (ExecutionException e) { ad = DEFAULT_AD; } catch (TimeoutException e) { ad = DEFAULT_AD; f.cancel(true); } page.setAd(ad); return page; }
import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; private class QuoteTask implements Callable<TravelQuote> { private final TravelCompany company; private final TravelInfo travelInfo; @Override public TravelQuote call() throws Exception { return company.solicitQuote(travelInfo); } } public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException { List<QuoteTask> tasks = new ArrayList<>(); for (TravelCompany company : companies) { tasks.add(new QuoteTask(company, travelInfo)); } List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit); List<TravelQuote> quotes = new ArrayList<>(futures.size()); Iterator<QuoteTask> iterator = tasks.iterator(); for (Future<TravelQuote> f : futures) { QuoteTask task = iterator.next(); try { quotes.add(f.get()); } catch (ExecutionException e) { quotes.add(task.getFailureQuote(e.getCause())); } catch (CancellationException e) { quotes.add(task.getTimeoutQuote(e.getCause())); } } Collections.sort(quotes, ranking); return quotes; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。