并发编程工具 - Tomcat对ThreadPoolExecutor线程池的扩展
目录1、Tomcat定制自己的TaskThreadFactory2、Tomcat定制TaskQueue集成自LinkedBlockeingQueue3、Tomcat定制扩展ThreadPoolExecutor分析完juc的线程池源码并发编程工具 - ThreadPoolExecutor源码解析,理解了线程池执行的过程(即 execute(Runnable runnable)),再回顾一下线程池的执
目录
1、Tomcat定制自己的TaskThreadFactory
2、Tomcat定制TaskQueue集成自LinkedBlockeingQueue
3、Tomcat定制扩展ThreadPoolExecutor
分析完juc的线程池源码并发编程工具 - ThreadPoolExecutor源码解析,理解了线程池执行的过程(即 execute(Runnable runnable)),再回顾一下线程池的执行源代码:
当一个新的任务(Runnable)进入:
1、如果不足核心线程数,直接创建线程执行;
2、如果到达核心线程数,则往队列中添加任务成功(添加成功后会判断一些情况的检查)
3、执行拒绝策略
这样的执行过程,如果是针对CPU型或者混合型的任务是比较合理的,但是Tomcat线程池完全用于处理io任务,所以希望先不放入队列而是先创建线程执行任务,再放入队列中,最后再执行拒绝策略。那么,Tomcat则是在原来的代码上,对队列进行了扩展:
1)、到达核心线程数,调用队列的offer方法将Runnable添加到队列,只需要不成功就好。
2)、将执行拒绝策略时,抛出的异常进行补货,判断是否真的队列已满并且最大线程数【再真的抛异常】。
Tomcat默认创建线程池时使用了StandardThreadExecutor#startInternal方法,源码
protected boolean daemon = true;
protected String namePrefix = "tomcat-exec-";
protected void startInternal() throws LifecycleException {
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
setState(LifecycleState.STARTING);
}
所以Tomcat扩展如下:
1、Tomcat定制自己的TaskThreadFactory
Tomcat线程工厂没有太多扩展,只是可以定制自己的线程池名称前置(如上:"tomcat-exec-"),以及是否为常驻线程(默认为常驻线程,因为Tomcat服务器性质其实完全可以定义为常驻),默认优先级为5,具体源码:
public class TaskThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final boolean daemon;
private final int threadPriority;
public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
this.daemon = daemon;
this.threadPriority = priority;
}
}
2、Tomcat定制TaskQueue集成自LinkedBlockeingQueue
如上图所示,如果核心线程满了会调用队列的offer方法,将任务放入队列;但是TaskQueue重写了offer方法,并且将创建的ThreadPoolExecutor的引用,放入了自己的属性中(见上面创建的源代码taskqueue.setParent(executor);):
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
// 创建的线程池的引用
private transient volatile ThreadPoolExecutorImpl parent = null;
public TaskQueue() {
super();
}
// taskqueue.setParent(executor);
public void setParent(ThreadPoolExecutorImpl tp) {
parent = tp;
}
@Override
public boolean offer(Runnable o) {
// 一般parent肯定不为null,该情况是没有调用setParent方法
if (parent == null) return super.offer(o);
// 线程池的当前线程数已经是最大线程数
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
// 到此: 核心线程数(在外面判断的) < 当前线程数 < 最大线程数
// 如果提交的任务数小于当前线程数,则说明有空闲的线程,则添加到队列中也会被立刻获取走, 执行是在:Worker的runWorker方法中的getTask
if (parent.getSubmittedCount() <= (parent.getPoolSize())) return super.offer(o);
// 重点:当前线程数少于最大线程数,返回false,去创建新的Worker
if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;
// 默认,总是将任务添加到队列中
return super.offer(o);
}
}
1、检查如果没有传入ThreadPoolExecutor的引用,那么就还是直接放入队列(相当于与原生线程池的流程一致)。
2、检查如果当前已经是最大线程数了,就还是得放入队列中
3、如果提交的任务数小于当前线程数,则说明有空闲的线程,则添加到队列中也会被立刻获取走, 执行是在:Worker的runWorker方法中的getTask
4、重点:当前线程数少于最大线程数,返回false,去创建新的Worker
3、Tomcat定制扩展ThreadPoolExecutor
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
* This number is always greater or equal to {@link #getActiveCount()}.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
// 定义了很多构造大致都是直接调用父类,预热线程池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
prestartAllCoreThreads();
}
// 定制了Worker#runWorker的后置回调
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedCount.decrementAndGet();
if (t == null) {
stopCurrentThreadIfNeeded();
}
}
}
1、名字就叫ThreadPoolExecutor并且直接继承自juc的ThreadPoolExecutor
2、自定义了一个AtomicInteger的技术器,计算真正的线程数。
3、提供了多个构造器,都是直接调用父类构造注入属性;并且直接预热核心线程池(因为Tomcat需要启动完成后快速响应请求)
4、定制了Worker#runWoker,即真正调用我们的Runnable任务后的后置回调函数(可能当前线程启动的时候,线程池已经执行了shutdown的方法,则当前线程池需要销毁)
5、核心:重写了父类的execute方法,进行扩展
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable command, long timeout, TimeUnit unit) {
// 将自己的计算器叠加
submittedCount.incrementAndGet();
try { // 调用父类的模板流程
super.execute(command);
} catch (RejectedExecutionException rx) {
// 父类执行拒绝策略(肯定是到达了最大线程数)
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 尝试着真正将任务添加到队列中,没有成功则说明队列也真的满了,该真的执行拒绝策略
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
1、将自己的计算器叠加
2、调用父类的模板方法执行
3、父类执行拒绝策略,说明已经到了核心线程数。那么获取父类的队列,尝试着将任务添加到队列中(这样就实现了先到创建最大线程,再放入队列中),如果队列也满了放不进去,再执行真正的拒绝策略。
更多推荐
所有评论(0)