并发编程工具 - 线程池核心参数和工作原理
目录1、线程池的结构2、线程池的创建方式3、核心参数4、工作原理线程池绝对的面试高频,确实因为多线程是解决并发问题特别是提升某些核心项目接口的利器,但是使用不好也存在大量的问题,那么搞清楚线程池的工作原理尤为重要。之前接触线程池基本都存在于面试阶段,但是当第一次在项目上看别人使用线程池解决并行任务时特别的震惊,项目上一个接口并行了17个子任务。现在已经基本在项目上都会使用线程池来解决核心问题,理解
目录
线程池绝对的面试高频,确实因为多线程是解决并发问题特别是提升某些核心项目接口的利器,但是使用不好也存在大量的问题,那么搞清楚线程池的工作原理尤为重要。之前接触线程池基本都存在于面试阶段,但是当第一次在项目上看别人使用线程池解决并行任务时特别的震惊,项目上一个接口并行了17个子任务。现在已经基本在项目上都会使用线程池来解决核心问题,理解也比较深了才敢写这方面的博客【当然,理解线程池的原理就更为重要了,否则就是像在裸奔】。
首先我们知道Java的线程是与操作系统的线程一一映射,并且创建和销毁线程的代价非常大,那么线程复用就是基本思想,但是需要针对不同的执行任务(有的非常耗时,有的在短时间内需要处理大量的请求,波峰过后若线程还一直存在那么也是在消耗资源)。
1、线程池的结构
说到线程池首先想到的就是Executor的子类ThreadPoolExecutor,在创建之前我们先看看继承体系结构和每一层定义的接口方法,先看父类:
1)、顶层接口是Executor
public interface Executor {
void execute(Runnable var1);
}
2)、接口ExecutorService
public interface ExecutorService extends Executor {
// 1、线程关闭相关方法,这我们在两阶段终止模式中分析过
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
// 2、提交单个任务,有Runnable、Callable的任务
<T> Future<T> submit(Callable<T> var1);
<T> Future<T> submit(Runnable var1, T var2);
Future<?> submit(Runnable var1);
// 3、批量执行任务,等待最晚的任务执行完成返回(可以设置超时时间),我在项目上使用非常多,当然该场景还可以使用
// CountDownLatch协调,或者使用CompletionService执行,后续使用项目代码对比
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
// 4、批量执行任务,只要一个得到任务即可返回(这种主要用于获取相同的任务,拿到最快的一个即返回),
// 坏处浪费资源,好处就是防止最慢的任务拖长性能耗时
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}
3)、抽象类AbstractExecutorService
完成了对大部分上层接口的实现,如常用的submit、invokeAll等。
继承的常用子类:
1)、ScheduledThreadPoolExecutor
同样是juc包下面的类,看名字就知道是 定时任务 + 线程池
2)、ThreadPoolTeskExecutor
这个主要是Spring的异步编程机制,@Async的时候会用到,可以查看 Spring源码-@Async原理分析
3)、Tomcat的ThreadPoolExecutor
名字是一样的,但是继承自juc的ThreadPoolExecutor,主要在与官方的线程池的工作原理,在解决io型(Tomcat基本都是io型任务)时不是很适用,所以Tomcat做了一定的修改,后面专门分析。
2、线程池的创建方式
juc官方提供了Executors类的快速创建线程池的工具,但是底层也是使用了ThreadPoolExecutor的全参构造器,屏蔽了底层的实现,但是是非常可怕的。所以阿里开发规范明确规定了不能使用Executors进行创建线程池。
1)、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public LinkedBlockingQueue() {
this(2147483647);
}
核心线程和最大线程数一样,所以超时时间和超时单位没有意义了,但是使用了LinkedBlockingQueue无界队列无参构造,当任务非常多时根本没有执行拒绝策略的时候,非常危险。
2)、newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
核心线程和最大线程一样,都是1,那么同样超时时间和超时单位就没有意义了,同样使用了无界队列存放任务,问题同样如上。
3)、newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new
SynchronousQueue());
}
队列是本身没什么问题,核心线程为0,超时时间和单位都正常。但是最大线程数为Integer.MAX_VALUE,当任务非常多时创建1万个线程服务器早就崩了(Java线程与内核线程一一对应)。
4)、newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public ScheduledThreadPoolExecutor(int var1) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new
ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
最大线程数也是Integer.MAX_VALUE,与上面存在相同相同的风险(线程创建过多,导致服务崩溃)。
所以,Executors创建的线程池,要不使用的队列太长要不可能创建的线程数太大,存下相当的风险。在项目上使用时最好直接使用ThreadPoolExecutor的全参数构造创建,自己心中有数,如下需要懂得核心参数和工作原理。
3、核心参数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
this.mainLock = new ReentrantLock();
this.workers = new HashSet();
this.termination = this.mainLock.newCondition();
if (var1 >= 0 && var2 > 0 && var2 >= var1 && var3 >= 0L) {
if (var6 != null && var7 != null && var8 != null) {
this.corePoolSize = var1;
this.maximumPoolSize = var2;
this.workQueue = var6;
this.keepAliveTime = var5.toNanos(var3);
this.threadFactory = var7;
this.handler = var8;
} else {
throw new NullPointerException();
}
} else {
throw new IllegalArgumentException();
}
}
1)、核心线程
核心线程数可以理解为常驻线程,当任务来了之后马上就可以响应的快速反应部队,但是核心线程数也不能过多因为没有任务时也在占用资源。当然我们可以调用 allowCoreThreadTimeOut 方法允许核心线程在超时时间(超时单位)后进行销毁。当线程池创建后默认其中是没有创建线程池的,但是我们可以调用prestartAllCoreThreads或 prestartCoreThread 方法来预热核心线程,而并不是等到接到第一个任务后才创建核心线程,对于抢购等性质的系统比较适用。
2)、最大线程
最大线程数很明细就是最多能创建的线程数,最大线程数字包含了核心线程数字。最大线程数的销毁跟超时时间和单位本身有关,但是最大线程数是在任务队列装满溢出的情况下才会创建,我有一年多时间理解的都是超过核心线程数就会创建。所以,这种机制并不适合纯io型任务,所以Tomcat的线程池就是基于juc线程池改造,超过核心线程就创建,超过最大线程才放入队列,队列满了才执行拒绝策略。
3)、超时时间和超时单位
那么在核心线程到最大线程数的线程,超时时间配合超时单位之后,就进行销毁。如上面说的,如果设置了allowCoreThreadTimeOut则核心线程也可以根据该超时规则销毁。
4)、工作队列
工作队列是存放来不及处理的Callable或者Runnable回调任务,大多数情况下我们使用LinkedBlockingQueue是AQS的子类。队列本身也比较重要,当线程执行完任务后会到队列中获取,那么队列是否先进先出,性能等。这与任务本身的性质有关,如果不怕任务等太久或者饥饿,那么可以不使用先进先出;如果队列的任务都是瞬时任务量特别高,但是任务执行时间会非常短,那么可以把队列的长度设置的大一点。否则,队列放不下,而执行拒绝策略本身也是对应用的保护,对调用者的快速失败,也未必不好。
5)、线程创建工厂
当需要创建新的线程时,则使用传入的工厂直接创建,如果我们没有传入,则会使用默认线程池,Executors.defaultThreadFactory();线程组使用了System或者当前线程池的,使用AtomicInteger来为创建的每个线程的名称后面加一个数字,但是创建的线程的前缀使用类似“pool-3-thread-1”这样的字样。但是我们在线上使用时,特别是在日志中查找问题时,给线程起一个有意义的名字是非常重要的。所以一般在项目上使用时,最好使用自己的 能创建具有业务意义的线程的线程工厂。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager var1 = System.getSecurityManager();
this.group = var1 != null ? var1.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
}
6)、拒绝策略
juc提供了四种拒绝策略,默认为AbortPolicy;
1)、AbortPolicy【直接抛出异常给调用者处理】
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable var1, ThreadPoolExecutor var2) {
throw new RejectedExecutionException("Task " + var1.toString() + " rejected from " + var2.toString());
}
}
2)、CallerRunsPolicy【让调用者自己执行】
之前的时候我一直认为返回给调用者自己执行,那么就不会丢弃认为,后面才意识到如果再交给调用者,并且调用者本身认为就比较重那么更加容易出问题。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() {}
public void rejectedExecution(Runnable var1, ThreadPoolExecutor var2) {
if (!var2.isShutdown()) {
var1.run();
}
}
}
3)、DiscardOldestPolicy【丢弃最老的任务】
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() {}
public void rejectedExecution(Runnable var1, ThreadPoolExecutor var2) {
if (!var2.isShutdown()) {
var2.getQueue().poll();
var2.execute(var1);
}
}
}
4)、DiscardPolicy【丢人当前任务】
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() {}
// 丢弃当前任务就是方法体为空,什么都不做
public void rejectedExecution(Runnable var1, ThreadPoolExecutor var2) {}
}
4、工作原理
我有在项目上看见过,要执行一个并行方法时,创建线程池方法执行完就销毁的,那线程池的工作原理基本没有意义了。所以,线程池正常情况下是与项目的生命周期一致的启动如Tomcat时(或推迟到真的有任务调用时)创建,Tomcat等服务销毁时销毁。工作原理图:
当一个新的任务进入时
1、如果线程数还没有到达核心线程数,则直接创建核心线程;如果预热了核心线程,或者有空闲的线程则直接执行
2、当核心线程数都满了并且正在工作,那么就将任务放入队列中【当下并不是直接创建最大线程数内的线程】,若创建的线程空闲时直接去消费核心线程中的任务
3、当队列也装满时,查看是否为只创建了核心线程数,是则创建核心线程到最大线程的线程执行任务【存在一种情况,可能上一个任务来了之后都还存放在队列中执行,这个任务发现队列满了,则创建(核心到最大线程数)线程执行任务,所以即使是先进先出队列,也不是先来的任务先执行】;否则直接执行拒绝策略。
先放入队列中,满了再创建到最大线程数的线程,对于CPU型、或者混合型的任务没有问题,但是如果在Tomcat这样的纯IO型(主要耗时在等待状态)的任务,那么就可能不是想要的效果。所以Tomcat自己集成自ThreadPoolExecutor实现了一套,先创建核心线程到最大线程的机制(重写了队列的offer方法,照成队列假满的假象),最后再放入队列中,等到真正的队列满了之后再执行拒绝策略。
我个人理解是,现在微服务并行获取其他服务结果再综合处理的情况非常多,那么线程本身处理等待状态的情况就比较多,所以自己在这种业务场景下直接创建(使用)Tomcat的线程池来处理任务。欢迎指正、讨论。
更多推荐
所有评论(0)