当前位置 博文首页 > RtxTitanV的博客:Java并发总结之线程池
本文主要对线程池进行一个总结,文中的源码解析都是基于JDK1.8。
线程池是指管理一组同构工作线程的资源池。线程池是与工作队列(Work Queue)密切相关的,其中在工作队列中保存了所有等待执行的任务。工作线程(Worker Thread)的任务就是从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。
与线程池相关的主要的类和接口的实现继承体系如下:
Executor
:线程池的顶级接口,但并不是一个线程池,而是任务执行(Executor)框架的核心接口,它将任务的提交和执行分离开来。ExecutorService
:真正的线程池接口。ScheduledExecutorService
:主要用来解决任务需要重复执行的问题。ThreadPoolExecutor
:线程池的核心实现类,也是Executor框架最核心的类,用来执行被提交的任务。ScheduledThreadPoolExecutor
:继承自ThreadPoolExecutor
类并实现ScheduledExecutorService
接口,用于周期性的执行任务。创建线程池主要是ThreadPoolExecutor
类来完成,ThreadPoolExecutor
类中提供的四个构造方法可以创建自定义的线程池,推荐使用这种方式创建线程池。除了通过ThreadPoolExecutor
构造方法实现外,还可以通过Executor框架的工具类Executors
来实现,可以创建newFixedThreadPool
、newCachedThreadPool
、newSingleThreadExecutor
、newScheduledThreadPool
这四种常见的线程池。
《阿里巴巴Java开发手册》中强制线程池不允许使用
Executors
创建,而是通过ThreadPoolExecutor
构造函数的方式,这样的处理方式让创建者更加明确线程池的运行规则,规避资源耗尽的风险。
Executors
创建线程池对象的弊端如下:
- FixedThreadPool和SingleThreadExecutor:堆积的请求处理队列可能会耗费非常大的内存,从而导致OOM。
- CachedThreadPool和ScheduledThreadPool:允许创建的线程数量为
Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
当一个任务提交给线程池时,任务的执行过程如下:
可以通过调用线程池的shutdown()
和shutdownNow()
方法来关闭线程池。它们的原理都是遍历线程池中的工作线程,然后逐个调用线程的interrupt
方法来中断线程,而无法响应中断的任务可能无法终止。shutdown()
和shutdownNow()
的区别:
shutdown()
只是关闭了提交通道,用submit()
是无效的,但是已经提交的任务得执行完毕。shutdownNow()
能立即停止线程池,会终止当前正在运行的任务,并停止等待执行的任务并返回正在等待执行的任务列表。只要调用了这两个方法的任意一个,isShutdown
方法都会返回true,当所有的任务都成功关闭后,才表示线程池关闭成功,这时调用isTerminated
方法会返回true。isShutdown()
和isTerminated()
的区别:
isShutDown
当调用shutdown()
方法后返回为true。isTerminated
当调用shutdown()
方法后并且所有提交的任务都完成后返回为true。通常调用shutdown
方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow
方法。
想要合理的配置线程池,可以从任务的性质、优先级、执行时长、依赖性几个角度考虑。任务的性质分为两种:
而优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行。注意,如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时长不同的任务也可以使用优先级队列,让执行时间短的任务先执行。具有依赖性的任务可能需要等待其他资源,等待时间越长,CPU空闲时间就越长,这种情况应该设置较大线程数,充分利用CPU。
工作队列建议使用有界队列,有界队列能增加系统的稳定性和预警能力。使用无界队列时,队列中堆积的大量任务可能会导致OOM
可以通过线程池提供的参数可以对线程池进行监控,可以使用ThreadPoolExecutor
中的以下方法进行监控:
getTaskCount()
:线程池需要执行的任务数量。getCompletedTaskCount()
:已完成的任务数量。getLargestPoolSize()
:线程池里曾经创建过的最大线程数量。getPoolSize()
:线程池的线程数量。getActiveCount()
:活动的线程数量。ThreadPoolExecutor
是可扩展的,通过扩展线程池进行监控。它提供了几个可以在子类中重写的方法:beforelExecute
、afterExecute
和terminated
,这些方法可以用于扩展ThreadPoolExecutor
的行为。
在执行任务的线程中将调用beforeExecute
和afterExecute
等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run
中正常返回,还是抛出一个异常而返回,afterExecute
都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute
。)如果beforeExecute
抛出一个RuntimeException,那么任务将不被执行,并且afterExecute
也不会被调用。
在线程池完成关闭操作时调用terminated
,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated
可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize
统计信息等操作。
创建线程池主要是ThreadPoolExecutor
类来完成,ThreadPoolExecutor
类中提供的四个构造方法,参数最多的这个构造方法是最通用的构造方法,其他三个构造方法都是在这个构造方法的基础上产生的,比较简单。
/**
* @param corePoolSize 线程池的核心线程数量
* @param maximumPoolSize 线程池的最大线程数量
* @param keepAliveTime 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
* @param unit keepAliveTime的时间单位
* @param workQueue 任务队列(工作队列),用来储存等待执行任务的队列
* @param threadFactory 线程工厂
* @param handler 饱和策略
* @throws 如果出现以下之一 抛出IllegalArgumentException :<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws 如果 {@code workQueue} 或 {@code threadFactory}
* 或 {@code handler} 为 null 抛出NullPointerException
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法的参数如下:
corePoolSize
:线程池的核心线程数量(核心线程池的大小,线程池的基本大小),当线程数小于核心线程数时,每提交一个任务就创建一个线程来执行,即使当前有线程处于空闲状态,直到当前线程数达到核心线程数。 只有在工作队列满了的情况下才会创建超出这个数量的线程。如果调用了prestartCoreThread()
或者prestartAllCoreThreads()
,线程池创建的时候所有的核心线程都会被创建并且启动。maximumPoolSize
:线程池的最大线程数量,如果工作队列已满并且当前线程池线程个数小于最大线程数,就会创建新的线程来执行任务。注意,如果工作队列使用无界队列,这个参数就没有什么效果。keepAliveTime
:空闲线程存活时间,如果某个线程的空闲时间超过了存活时间,将被标记为可回收,并且当线程池的当前线程数超过了核心线程数时,这个线程将被终止。unit
:keepAliveTime
参数的时间单位。workQueue
:用于保存等待执行的任务工作队列(阻塞队列,任务队列)。threadFactory
:线程工厂,可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。handler
:饱和策略,当工作队列已满并且线程数已达到最大线程数,说明当前线程池已处于饱和状态,如果再提交任务,则会触发饱和策略。在调用完ThreadPoolExecutor
的构造器后,仍然可以通过设置方法(Setter
)来修改大多数传递给它的构造器的参数,例如线程池的核心线程数、最大线程数、存活时间、线程工厂以及饱和策略(拒绝执行处理器,Rejected Execution Handler)。如果Executor是通过Executors
中的某个工厂方法创建的(newSingleThreadExecutor
除外),那么可以将结果的类型转换为ThreadPoolExecutor
以访问设置方法。
在Executors
中包含一个unconfigurableExecutorService
工厂方法,该方法对一个现有的ExecutorService
进行包装,使其只暴露出ExecutorService
的方法,因此不能对它进行配置。newSingleThreadExecutor
返回按这种方式封装的ExecutorService
,而不是最初的ThreadPoolExecutor
。
ThreadPoolExecutor
中基础属性和相关方法解释如下:
/**
* 主池控制状态ctl是包含两个概念字段的原子整数
* workerCount:有效线程数量(工作线程数量)
* runState:运行状态
* 为了将workerCount和runState用一个int表示, 我们将workerCount限制为 (2^29)-1而不是(2^31)-1
* 即用int的低29位用来表示workerCount,用int的高3位用来表示runState
* workerCount是已被允许启动但不允许停止的工作线程数,该值可能与活动线程的实际数量暂时不同
*
* runState提供主要的生命周期控制,并具有以下值:
* RUNNING:接受新任务并处理排队任务
* SHUTDOWN:不接受新任务,但处理排队任务
* STOP:不接受新任务,不处理排队任务,并中断正在进行的任务
* TIDYING:所有任务已经终止,workerCount为零,线程转换到状态TIDYING将运行terminate()钩子方法;
* TERMINATED:terminated()已经完成,该方法执行完毕代表线程池已经完全终止
*
* 运行状态之间并不是随意转换的,大多数状态都只能由固定的状态转换而来,转换关系:
* RUNNING -> SHUTDOWN:在调用shutdown()时,可能隐含在finalize()
* (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()
* SHUTDOWN -> TIDYING:当队列和线程池都为空时
* STOP -> TIDYING:当线程池为空时
* TIDYING -> TERMINATED:当terminate()方法完成时
*
* 当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回
* 检测从SHUTDOWN到TIDYING的转换并不像想像的那样简单,因为在SHUTDOWN状态期间,队列在非空之后可能变为空
* 反之亦然,但是只有在看到它为空之后才能看到workerCount为0(有时需要重新检查)
*/
// 初始化时的有效线程数为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 高3位表示运行状态,此值用于运行状态向左移动的位数,即29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 有效线程容量,低29位表示有效的线程数,00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高3位表示线程池状态
private static final int RUNNING = -1 << COUNT_BITS; //11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; //00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; //00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; //01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; //01100000 00000000 00000000 00000000
/**
* 获取运行状态,c位ctl值,~CAPACITY高3位为1,低29位为0,运算结果为ctl的高3位,即运行状态
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
* 获取有效线程数,c位ctl值,CAPACITY高3位为0,低29位为1,运算结果为ctl的低29位,即有效线程数
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 获取ctl值,高3位的运行状态和低29位的有效线程数进行或运算
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* 判断状态c是否小于s
*/
private static boolean runStateLessThan(int c, int s) { return c < s; }
/*
* 判断状态c是否大于等于s
*/
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
/*
* 判断状态c是否为RUNNING
*/
private static boolean isRunning(int c) { return c < SHUTDOWN; }
/**
* 使用CAS增加一个有效的线程
*/
private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }
/**
* 使用CAS减少一个有效的线程
*/
private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); }
/**
* 减少一个有效的线程
*/
private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
// 工作队列
private final BlockingQueue<Runnable> workQueue;
// 锁
private final ReentrantLock mainLock = new ReentrantLock();
// 包含线程池中的所有工作线程的Set,只有在持有mainLock的情况下才能访问
private final HashSet<Worker> workers = new HashSet<Worker>();
// 等待条件以支持awaitTermination
private final Condition termination = mainLock.newCondition();
// 跟踪线程池达到的最大大小,仅在mainLock下访问
private int largestPoolSize;
// 完成的任务总数,仅在终止工作线程时更新,仅在mainLock下访问。
private long completedTaskCount;
// 线程工厂,用于创建线程
private volatile ThreadFactory threadFactory;
// 饱和策略