在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简单,但是存在一个问题:
如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁地创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
什么时候使用线程池?
- 单个任务处理时间比较短
- 需要处理的任务数量很大
使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行
- 提高线程的可管理性。线程时稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
Executor框架接口
Executor框架是一个根据一组执行策略调动,调度,执行和控制的异步任务的框架,目的是提供一种将“任务提交”与“任务如何运行”分离开来的机制。
JUC中有三个Executor接口:
- Executor:一个运行新任务的简单接口
- ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法
- ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。
Executor接口
1 | public interface Executor { |
Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法。
ExecutorService接口
ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成的Future方法。增加了shutDown()
、shutDownNow()
、invokeAll()
、invokeAny()
、submit()
等方法。
ScheduledExecutorService接口
ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()
方法和scheduleWithFixedDelay()
方法。
几个重要的字段
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
ctl
是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分的信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
下面再介绍下线程池的运行状态. 线程池一共有五种状态, 分别是:
- RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
- TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
- TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
下图为线程池的状态转换过程:
ctl相关方法
1 | private static int runStateOf(int c) { return c & ~CAPACITY; } |
- runStateOf:获取运行状态(高3位)
- workerCountOf:获取活动线程数(低29位)
- ctlOf:获取运行状态和活动线程数的值
execute
1 | public void execute(Runnable command) { |
- 如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务 - 如果
workerCount >= corePoolSize
- 线程池内的阻塞队列未满,则将任务添加到阻塞队列中。
- 线程池内的阻塞队列已满
- 如果
workerCount < maximumPoolSize
,则创建并启动一个线程来执行新提交的任务 - 如果
workerCount >= maximumPoolSize
,则根据拒绝策略来处理该任务,默认的处理方式是直接抛出异常
- 如果
这里需要注意一下addWorker(null, false)
也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workerQueue中获取任务。所以在workerCountOf(recheck) == 0
时执行addWorker(null, false)
也是为了保证线程池在RUNNING状态下必须有一个线程来执行任务。
execute方法执行流程如下:
addWorker
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程是否少于corePoolSize,false表示新增线程前需要判断当前线程数是否少于maximumPoolSize:
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
注意一下这里的t.start()这个语句,启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
Worker类
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable { |
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。(setCorePoolSize会调用interruptIdleWorkers)
所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢?是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:
1 | protected boolean tryAcquire(int unused) { |
tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。(调用了tryRelease将state设置为0)
runWorker
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:
1 | final void runWorker(Worker w) { |
这里说明一下第一个if判断,目的是:
- 如果线程池正在停止,那么要保证当前线程是中断状态
- 如果不是的话,则要保证当前线程不是中断状态
这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:
不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。
总结一下runWorker方法的执行过程:
- while循环不断地通过getTask()方法获取任务;
- getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法;
- runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
getTask
getTask方法用来从阻塞队列中获取任务
1 | private Runnable getTask() { |
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。在执行execute方法时,如果当前线程池的数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timeOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?当然是runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自行回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,然后会执行processWorkerExit
方法。
processWorkerExit
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束:
tryTerminate
tryTerminate方法根据线程池状态进行判断是否结束线程池
1 | final void tryTerminate() { |
interruptIdleWorkers(ONLY_ONE)
的作用是因为在getTask方法中执行workQueue.take()
时,如果不执行中断会一直阻塞。在下面介绍的shutdown方法中,会中断所有空闲的工作线程,如果在执行shutdown时工作线程没有空闲,然后又去调用了getTask方法,这时如果workQueue中没有任务了,调用workQueue.take()
时就会一直阻塞。所以每次在工作线程结束时调用tryTerminate方法来尝试中断一个空闲工作线程,避免在队列为空时取任务一直阻塞的情况。
shutdown
shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。
1 | public void shutdown() { |
这里思考一个问题:在runWorker方法中,执行任务时对Worker对象w进行了lock操作,为什么要在执行任务的时候对每个工作线程都加锁呢?
下面仔细分析一下:
- 在getTask方法中,如果这时线程池的状态是SHUTDOWN并且workQueue为空,那么就应该返回null来结束这个工作线程,而使线程池进入SHUTDOWN状态需要调用shutdown方法
- shutdown方法会调用interruptIdleWorkers来中断空闲的线程,interruptIdleWorkers持有mainLock,会遍历workers来逐个判断工作线程是否空闲。但getTask方法中没有mainLock
- 在getTask中,如果判断当前线程池状态是RUNNING,并且阻塞队列为空,那么会调用
workQueue.take()
进行阻塞 - 如果在判断当前线程池状态是RUNNING后,这时调用了shutdown方法把状态改为了SHUTDOWN,这时如果不进行中断,那么当前的工作线程在调用了
workQueue.take()
后会一直阻塞而不会被销毁,因为在SHUTDOWN状态下不允许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了 - 由上可知,shutdown方法与getTask方法(从队列中获取任务时)存在竞争条件
- 解决这一问题就需要用到线程的中断,也就是为什么要用interruptIdleWorkers方法。在调用
workQueue.take()
时,如果发现当前线程在执行之前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态 - 但是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断
- 所以Worker继承自AQS,在工作线程处理任务时会进行lock,
interruptIdleWorkers
在进行中断时会使用tryLock来判断该工作线程是否正在处理任务,如果tryLock返回true,说明该工作线程当前未执行任务,这时才可以被中断(执行线程阻塞在workQueue.take()中)。
interruptIdleWorkers
1 | private void interruptIdleWorkers() { |
1 | private void interruptIdleWorkers(boolean onlyOne) { |
interruptIdleWorkers遍历workers中所有的工作线程,若线程没有被中断且tryLock获取锁成功,就中断该线程。
为什么需要持有mainLock?因为workers是HashSet类型的,不能保证线程安全
shutdownNow
1 | public List<Runnable> shutdownNow() { |
shutdownNow方法与shutdown方法类似,不同的地方在于:
- 设置状态为STOP
- 中断所有工作线程,无论是否是空闲的
- 取出阻塞队列中没有被执行的任务并返回
shutdownNow方法执行完之后调用tryTerminate方法,目的就是使线程池的状态设置为TERMINATED
interruptWorkers
1 | private void interruptWorkers() { |
interruptWorkers
不检测工作线程是否空闲,调用interruptIfStarted
强制中断线程。
1 | void interruptIfStarted() { |
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
- getTaskCount:线程池已经执行的和未执行的任务总数
- getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
- getPoolSize:线程池当前的线程数量
- getActiveCount:当前线程池中正在执行任务的线程数量
通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供的几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。