前言:在编程中我们为什么要使用线程池,线程池中的线程是怎么执行任务的,线程池中的线程是如何复用和销毁的;

1 什么是线程池:
提前创建一些线程放到一个地方,使用的时候直接获取,避免频繁的创建和销毁线程,节省内存和CPU资源;
2 Java 中已有的线程池:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

ExecutorService cashedThreadPool = Executors.newCachedThreadPool();

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);

3 一个线程池构建需要的参数:
我们的任务是由某一个线程具体去执行的,所以我们就要定义好池中线程的数量,并且当任务数量增加时,可以开辟一些临时线程进行任务处理,当池中的线程已经是多余的时候再回收掉一些线程来节约资源;当池中的线程都有任务执行,这个时候来了新的任务,需要有个地方能把这些任务先行储存,以便当有空余的线程时在执行存储下来的任务,而且考虑到资源的情况这个存储任务的队列也最好是有限量的,如果超出了程序的处理能力,使用者可以自己决定拒绝策略;
所以在创建线程池的时候有必要以下参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;

corePoolSize: 正式存在的线程数;
maximumPoolSize:允许存在的最大线程,扩容的线程数= maximumPoolSize-corePoolSize
keepAliveTime:临时线程存活的时间
Unit:临时线程存活的时间单位
workQueue:阻塞队列
threadFactory:线程工厂
Handler:拒绝策略;

4 线程池任务的执行和线程销毁:
demo:

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Thread( () -> testGetFutureMap("param")));
private static Map<String, Object> testGetFutureMap(String param) {
   // 处理业务逻辑
   Map<String, Object>  mapData = new HashMap<>();
   /**
    * do some thing
    */
   System.out.printf("do some thing");
   return mapData;
}

ThreadPoolExecutor:execute 线程任务的执行

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    	// 当前工作的线程小于核心线程,直接新建线程,并且进行任务的执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 当前工作的线程大于核心线程,或者直接添加任务失败
    if (isRunning(c) && workQueue.offer(command)) {
    	// 添加到队列中,等待后续空闲线程执行任务
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
   //   当前工作的线程大于核心线程,或者直接添加任务失败,并且添加队列失败
   // 开启临时线程进行任务处理
    else if (!addWorker(command, false))
    	// 如果失败执行拒绝策略
        reject(command);
}

addWorker:线程任务的执行

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
		// 判断当前线程池是否可用	
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
			 // 判断当前工作线程数和核心线程数或者最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
			// 增加工作线程数,增加成功,直接跳出for 循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
			// 增加线程数失败,进行重试
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       // 创建线程,传入任务
      // 执行任务 new Thread(new Worker()).start();
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 添加任务  
		             workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
		         // 启动线程执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
	        // 添加任务失败,工作任务数量-1,移除任务
            addWorkerFailed(w);
    }
    return workerStarted;
}

compareAndIncrementWorkerCount:增加线程池中的线程数量

private boolean compareAndIncrementWorkerCount(int expect) {
	// 工作线程数量+1
     return ctl.compareAndSet(expect, expect + 1);
 }

构建Worker用于具体任务的执行:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

执行任务 t.start() ,调用Worker run():

public void run() {
    runWorker(this);
}
finalvoid runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
        	// 任务执行,没有任务的时候不进入while 循环
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 线程不可用
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
            	// 在任务执行之前,此方法可以被重写
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                	// 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                	// 任务执行以后
                    afterExecute(task, thrown);
                }
            } finally {
            	// 执行任务后
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
    	// 线程的退出,减少工作线程数量
        processWorkerExit(w, completedAbruptly);
    }
}

获取任务 getTask():

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 线程池不可用
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 当前工作线程大于核心线程,并且没有了任务,则将线程池中线程数量-1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
        	// 当前工作线程大于核心线程数 则进行规定时间内获取任务(规定时间内没有获取到则说明当前没有需要执行的任务)
        	// 否则直接获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;// 返回任务
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

任务执行完毕后退出线程,processWorkerExit:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();

   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
   		// 增加任务完成的数量
       completedTaskCount += w.completedTasks;
       // 移除任务
       workers.remove(w);
   } finally {
       mainLock.unlock();
   }

   tryTerminate();

   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
       }
       addWorker(null, false);
   }
}

以上为线程池中线程创建,执行任务,以及销毁线程的过程,流程图如下:
在这里插入图片描述
过程:
(1)当提交任务后,如果当前工作的线程没有超过核心线程,则创建线程然后进行任务的执行;
(2)当工作的现车超过核心线程数,则尝试添加到阻塞队列中,添加成功后,有空闲线程时从队列中获取任务并执行;
(3)如果阻塞队列已满,则判断是否要增加临时线程处理任务,如果已经达到最大线程数,则执行拒绝策略;否则创建临时线程,执行任务;
(4)当任务执行完毕后,如果没有了任务,并且当前工作的线程大于核心小程,则执行线程的销毁;

5 总结:
5.1 线程池的创建是为了避免线程频繁的创建和销毁,是为了线程的复用,增加线程池中的线程可以提高任务执行的效率,但是线程池中线程过多会造成频繁的上下文切换,所以线程数量并不是越多越好;
5.2 我们可以自定义线程池,通过重写方法的方式,更好的监控线程的执行:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceMonitor extends ThreadPoolExecutor {
    public ExecutorServiceMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
	// 线程执行任务之前
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        
        super.beforeExecute(t, r);
    }
  // 线程执行任务之后
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }
}
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐