目录

AtomicInteger ctl表示了线程池的生命周期

线程池状态和转换过程

execute(Runnable command)

添加任务,addWork(Runnable firstTask, boolean core)

Worker结构

执行任务,runWorker(Worker w)

销毁线程,processWorkerExit


    在(并发编程工具 - 线程池核心参数和工作原理)理解了ThreadPoolExecutor的层级结构、创建方式和工作原理之后,进入源码查询具体的实现细节。最主要是实现了顶层接口Executorexecute(Runnable command)。不过在分析之前先了解一下,内部怎么在多线程下保证当前线程数的加减,让我们自己实现也很自然的会想到AtomicInteger。只是其内部还定义了当前线程池对外的状态(是否核心线程已经创建完成,是否队列已经装满等),并且内部将两者用同一个遍历进行表示。

AtomicInteger ctl表示了线程池的生命周期

    即 AtomicInteger ctl = 线程池对外状态【3位】 + 当前线程数【29位】;

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

 

线程池状态和转换过程

    线程池定义了自己当前的状态及转换的条件,先从源码的类注释开始:

// 32-3 = 29位 (下面都会用到该数,并且位移29为[2的29次方=536870912])
private static final int COUNT_BITS = Integer.SIZE - 3; 
// 最大线程容量 1右移29位减去1(1乘以2的29次方减1) 即:CAPACITY = 1 * 2^29 -1 = 536870911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池当前的允许状态或者叫生命周期  runState is stored in the high-order bits

// 1、运行中 即可以接受新任务也执行队列中的任务 -1 * 536870912 = -536870912
private static final int RUNNING    = -1 << COUNT_BITS;
// 2、关闭 不接受新任务但是执行队列中的任务 0 * 536870912 = 0
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 3、停止 不接受新任务也不执行队列中的任务,中断正在执行中的任务 1 * 536870912 = 536870912
private static final int STOP       =  1 << COUNT_BITS;
// 4、整理 所有任务都已经结束,并且线程数为0,即将调用rerminated方法(整理铺盖回家状态)
//         2 * 536870912 = 1073741824
private static final int TIDYING    =  2 << COUNT_BITS;
// 5、终止 terminated()方法执行完成 3 * 536870912 = 1610612736
private static final int TERMINATED =  3 << COUNT_BITS;

1、RUNNING【运行中】

    即可以接受新任务也执行队列中的任务 -1 * 536870912 = -536870912
2、SHUTDOWN【关闭】

    不接受新任务但是执行队列中的任务 0 * 536870912 = 0
3、STOP【停止】

    不接受新任务也不执行队列中的任务,中断正在执行中的任务 1 * 536870912 = 536870912
4、TIDYING【整理】

    所有任务都已经结束,并且线程数为0,即将调用rerminated方法(整理铺盖回家状态) 2 * 536870912 = 1073741824
private static final int    =  2 << COUNT_BITS;

5、TERMINATED【终止】

    terminated()方法执行完成 3 * 536870912 = 1610612736

    那么ctl中的数即表示了当前是线程池的状态创建的线程数,只需要取出ctl值,并且获取低3位和高29位,修改之后再使用 AtomicInteger的线程安全的原子性 CAS地保存即可。

 

execute(Runnable command)

public void execute(Runnable var1) {
	// 任务为null直接抛空指针
	if (var1 == null) {
		throw new NullPointerException();
	} else {
		/*
		 * Proceed in 3 steps:
		 *
		 * 1. If fewer than corePoolSize threads are running, try to
		 * start a new thread with the given command as its first
		 * task.  The call to addWorker atomically checks runState and
		 * workerCount, and so prevents false alarms that would add
		 * threads when it shouldn't, by returning false.
		 *
		 * 2. If a task can be successfully queued, then we still need
		 * to double-check whether we should have added a thread
		 * (because existing ones died since last checking) or that
		 * the pool shut down since entry into this method. So we
		 * recheck state and if necessary roll back the enqueuing if
		 * stopped, or start a new thread if there are none.
		 *
		 * 3. If we cannot queue task, then we try to add a new
		 * thread.  If it fails, we know we are shut down or saturated
		 * and so reject the task.
		 */
		int var2 = this.ctl.get();
		//  当前线程数的个数小于核心线程数
		if (workerCountOf(var2) < this.corePoolSize) {
			// 添加一个Worker(将Runnable回调函数加入),添加成功直接返回
			if (this.addWorker(var1, true)) {
				return;
			}
			// 没有添加成功则再获取当前上下文值(状态[3位] + 线程数[29位])
			var2 = this.ctl.get();
		}
		// 判断核心线程已满,并且往队列中添加成功
		if (isRunning(var2) && this.workQueue.offer(var1)) {
			int var3 = this.ctl.get();
			// 任务成功添加到队列以后,再次检查是否需要添加新的线程,因为已存在的线程可能被销毁了
			if (!isRunning(var3) && this.remove(var1)) {
				// 如果线程池处于非运行状态,并且把当前的任务从任务队列中移除成功,则拒绝该任务
				this.reject(var1);
				// 如果之前的线程已被销毁完,新建一个线程
			} else if (workerCountOf(var3) == 0) {
				this.addWorker((Runnable)null, false);
			}
			// 在试着创建一个新的线程添加,如果不成功再执行拒绝策略
		} else if (!this.addWorker(var1, false)) {
			// 执行拒绝策略
			this.reject(var1);
		}
	}
}

    在理解完上一篇线程池工作原理图之后,再看execute方法就比较简单了,只是内部的细节需要addWork再分析一下,再贴一下工作原理图与该方法进行对比:

 

    addWorker是将我们写的Runnable任务添加到线程池中,而真正的执行是调用的Worker的runWork方法

添加任务,addWork(Runnable firstTask, boolean core)

/**
 *  返回是否添加任务成功
 * @param firstTask 我们自己写的Runnable回调任务
 * @param core ture表示只使用核心线程执行,否则使用最大线程(即不够可以创建)
 * @return 是否将任务添加成功,执行则是交给 New的Worker的runWorker方法
 */
private boolean addWorker(Runnable firstTask, boolean core) {
	retry: // 设置标志位
	for (;;) { // 自旋
		// 获取ctl值,并判断当前的线程池状态
		int c = ctl.get();
		int rs = runStateOf(c);
		/*
		 * 线程池状态 >= SHUTDOWN, 即非RUNNING状态 并且非下面的状态
		 *   状态不是SHUTDOWN 并且 Worker的第一个任务为空 并且 工作队列不为空
		 * 1、线程池已经SHUTDOWN后,就不能添加任务了
		 * 2、SHUTDOWN状态可以执行队列中的任务(所以:传入的新任务为空,队列不为空)
		 */
		if (rs >= SHUTDOWN &&
				! (rs == SHUTDOWN &&
						firstTask == null &&
						! workQueue.isEmpty()))
			return false;
		// 再自旋
		for (;;) {
			// 任务会封装成Worker,获取工作线程数
			int wc = workerCountOf(c);
			// 线程池数超过最大值536870911 或者 当前使用核心线程还是最大线程执行任务
			if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
			// 原子叠加工作线程数,自旋次数都不成功则退回标志点再执行(因为这时可能线程池状态和工作线程数发生变化了)
			if (compareAndIncrementWorkerCount(c))
				break retry;
			c = ctl.get();
			// 获取最新的ctl值, cas检查是否不等于刚开始自旋时获取的值,否则还是调回标记点,再来
			if (runStateOf(c) != rs)
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	}

	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		// 创建Worker,本身是一个 AQS
		w = new Worker(firstTask);
		final Thread t = w.thread;
		if (t != null) {
			// 加互斥锁
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock();
			try {
				// Recheck while holding lock.
				// Back out on ThreadFactory failure or if
				// shut down before lock acquired.
				int rs = runStateOf(ctl.get());

				if (rs < SHUTDOWN ||
						(rs == SHUTDOWN && firstTask == null)) {
					if (t.isAlive()) // precheck that t is startable
						throw new IllegalThreadStateException();
					// 添加任务到队列中,使用的HashMap,不能保证线程安全,所以都依赖Worker本身是AQS
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					workerAdded = true;
				}
			} finally {
				mainLock.unlock();
			}
			if (workerAdded) {
                // Worker本身的线程的start方法,那么后续就等操作系统调用其run方法,即Worker的runWorker方法,整个循环就运转起来了
				t.start();
				workerStarted = true;
			}
		}
	} finally {
		if (! workerStarted)
			// 最后检查是否因为异常,导致没有添加成功,如果是则肯定需要执行回退操作 1、线程数原子的减去一 2、workers中移除任务等
			addWorkerFailed(w);
	}
	return workerStarted;
}

    细节见代码中的注释,整体来说就是获取当前的线程池状态和线程数(判断了各种情况以及回归检查,大师考虑东西就是不一样,哈哈),就做了两件事:

1、将线程数原子的加一

2、将Runnable回调函数封装成Worker(一个AQS)

    但是主要的是调用了创建的线程的start方法,那么就值等操作系统调用其run方法,即调用Worker的runWorker方法,将整体循环体系运转起来。即如果上面的execute控制着整个线程池的生命周期,那么Worker的runWorker就控制着线程池里每个线程的生命周期。

Worker结构

// Worker本身即是一个AQS(这里使用的独占锁),又是一个Runnable(启动后任务随操作系统调度)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

	/** 线程工厂创建的线程 */
	final Thread thread;
	/** 存放addWorker方法添加的 Runnable,即线程池本身要创建线程时的我们自己写的Runnable任务 */
	Runnable firstTask;
	/** 当前线程执行了多少任务,用于统计 */
	volatile long completedTasks;
	
	Worker(Runnable firstTask) {
		setState(-1); // 设置AQS的状态
		this.firstTask = firstTask; // addWorker时传入的我们写的Runnable
		this.thread = getThreadFactory().newThread(this); // 线程工厂创建一个线程
	}

	/**
	 * Delegates main run loop to outer runWorker
	 * 所有一切就绪,就等着操作系统切中当前线程,再看整个 loop怎么运转起来
	 */
	public void run() {
		runWorker(this);
	}
}

 

执行任务,runWorker(Worker w)

    Worker本身是一个Runnable,并且在上面已经看到调用了其属下Thread的start方法,一切就交给了操作系统调度,触发:

final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	w.unlock(); // allow interrupts
	boolean completedAbruptly = true;
	try {
		// 第一个任务处理完成,再从任务队列中获取,一直到队列中都执行完成
		while (task != null || (task = getTask()) != null) {
			w.lock();
			// 每一个循环体都需要检查线程是否被中断,以及任务数,即处理最后的processWorkerExit,这里也允许调用中断方法
			if ((runStateAtLeast(ctl.get(), STOP) ||
					(Thread.interrupted() &&
							runStateAtLeast(ctl.get(), STOP))) &&
					!wt.isInterrupted())
				wt.interrupt();
			try {
				// 省略部分代码,查看我们的每个传入的Runnable执行前后,允许我们添加回调
				// 前置回调
				beforeExecute(wt, task);
				try {
					task.run(); // 执行我们传入的真正任务
				} finally { // 后置回调
					afterExecute(task, thrown);
				}
			} finally {
				task = null;
				// 当前线程处理的任务数叠加
				w.completedTasks++;
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		// 当第一个任务处理完,并且队列中的任务也没有了,那么需要销毁当下线程本身
		processWorkerExit(w, completedAbruptly);
	}
}

销毁线程,processWorkerExit

    该方法发送在上面的while执行后,firstTask执行完,并且从队列中获取的所以认为都执行完成,那么再调用该方法:

/**
 * 销毁方法
 * @param w 需要销毁的Worker
 * @param completedAbruptly false:执行完成了队列中的所有任务
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
	if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
		decrementWorkerCount();
	// 加锁
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try { // 将需要销毁的线程的总执行任务数,添加到线程池总执行数中,并加其从队列中移除
		completedTaskCount += w.completedTasks;
		workers.remove(w);
	} finally {
		mainLock.unlock();
	}

	tryTerminate();

	int c = ctl.get();
	// 如果执行的线程数太少,那么需要再新创建一个线程,调用addWorker方法,整个循环就完成了
	if (runStateLessThan(c, STOP)) {
		if (!completedAbruptly) {
			int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
			if (min == 0 && ! workQueue.isEmpty())
				min = 1;
			if (workerCountOf(c) >= min)
				return; // replacement not needed
		}
		addWorker(null, false);
	}
}

总结

1、execute中体现了线程池的工作原理,要不将任务添加到了队列中,要不调用了addWorker添加任务

2、Worker本身即是一个AQS又是一个Runnable, 调用addWorker则使用线程工厂创建了一个线程,指向属性thread,并且将addWorker时传入的我们写的Runnable放入了属性firstTask中

3、Worker本身是一个Runnable,当CPU时间切片切中时,则会运行其run方法,就会运行内部的runWorker方法。那么先执行addWorker时添加的任务,完了再从队列中获取。

4、执行完所有的队列任务,那么就需要销毁自己本身(processWorkerExit)

这样,整个流程就运转起来了......

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

肤水

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐