目录

1、Tomcat定制自己的TaskThreadFactory

2、Tomcat定制TaskQueue集成自LinkedBlockeingQueue

3、Tomcat定制扩展ThreadPoolExecutor


    分析完juc的线程池源码并发编程工具 - ThreadPoolExecutor源码解析,理解了线程池执行的过程(即 execute(Runnable runnable)),再回顾一下线程池的执行源代码:

当一个新的任务(Runnable)进入:

1、如果不足核心线程数,直接创建线程执行;

2、如果到达核心线程数,则往队列中添加任务成功(添加成功后会判断一些情况的检查)

3、执行拒绝策略

这样的执行过程,如果是针对CPU型或者混合型的任务是比较合理的,但是Tomcat线程池完全用于处理io任务,所以希望先不放入队列而是先创建线程执行任务,再放入队列中,最后再执行拒绝策略。那么,Tomcat则是在原来的代码上,对队列进行了扩展:

1)、到达核心线程数,调用队列的offer方法将Runnable添加到队列,只需要不成功就好。

2)、将执行拒绝策略时,抛出的异常进行补货,判断是否真的队列已满并且最大线程数【再真的抛异常】。

Tomcat默认创建线程池时使用了StandardThreadExecutor#startInternal方法,源码

protected boolean daemon = true;
protected String namePrefix = "tomcat-exec-";

protected void startInternal() throws LifecycleException {
	taskqueue = new TaskQueue(maxQueueSize);
	TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
	executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
	executor.setThreadRenewalDelay(threadRenewalDelay);
	if (prestartminSpareThreads) {
		executor.prestartAllCoreThreads();
	}
	taskqueue.setParent(executor);

	setState(LifecycleState.STARTING);
}

所以Tomcat扩展如下:

1、Tomcat定制自己的TaskThreadFactory

    Tomcat线程工厂没有太多扩展,只是可以定制自己的线程池名称前置(如上:"tomcat-exec-"),以及是否为常驻线程(默认为常驻线程,因为Tomcat服务器性质其实完全可以定义为常驻),默认优先级为5,具体源码:

public class TaskThreadFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int threadPriority;

    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.threadPriority = priority;
    }
}

2、Tomcat定制TaskQueue集成自LinkedBlockeingQueue

如上图所示,如果核心线程满了会调用队列的offer方法,将任务放入队列;但是TaskQueue重写了offer方法,并且将创建的ThreadPoolExecutor的引用,放入了自己的属性中(见上面创建的源代码taskqueue.setParent(executor);):

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
	// 创建的线程池的引用
	private transient volatile ThreadPoolExecutorImpl parent = null;
	public TaskQueue() {
		super();
	}
	// taskqueue.setParent(executor);
	public void setParent(ThreadPoolExecutorImpl tp) {
		parent = tp;
	}

	@Override
	public boolean offer(Runnable o) {
		// 一般parent肯定不为null,该情况是没有调用setParent方法
		if (parent == null) return super.offer(o);
		// 线程池的当前线程数已经是最大线程数
		if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);

		// 到此: 核心线程数(在外面判断的) < 当前线程数 < 最大线程数
		
		// 如果提交的任务数小于当前线程数,则说明有空闲的线程,则添加到队列中也会被立刻获取走, 执行是在:Worker的runWorker方法中的getTask
		if (parent.getSubmittedCount() <= (parent.getPoolSize())) return super.offer(o);
		// 重点:当前线程数少于最大线程数,返回false,去创建新的Worker
		if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;

		// 默认,总是将任务添加到队列中
		return super.offer(o);
	}
}

    1、检查如果没有传入ThreadPoolExecutor的引用,那么就还是直接放入队列(相当于与原生线程池的流程一致)。

    2、检查如果当前已经是最大线程数了,就还是得放入队列中

    3、如果提交的任务数小于当前线程数,则说明有空闲的线程,则添加到队列中也会被立刻获取走, 执行是在:Worker的runWorker方法中的getTask

    4、重点:当前线程数少于最大线程数,返回false,去创建新的Worker

 

3、Tomcat定制扩展ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
	/**
	 * The number of tasks submitted but not yet finished. This includes tasks
	 * in the queue and tasks that have been handed to a worker thread but the
	 * latter did not start executing the task yet.
	 * This number is always greater or equal to {@link #getActiveCount()}.
	 */
	private final AtomicInteger submittedCount = new AtomicInteger(0);
	private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
	private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);

	// 定义了很多构造大致都是直接调用父类,预热线程池
	public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
		prestartAllCoreThreads();
	}

	// 定制了Worker#runWorker的后置回调
	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		submittedCount.decrementAndGet();
		if (t == null) {
			stopCurrentThreadIfNeeded();
		}
	}
}

    1、名字就叫ThreadPoolExecutor并且直接继承自juc的ThreadPoolExecutor

    2、自定义了一个AtomicInteger的技术器,计算真正的线程数。

    3、提供了多个构造器,都是直接调用父类构造注入属性;并且直接预热核心线程池(因为Tomcat需要启动完成后快速响应请求)

    4、定制了Worker#runWoker,即真正调用我们的Runnable任务后的后置回调函数(可能当前线程启动的时候,线程池已经执行了shutdown的方法,则当前线程池需要销毁)

    5、核心:重写了父类的execute方法,进行扩展

public void execute(Runnable command) {
    execute(command,0,TimeUnit.MILLISECONDS);
}

public void execute(Runnable command, long timeout, TimeUnit unit) {
    // 将自己的计算器叠加
	submittedCount.incrementAndGet();
	try { // 调用父类的模板流程
		super.execute(command);
	} catch (RejectedExecutionException rx) {
        // 父类执行拒绝策略(肯定是到达了最大线程数) 
		if (super.getQueue() instanceof TaskQueue) {
			final TaskQueue queue = (TaskQueue)super.getQueue();
			try {
                // 尝试着真正将任务添加到队列中,没有成功则说明队列也真的满了,该真的执行拒绝策略
				if (!queue.force(command, timeout, unit)) {
					submittedCount.decrementAndGet();
					throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
				}
			} catch (InterruptedException x) {
				submittedCount.decrementAndGet();
				throw new RejectedExecutionException(x);
			}
		} else {
			submittedCount.decrementAndGet();
			throw rx;
		}

	}
}

    1、将自己的计算器叠加

    2、调用父类的模板方法执行

    3、父类执行拒绝策略,说明已经到了核心线程数。那么获取父类的队列,尝试着将任务添加到队列中(这样就实现了先到创建最大线程,再放入队列中),如果队列也满了放不进去,再执行真正的拒绝策略。

 

 

 

 

 

 

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐