并发编程工具 - ThreadPoolExecutor源码解析
目录AtomicIntegerctl表示了线程池的生命周期线程池状态和转换过程execute(Runnable command)添加任务,addWork(Runnable firstTask, boolean core)Worker结构执行任务,runWorker(Worker w)销毁线程,processWorkerExit在(并发编程工具 - 线程池核心参数和工作原理)理解了ThreadPoo
目录
添加任务,addWork(Runnable firstTask, boolean core)
在(并发编程工具 - 线程池核心参数和工作原理)理解了ThreadPoolExecutor的层级结构、创建方式和工作原理之后,进入源码查询具体的实现细节。最主要是实现了顶层接口Executor的execute(Runnable command)。不过在分析之前先了解一下,内部怎么在多线程下保证当前线程数的加减,让我们自己实现也很自然的会想到AtomicInteger。只是其内部还定义了当前线程池对外的状态(是否核心线程已经创建完成,是否队列已经装满等),并且内部将两者用同一个遍历进行表示。
AtomicInteger ctl表示了线程池的生命周期
即 AtomicInteger ctl = 线程池对外状态【3位】 + 当前线程数【29位】;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
线程池状态和转换过程
线程池定义了自己当前的状态及转换的条件,先从源码的类注释开始:
// 32-3 = 29位 (下面都会用到该数,并且位移29为[2的29次方=536870912])
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大线程容量 1右移29位减去1(1乘以2的29次方减1) 即:CAPACITY = 1 * 2^29 -1 = 536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池当前的允许状态或者叫生命周期 runState is stored in the high-order bits
// 1、运行中 即可以接受新任务也执行队列中的任务 -1 * 536870912 = -536870912
private static final int RUNNING = -1 << COUNT_BITS;
// 2、关闭 不接受新任务但是执行队列中的任务 0 * 536870912 = 0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 3、停止 不接受新任务也不执行队列中的任务,中断正在执行中的任务 1 * 536870912 = 536870912
private static final int STOP = 1 << COUNT_BITS;
// 4、整理 所有任务都已经结束,并且线程数为0,即将调用rerminated方法(整理铺盖回家状态)
// 2 * 536870912 = 1073741824
private static final int TIDYING = 2 << COUNT_BITS;
// 5、终止 terminated()方法执行完成 3 * 536870912 = 1610612736
private static final int TERMINATED = 3 << COUNT_BITS;
1、RUNNING【运行中】
即可以接受新任务也执行队列中的任务 -1 * 536870912 = -536870912
2、SHUTDOWN【关闭】
不接受新任务但是执行队列中的任务 0 * 536870912 = 0
3、STOP【停止】
不接受新任务也不执行队列中的任务,中断正在执行中的任务 1 * 536870912 = 536870912
4、TIDYING【整理】
所有任务都已经结束,并且线程数为0,即将调用rerminated方法(整理铺盖回家状态) 2 * 536870912 = 1073741824
private static final int = 2 << COUNT_BITS;
5、TERMINATED【终止】
terminated()方法执行完成 3 * 536870912 = 1610612736
那么ctl中的数即表示了当前是线程池的状态和创建的线程数,只需要取出ctl值,并且获取低3位和高29位,修改之后再使用 AtomicInteger的线程安全的原子性 CAS地保存即可。
execute(Runnable command)
public void execute(Runnable var1) {
// 任务为null直接抛空指针
if (var1 == null) {
throw new NullPointerException();
} else {
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int var2 = this.ctl.get();
// 当前线程数的个数小于核心线程数
if (workerCountOf(var2) < this.corePoolSize) {
// 添加一个Worker(将Runnable回调函数加入),添加成功直接返回
if (this.addWorker(var1, true)) {
return;
}
// 没有添加成功则再获取当前上下文值(状态[3位] + 线程数[29位])
var2 = this.ctl.get();
}
// 判断核心线程已满,并且往队列中添加成功
if (isRunning(var2) && this.workQueue.offer(var1)) {
int var3 = this.ctl.get();
// 任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了
if (!isRunning(var3) && this.remove(var1)) {
// 如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务
this.reject(var1);
// 如果之前的线程已被销毁完,新建一个线程
} else if (workerCountOf(var3) == 0) {
this.addWorker((Runnable)null, false);
}
// 在试着创建一个新的线程添加,如果不成功再执行拒绝策略
} else if (!this.addWorker(var1, false)) {
// 执行拒绝策略
this.reject(var1);
}
}
}
在理解完上一篇线程池工作原理图之后,再看execute方法就比较简单了,只是内部的细节需要addWork再分析一下,再贴一下工作原理图与该方法进行对比:
addWorker是将我们写的Runnable任务添加到线程池中,而真正的执行是调用的Worker的runWork方法
添加任务,addWork(Runnable firstTask, boolean core)
/**
* 返回是否添加任务成功
* @param firstTask 我们自己写的Runnable回调任务
* @param core ture表示只使用核心线程执行,否则使用最大线程(即不够可以创建)
* @return 是否将任务添加成功,执行则是交给 New的Worker的runWorker方法
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 设置标志位
for (;;) { // 自旋
// 获取ctl值,并判断当前的线程池状态
int c = ctl.get();
int rs = runStateOf(c);
/*
* 线程池状态 >= SHUTDOWN, 即非RUNNING状态 并且非下面的状态
* 状态不是SHUTDOWN 并且 Worker的第一个任务为空 并且 工作队列不为空
* 1、线程池已经SHUTDOWN后,就不能添加任务了
* 2、SHUTDOWN状态可以执行队列中的任务(所以:传入的新任务为空,队列不为空)
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 再自旋
for (;;) {
// 任务会封装成Worker,获取工作线程数
int wc = workerCountOf(c);
// 线程池数超过最大值536870911 或者 当前使用核心线程还是最大线程执行任务
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 原子叠加工作线程数,自旋次数都不成功则退回标志点再执行(因为这时可能线程池状态和工作线程数发生变化了)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 获取最新的ctl值, cas检查是否不等于刚开始自旋时获取的值,否则还是调回标记点,再来
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker,本身是一个 AQS
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加互斥锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任务到队列中,使用的HashMap,不能保证线程安全,所以都依赖Worker本身是AQS
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// Worker本身的线程的start方法,那么后续就等操作系统调用其run方法,即Worker的runWorker方法,整个循环就运转起来了
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 最后检查是否因为异常,导致没有添加成功,如果是则肯定需要执行回退操作 1、线程数原子的减去一 2、workers中移除任务等
addWorkerFailed(w);
}
return workerStarted;
}
细节见代码中的注释,整体来说就是获取当前的线程池状态和线程数(判断了各种情况以及回归检查,大师考虑东西就是不一样,哈哈),就做了两件事:
1、将线程数原子的加一
2、将Runnable回调函数封装成Worker(一个AQS)
但是主要的是调用了创建的线程的start方法,那么就值等操作系统调用其run方法,即调用Worker的runWorker方法,将整体循环体系运转起来。即如果上面的execute控制着整个线程池的生命周期,那么Worker的runWorker就控制着线程池里每个线程的生命周期。
Worker结构
// Worker本身即是一个AQS(这里使用的独占锁),又是一个Runnable(启动后任务随操作系统调度)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** 线程工厂创建的线程 */
final Thread thread;
/** 存放addWorker方法添加的 Runnable,即线程池本身要创建线程时的我们自己写的Runnable任务 */
Runnable firstTask;
/** 当前线程执行了多少任务,用于统计 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 设置AQS的状态
this.firstTask = firstTask; // addWorker时传入的我们写的Runnable
this.thread = getThreadFactory().newThread(this); // 线程工厂创建一个线程
}
/**
* Delegates main run loop to outer runWorker
* 所有一切就绪,就等着操作系统切中当前线程,再看整个 loop怎么运转起来
*/
public void run() {
runWorker(this);
}
}
执行任务,runWorker(Worker w)
Worker本身是一个Runnable,并且在上面已经看到调用了其属下Thread的start方法,一切就交给了操作系统调度,触发:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 第一个任务处理完成,再从任务队列中获取,一直到队列中都执行完成
while (task != null || (task = getTask()) != null) {
w.lock();
// 每一个循环体都需要检查线程是否被中断,以及任务数,即处理最后的processWorkerExit,这里也允许调用中断方法
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 省略部分代码,查看我们的每个传入的Runnable执行前后,允许我们添加回调
// 前置回调
beforeExecute(wt, task);
try {
task.run(); // 执行我们传入的真正任务
} finally { // 后置回调
afterExecute(task, thrown);
}
} finally {
task = null;
// 当前线程处理的任务数叠加
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 当第一个任务处理完,并且队列中的任务也没有了,那么需要销毁当下线程本身
processWorkerExit(w, completedAbruptly);
}
}
销毁线程,processWorkerExit
该方法发送在上面的while执行后,firstTask执行完,并且从队列中获取的所以认为都执行完成,那么再调用该方法:
/**
* 销毁方法
* @param w 需要销毁的Worker
* @param completedAbruptly false:执行完成了队列中的所有任务
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try { // 将需要销毁的线程的总执行任务数,添加到线程池总执行数中,并加其从队列中移除
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 如果执行的线程数太少,那么需要再新创建一个线程,调用addWorker方法,整个循环就完成了
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
总结
1、execute中体现了线程池的工作原理,要不将任务添加到了队列中,要不调用了addWorker添加任务
2、Worker本身即是一个AQS又是一个Runnable, 调用addWorker则使用线程工厂创建了一个线程,指向属性thread,并且将addWorker时传入的我们写的Runnable放入了属性firstTask中
3、Worker本身是一个Runnable,当CPU时间切片切中时,则会运行其run方法,就会运行内部的runWorker方法。那么先执行addWorker时添加的任务,完了再从队列中获取。
4、执行完所有的队列任务,那么就需要销毁自己本身(processWorkerExit)
这样,整个流程就运转起来了......
肤水
更多推荐
所有评论(0)