Java语言开发人员在开发并发的多线程应用时,可能会觉得编写在多线程环境中执行良好的健壮代码并不总是那么简单。
业界流传着一句话说:“初级程序员认为并发性很难,有经验的程序员认为并发很容易,高级程序员认为并发性很难。”
什么意思呢?开始理解和学习并发多线编程感觉很难理解和掌握,但是当学习一段时间后发现并发多线程编程就是借助部分接口和类实现,基本固定模式的去编写,所以觉得很简单就实现了。但是随着对并发编程和多线程的理解,那些有经验的高级程序员却发现要写出高效安全的并发多线程应用程序本身就不是一件容易的事情。
我们知道Java在开发多线程应用程序时,创建线程最基础的方式是实现Runnable接口,然后将实现类传递给Thread构造函数,或者通过扩展Thread类重写其run()方法来实现新的Thread扩展类,直接实例化该类实例,启动线程。
它涉及到安全的数据结构和类的定义,以及通过各种安全同步处理的操作方法,因此Java的语言库为我们提供了一些经过测试和考验的线程安全的固定类库,来简化并发多线程应用的开发。它包括一些可以直接使用的数据结构和功能,它们都包含在我们熟悉的java.util.concurrent Jar包里。
对于这个Jar包的内容,我们理解它的入口点是接口Executor。它定义了一个唯一的方法execute()接收一个Runnable接口类型的实现对象。
public interface Executor {
\tvoid execute(Runnable command);
}
该接口是对一个指定Runnable实例的执行的封装。它表示在将来的某个时间点执行指定的Runnable实现的指令。
我们可以简单实现一下该Executor接口:
如此我们就有了一个个在未来某个时间点上执行指令的类,实例化它就可以调用其execute()方法执行某条指令实现。
由于我们的并发多线程编程过程中不可能只使用一到两个线程,我们可以手动去实例化定义以及管理它们,而当使用多个线程时,多次创建线程也是一个耗时的过程,为此JDK提供了一个ThreadPoolExecutor,它是一个线程池执行成熟的、可扩展的实现。
在底层,ThreadPoolExecutor维护一个线程池,并将为给定execute()方法的Runnable实例分派给池中线程实例执行。
传递给构造函数的参数控制线程池的行为。参数最多的构造函数如下:
下面我们来看一个简单的使用示例:
这里我们的run()实现只休眠5秒,但这不是这段代码的主要关注点。
ThreadPoolExecutor从启动5个核心线程开始,允许池最多增长8个线程。出于演示目的,我们只允许未使用的线程空闲大约1秒。
这里的队列实现是一个LinkedBlockingQueue,具有10个可运行实例的容量。为了跟踪线程的创建,我们还实现了一个简单的ThreadFactory。
为处理跟踪处理因线程饱和拒绝执行实例,我们实现了RejectedExecutionHandler。
main()方法中的循环现在在短时间内向池发出20个Runnable实例。
示例的输出显示,我们必须创建8个线程(最多8个)来处理所有挂起的可运行项:
结果还显示,taskId大于18的所有任务都被转发给RejectedExecutionHandler。
这是因为我们的Runnable实现休眠5秒。在前10个线程启动之后,队列只能容纳另外8个可运行实例。然后必须拒绝所有其他实例。
最后,shutdown()方法允许ThreadPoolExecutor拒绝所有进一步的任务,并等待已经发出的任务被执行。
我们可以将shutdown()调用替换为shutdownNow()调用。
shutdownNow()试图中断所有正在运行的线程,并在不等待所有线程完成的情况下关闭线程池。
在上面的示例中,我们将看到8个InterruptedException异常,因为我们的8个休眠线程将立即被唤醒。
我们知道上面定义的基于Runnable接口执行任务,其执行函数是一个无返回值run()方法,用于执行返回值为void的任务。
如果需要线程执行的任务有返回值,为此我们定义了Callable接口,该接口定义了方法
V call();
Executor接口非常简单,它只强制底层实现实现execute()方法。而ExecutorService在扩展Executor接口并添加一系列实用程序方法(例如,添加了完整的任务集合)、关闭线程池的方法,以及查询执行一个任务的结果的实现的能力。
那么JDK如何来处理任务返回值并提交给线程池以执行呢?
由于任务的提交者无法提前知道任务何时执行以及执行持续多长时间。所以让当前线程等待结果显然不是解决方案。
那么我们就需要通过另外实现一个检查线程结果是否已经可用的功能,来阻塞或等待一定的时间从而获取执行的结果。
这就是java.util.concurrent.Future
另外我们还有一个接口,它通过增加一些方法扩展Executor接口和ExecutorService接口,使得可以在给定的时间点调度任务。这个接口的名称是ScheduledExecutorService,它基本上提供了一个schedule()方法,该方法需要一个参数,知道任务执行需要多长时间:
schedule(Callablecallable, long delay, TimeUnit unit);
schedule(Runnable command, long delay, TimeUnit unit);
与ExecutorService类似,schedule()方法有两种变体:
一个用于Runnable接口,另一个用于Callable接口返回值的任务。
ScheduledExecutorService还提供了一种定期执行任务的方法:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
period参数可以指定任务应该运行的周期,unit是单位。
下面我们编写一个示例来展示了如何创建ThreadPoolExecutor。
这里ScheduledExecutorService的实现名为ScheduledThreadPoolExecutor,必须与上面使用的ThreadPoolExecutor非常相似地处理。但是通常不需要完全控制ExecutorService的所有特性。
比如我们想象一个简单的测试客户机,它应该使用一个简单的ThreadPool调用一些服务器方法。
为此JDK为我们创建了一个名为Executors的简单工厂类,这里请注意后面的“s”,该类提供了一些静态方法来创建一个可读的ThreadPoolExecutor。所有这些都可以让我们实现了一个简单的线程池,它执行一系列计算一些数字的任务。
注:这里的数字处理操作是为了演示目的,用一个简单的Thread.sleep()代替。
ExecutorService的创建只需要一行程序代码就完成了。这里为了执行一些任务,我们只需要一个for循环,它创建了ExecutorsExample的几个新实例,并将返回的Future存储在一个数组中。在向服务提交任务之后,我们只需要等待结果。Future的get()方法正在阻塞,即当前线程休眠,直到结果可用为止。
当然,此方法可以采用超时规范来重写,以便在任务未在定义的时间段内完成时,让等待的线程继续执行。
在Java的多线性并发编程中,线程的创建一般不要采用手动去创建,最好使用线程池来创建,从而减少线程创建过程中的性能损耗。
为此,我们可以使用java.util.concurrent包里定义的一些线程池实现接口和类,从而方便有效从处理多线程并发编程过程中线程的创建问题。其中Executors工厂方法还提供了很多诸如创建固定线程数量的线程池,还有创建弹性线程数量的线程池,以及指定线程执行周期等具体的方法,请参见Java类库的说明文档。