
6.1.1串行地执行任务当围绕任务执行来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。独立性有助于实现并发,因为如果存在足够多的处理资源,那么这些独立的任务都可以并行执行。为了在调度与负载均衡等过程中实现更高的灵活性,每项任务还应该表示应用程序的一小部分处理能力。
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。要实现上述目标,应该选择清晰的任务边界以及明确的任务执行策略。
大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界,这些服务器都能通过网络接受远程客户的连接请求。将独立的请求作为任务边界,既可以实现任务的独立性,又可以实现合理的任务规模。
在应用程序中可以通过多种策略来调度任务,而其中一些策略能够更好地利用潜在的并发性。
最简单的策略就是在单个线程中串行地执行各项任务:
class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}
}
6.1.2显式地为任务创建线程
通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性:
class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
}
6.1.3无限制创建线程的不足
6.2Executor框架在生产环境中,为每个任务分配一个线程这种方法存在一些缺陷,尤其是当需要创建大量的线程时:
- 线程生命周期的开销非常高。
- 资源消耗。
- 稳定性。
无论是串行执行还是将每个任务放在各自的线程中执行,这两种方式都存在一些严格的限制:串行执行的问题在于其糟糕的响应性和吞吐量,而为每个任务分配一个线程的问题在于资源管理的复杂性。
因此,需要线程池来进行管理。线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在java类库中,任务执行的主要抽象不是Thread,而是Executor:
public interface Executor {
void execute(Runnable command);
}
6.2.1示例:基于Executor的web服务器Executor基于生产者/消费者模式,提交任务的操作相对于生产者,执行任务的线程则相对于消费者。如果要在程序中实现一个生产者/消费者的设计,最简单的方式通常就是使用Executor。
通过使用Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor实现,就可以改变其行为。改变Executor实现或配置所带来的的影响要远远小于改变任务提交方式带来的影响。通常,Executor的配置是一次性的,因此在部署阶段可以完成,而提交任务的代码却会不断地扩散到整个程序中。
class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
}
可以很容易地将TaskExecutionWebServer 修改为类似ThreadPerTaskWebServer的行为,只需使用一个为每个请求都创建新线程的Executor:
public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
同样,还可以编写一个Executor使TaskExecutionWebServer 的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回:
public class WithinThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
6.2.2执行策略
通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的各个方面,包括:
- 在什么线程中执行任务。
- 任务按照什么顺序执行(FIFO、LIFO、优先级)。
- 有多少个任务能并发执行。
- 在队列中有多少个任务在等待执行。
- 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个任务。另外,如何通知应用程序有任务被拒绝。
- 在执行一个任务之前或之后,应该进行哪些动作。
各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。通过限制并发任务的数量,可以确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能。通过将任务的提交与任务的执行策略分离开了,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。
每当看到下面这种形式的代码时:
new Thread(runnable).start();
6.2.3线程池并且希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread。
6.2.4Executor的生命周期线程池是与工作队列密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另一个好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:
- newFixedThreadPool。创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
- newCachedThreadPool。创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
- newSingleThreadExecutor。是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行。
- newScheduledThreadPool。创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
Executor的实现通常会创建线程来执行任务。但JVM只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。
由于Executor以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。当关闭应用程序时,可能采用最平缓的关闭形式(完成所有已经启动的任务,并且不再接受任何新的任务),也可能采用最粗暴的关闭形式(直接关掉机房的电源),以及其他各种可能的形式。既然Executor是为应用程序提供服务的,因而它们也是可关闭的,并将在关闭操作中受影响的任务的状态反馈给应用程序。
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法):
public interface ExecutorService extends Executor {
void shutdown();
List shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// ...其他用于任务提交的便利方法
}
在ExecutorService关闭后提交的任务将由拒绝执行处理器来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。
等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。
class LifecycleWebServer {
private final ExecutorService exec = ...;
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown()) {
log("task submission rejected", e);
}
}
}
}
public void stop() {
exec.shutdown();
}
void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req)) {
stop();
} else {
dispatchRequest(req);
}
}
}
6.2.5延迟任务与周期任务
6.3找出可利用的并行性如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。
6.3.1示例:串行的页面渲染器本节将开发一些不同版本的组件,实现浏览器程序中的页面渲染功能,并且每个版本都实现了不同程序的并发性。
最简单的方法就是对HTML文档进行串行处理:
public abstract class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List imageData = new ArrayList();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageData.add(imageInfo.downloadImage());
}
for (ImageData data : imageData) {
renderImage(data);
}
}
}
6.3.2携带结果的任务Callable与Future
Executor框架使用Runnable作为其基本的任务表示形式,然而却有很大的局限性,它不能返回一个值或抛出一个受检查的异常。
许多任务实际上都是存在延迟的计算,例如执行数据库查询、从网络上获取资源等,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象,可以返回一个值,并可能抛出一个异常。
由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。当某个任务完成后,它就永远停留在完成状态上。get方法的行为取决于任务的状态(尚未开始、正在运行、已完成):
- 如果任务已完成,那么get会立即返回或者抛出一个Exception。
- 如果任务没有完成,那么get将阻塞并直到任务完成。
- 如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。
- 如果任务被取消,那么get将抛出CancellationException。
- 如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。
6.3.3示例:使用Future实现页面渲染器可以通过多种方法创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的Runnable或Callable实例化一个FutureTask(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法)。
从java 6开始,ExecutorService实现可以改写AbstractExecutorService中的newTaskFor方法,从而根据已提交的Runnable或Callable来控制Future的实例化过程。
在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似的,在设置Future结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。
public abstract class FutureRenderer {
private final ExecutorService executor = Executors.newCachedThreadPool();
void renderPage(CharSequence source) {
final List imageInfos = scanForImageInfo(source);
Callable> task = new Callable>() {
public List call() {
List result = new ArrayList();
for (ImageInfo imageInfo : imageInfos) {
result.add(imageInfo.downloadImage());
}
return result;
}
};
Future> future = executor.submit(task);
renderText(source);
try {
List imageData = future.get();
for (ImageData data : imageData) {
renderImage(data);
}
} catch (InterruptedException e) {
// 重新设置线程的中断状态
Thread.currentThread().interrupt();
// 由于不需要结果,因此取消任务
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
6.3.4在异构任务并行化中存在的局限get方法拥有状态依赖的内在特性,因而调用者不需要知道任务的状态,此外在任务提交和获得结果中包含的安全发布属性也确保了这个方法是线程安全的。Future.get的异常处理代码将处理两个可能的问题:任务遇到一个Exception,或者调用get的线程在获得结果之前被中断。
6.3.5CompletionService:Executor与BlockingQueueFutureRenderer使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像的速度(可能性很大),那么程序的最终性能与串行执行时的性能差别不大,而代码却变得更复杂了。当使用两个线程时,至多能将速度提高一倍。因此,虽然做了许多工作来并发执行异构任务以提高并发度,但从中获得的并发性却是十分有限的。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
6.3.6示例:使用CompletionService实现页面渲染器如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。
CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用FutureTask中的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。
public abstract class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
final List info = scanForImageInfo(source);
CompletionService completionService =
new ExecutorCompletionService(executor);
for (final ImageInfo imageInfo : info) {
completionService.submit(new Callable() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
}
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
6.3.7为任务设置时限多个ExecutorCompletionService可以共享一个Executor,因此可以创建一个对于特定计算私有,又能共享一个公共Executor的ExecutorCompletionService。因此,CompletionService的作用就相当于一组计算的句柄,这与Future作为单个计算的句柄是非常类似的。通过记录提交给CompletionService的任务数量,并计算出已经获得的已完成结果的数量,即使使用一个共享的Executor,也能知道已经获得了所有任务结果的时间。
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。
在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。此时可以使用Future.get:当结果可用时,它将立即返回,如果在指定时间内没有计算出结果,那么将抛出TimeoutException。
在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后终止执行或取消任务。此时可再次使用Future,如果一个限时的get方法抛出了TimeoutException,那么可以通过Future来取消任务。如果编写的任务是可取消的,那么可以提前终止它,以免消耗过多的资源。
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future f = exec.submit(new FetchAdTask());
// 在等待广告的同时显示页面
Page page = renderPageBody();
Ad ad;
try {
// 只等待指定的时间长度,将指定时限减去当前时间。这可能会得到负数,但此时会将负数视为0
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
// 如果get超时,那么将取消广告获取任务,并转而使用默认的广告信息
ad = DEFAULT_AD;
// Future.cancel的参数为true表示任务线程可以在运行过程中中断
f.cancel(true);
}
page.setAd(ad);
return page;
}
6.3.8示例:旅行预订门户网站
private class QuoteTask implements Callable{ private final TravelCompany company; private final TravelInfo travelInfo; public QuoteTask(TravelCompany company, TravelInfo travelInfo) { this.company = company; this.travelInfo = travelInfo; } TravelQuote getFailureQuote(Throwable t) { return null; } TravelQuote getTimeoutQuote(CancellationException e) { return null; } public TravelQuote call() throws Exception { return company.solicitQuote(travelInfo); } public TravelQuote call() throws Exception { return company.solicitQuote(travelInfo); } } public class TimeBudget { private static ExecutorService exec = Executors.newCachedThreadPool(); public List getRankedTravelQuotes(TravelInfo travelInfo, Set companies, Comparator ranking, long time, TimeUnit unit) throws InterruptedException { List tasks = new ArrayList (); for (TravelCompany company : companies) { tasks.add(new QuoteTask(company, travelInfo)); } // 调用限时的invokeAll。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时, // invokeAll将返回。当超过指定时限后,任务还未完成的任务都会取消。当invokeAll返回后,每个 // 任务要么正常完成,要么被取消,而客户端代码可以调用get或isCancelled来判断究竟是哪种情况。 List > futures = exec.invokeAll(tasks, time, unit); List quotes = new ArrayList (tasks.size()); Iterator taskIter = tasks.iterator(); for (Future f : futures) { QuoteTask task = taskIter.next(); try { quotes.add(f.get()); } catch (ExecutionException e) { quotes.add(task.getFailureQuote(e.getCause())); } catch (CancellationException e) { quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); return quotes; } } interface TravelCompany { TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception; } interface TravelQuote { } interface TravelInfo { }