并发编程工具 - 线程池的使用和自己的封装
目录线程池的使用线程池参数设置自己封装的线程池工具根据阿姆达尔(Amdal)定律可知[并发编程基础 - 安全性、活跃性、性能问题#性能问题],若串行度为5%那么无论使用任何技术,性能最高提升20%,所以减小串行度(减小锁粒度、算法优化等)是前提。但是除此之外使用多线程并行任务也是比较常用的手段,效果也非常明显,只是使用不好坑会非常多,所以最好深入理解其运行原理,这当然也是面试的高频。线程池的使用上
目录
根据阿姆达尔(Amdal)定律可知[并发编程基础 - 安全性、活跃性、性能问题#性能问题],若串行度为5%那么无论使用任何技术,性能最高提升20%,所以减小串行度(减小锁粒度、算法优化等)是前提。但是除此之外使用多线程并行任务也是比较常用的手段,效果也非常明显,只是使用不好坑会非常多,所以最好深入理解其运行原理,这当然也是面试的高频。
线程池的使用
上面的多线程并行是最理想的状态,真正执行的时候与CPU的核数(并行度)以及上下文的切换,GC垃圾回收线程(特别是Stop The World阶段)的运行等综合的结果。理论上串行那么就是所有耗时的总和,并行使用合理则耗时为最长一个的时间;结合具体的线程池工作原理,为了方便理解执行相同耗时的任务比如上图,真实耗时介于1到三倍中间,即最差的耗时也就相当于串行。这种情况下我们可以使用下面线程池封装的executeAll方法,底层是调用了ThreadPoolExecutor的invokeAll接口。类似这样的方法,我们还可以使用Java提供了CompletionService从队列中获取;或者CompletableFuture#allOf方法;当然也可以使用execute方法结合CountDownLatch,后面专门分析和项目使用。针对更复杂的情况,CompletableFuture提供了强大的API,自己仅仅是在项目上使用部分。
比如项目上,自己将获取一个分段的字符串,每段使用线程池任务获取,只是多线程处理的结果在合并到主线程(当前主线程一般是比如Tomcat线程池中的一个线程)时,需要注意数据的安全性,我下面使用了ConcurrentHahsMap。最终放入结合下面的线程池工具类如下:
private void aaa(StringBuilder orderCode) {
final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
List<Callable<String>> taskList = Lists.<Callable<String>>newArrayList(
() -> {
SendPayDTO sendPayDTO = getInstance().get().sendPayDTO;
DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTOBySoCode(sendPayDTO.getSoTypeCode());
return resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
},
() -> resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage()),
() -> resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage())
);
// 阻塞获取结果
ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
SimpleThreadPool.executeAll(executor, taskList).forEach(this::getFuture);
orderCode.append(BEFORE_SALE_DEFINITION_DEFAULT)
.append(resultMap.get(SALE_DEFINITION))
.append(AFTER_SALE_DEFINITION_DEFAULT)
.append(resultMap.get(SALE_AUDIT))
.append(resultMap.get(SHIPPING_CONDITION));
}
使用CountDownLatch实现相同的效果,上面的方法可以修改为:
private void byCountDownLatch(StringBuilder orderCode) throws InterruptedException {
final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
CountDownLatch countDownLatch = new CountDownLatch(3);
ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
executor.execute(() -> {
SendPayDTO sendPayDTO = getInstance().get().sendPayDTO;
DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTOBySoCode(sendPayDTO.getSoTypeCode());
resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
countDownLatch.countDown();
});
executor.submit(() -> {
resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage());
countDownLatch.countDown();
});
executor.submit(() -> {
resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage());
countDownLatch.countDown();
});
// 阻塞获取结果
countDownLatch.await();
orderCode.append(BEFORE_SALE_DEFINITION_DEFAULT)
.append(resultMap.get(SALE_DEFINITION))
.append(AFTER_SALE_DEFINITION_DEFAULT)
.append(resultMap.get(SALE_AUDIT))
.append(resultMap.get(SHIPPING_CONDITION));
}
线程池参数设置
CPU密集型: 一般设置为CPU核(或者核线程,如果我们现在电脑是4核8线程)+ 1;增加1是为了防止线程偶发性的缺页中断或者其他原因导致的任务暂停,导致资源使用不充分。
I/O密集型:一般设置为CPU核 * 2
上面是极端的情况,很多时候我们的任务是综合型的,那么可以根据下面公式大概计算(我们可以使用VisualVM等来查询WT/ST):
线程数=N(CPU核数)*(1+WT(线程等待时间)/ST(线程时间运行时间))
但是不管怎么计算,上面的只是一个基础参数值,具体的还需要根据压测来判断。理论上,随着参数的增加性能会提升,替身到一定的值后会出现拐点,再比较机具的向下走,所以压测就是为了找到拐点是的参数值。
因为上面也提到了,还跟GC情况,上下文切换,CPU和内存动态等有关,所以压测是不仅要看Tps值,还要随时注意CPU和内存的使用情况,GC的回收耗时和情况,必要也可以查看线程上下文的切换[并发编程基础 - 多线程的上下文切换问题]。
线程池监控
使用线程池时,一般情况下只有等到问题出现时再去dump等,当然ThreadPoolExecutor提供了调用的API,比如查看处理的任务数,当前的线程数等,那么我们可以基于定时任务打印到日志中,前面还提到了每个线程执行Work#runWorker时,在调用我们的写的Runnable的run方法前后都允许定义回调任务。并且上一篇看到,Tomcat的ThreadPoolExecutor类也重写了后置回调方法。
1、定时任务打印线程池情况到日志
这种方式在下面工具的ThreadPoolInit中也有体现,具体使用了juc的单线程的ScheduledThreadPoolExecutor线程池,如下:
public static final ScheduledExecutorService SINGLE_POOL = Executors.newSingleThreadScheduledExecutor();
private static void printScheduledThreadStats() {
SINGLE_POOL.scheduleAtFixedRate(() -> {
THREAD_POOL_EXECUTOR_MAP.forEach((name, threadPool) -> {
log.info("{} Pool Size: {}", name, threadPool.getPoolSize());
log.info("{} Active Threads: {}", name, threadPool.getActiveCount());
log.info("{} Number of Tasks Completed: {}", name, threadPool.getCompletedTaskCount());
log.info("{} Number of Tasks in Queue: {}", name, threadPool.getQueue().size());
});
}, 0L, 5L, TimeUnit.SECONDS);
}
2、每个我们的Runnable#run方法前后打印
这样需要像Tomcat一下继承自juc的ThreadPoolExecutor,并且按需要重新前置和后置方法(如果我们的任务执行非常快,并且量非常大则肯定不能使用该方式,否则打印太多了):
@Slf4j
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
@Override
protected void beforeExecute(Thread thread, Runnable runnable) {}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
log.info("{} Pool Size: {}", super.getPoolSize());
log.info("{} Active Threads: {}", super.getActiveCount());
log.info("{} Number of Tasks Completed: {}", super.getCompletedTaskCount());
log.info("{} Number of Tasks in Queue: {}", super.getQueue().size());
}
// 省略重写父类的构造。。。
}
自己封装的线程池工具
1、自己封装的线程池管理工具,仅仅是为了统一管理,以及多个微服务项目需要使用时候,提高代码的复用性。
2、使用时发现一个项目启动了5个及以上的线程池,那么波峰是无法估计的,也比较危险,压测也只压一个线程池或者方法。之前一直不理解怎么去控制,后面理解了如果一个项目要启动那么多线程池,其实项目本身也早就该拆分了(拆分到不同的项目JVM 中)。
3、而每一个应用的线程池性质本身;服务器的配置情况只有开发的人自己清楚,所以使用了Java原生的Spi机制实现
public class ThreadPoolImpl implements ThreadPool {
@Override
public List<ThreadPoolEntity> appendThreadPool() {
return Lists.newArrayList(
new ThreadPoolEntity(ThreadPoolEnum.TRANSFER_ORDER.name(), SimpleThreadPool.PoolModel.FAST_IO, Boolean.FALSE, Boolean.TRUE,
"****线程池【Per PO Per Thread】", 5, 8, 30, TimeUnit.SECONDS,
new TaskQueue(50), new ThreadPoolExecutor.AbortPolicy())
);
}
}
4、定义的线程池类型枚举,CPU型、IO型、FAST_IO型【就是原生的Tomcat类型,先创建最大线程数,再放入队列,详细见编发编程工具 - Tomcat对juc线程池的扩展】
代码如下(欢迎讨论):
@Slf4j
@Component
@SuppressWarnings("ALL")
public class SimpleThreadPool extends ThreadPoolInit implements EnvironmentAware {
/**
* 默认最大的超时时间
*/
private static int DEFAULT_TIMEOUT = 50000;
@Override
public void setEnvironment(Environment environment) {
// 执行父类中的环境设置
super.setEnvironment(environment);
String[] activeProfiles = environment.getActiveProfiles();
if (activeProfiles == null) {
return;
}
String pro = "prod";
for (String activeProfile : activeProfiles) {
if (pro.equals(activeProfile)) {
if (log.isInfoEnabled()) {
log.info("active spring profile prod");
}
DEFAULT_TIMEOUT = 2000;
}
}
if (log.isInfoEnabled()) {
log.info("SimpleThreadPool DEFAULT_TIMEOUT = {} ms", DEFAULT_TIMEOUT);
}
}
/**
* 执行没有返回值的任务
* @param key 线程池枚举
* @param runnable 任务
*/
public static void execute(String key, Runnable runnable) {
if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
throw new IllegalArgumentException("未找到线程池:" + key);
}
// 执行任务
THREAD_POOL_EXECUTOR_MAP.get(key).execute(runnable);
}
/**
* 执行有返回值任务
*
* @param key 线程池名称
* @param callable 需要执行的任务可变数组
* @param <T> 任务类型
* @return 任务结果
*/
public static <T> List<Future<T>> execute(String key, Callable<T>... callable) {
if (callable == null || callable.length == 0) {
throw new IllegalArgumentException("任务不能为空!");
}
List<Callable<T>> taskList = Arrays.stream(callable).collect(Collectors.toList());
return executeAll(key, taskList);
}
/**
* 批量执行并行任务, 使用默认的最大超时时间,单位毫秒
* @param key 线程池名称
* @param callableList 任务列表
* @return 结果
*/
public static <T> List<Future<T>> executeAll(String key, List<Callable<T>> callableList) {
return executeAll(key, DEFAULT_TIMEOUT, callableList);
}
/**
* 批量执行并行任务, 使用默认的最大超时时间,单位毫秒
* @param key 线程池名称
* @param callableList 任务列表
* @return 结果
*/
public static <T> List<Future<T>> executeAll(ExecutorService service, List<Callable<T>> callableList) {
return executeAll(service, DEFAULT_TIMEOUT, callableList);
}
/**
* 批量执行并行任务
* @param key 线程池名称
* @param maxTimeout 最大超时时间,没有取默认值,单位2000毫秒
* @param callableList 任务列表
* @return 结果
*/
public static <T> List<Future<T>> executeAll(ExecutorService service, int maxTimeout, List<Callable<T>> callableList) {
try {
return service.invokeAll(callableList, maxTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("线程池批量执行任务异常失败", e);
} catch (Exception e) {
log.error("线程池批量执行任务异常失败", e);
}
return new ArrayList<>();
}
/**
* 批量执行并行任务
* @param key 线程池名称
* @param callableList 任务列表
* @return 结果
*/
public static <T> List<Future<T>> executeAllUntil(String key, List<Callable<T>> callableList) {
if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
throw new IllegalArgumentException("未配置线程池" + key);
}
try {
return THREAD_POOL_EXECUTOR_MAP.get(key).invokeAll(callableList);
} catch (InterruptedException e) {
log.error("线程池批量执行任务异常失败", e);
} catch (Exception e) {
log.error("线程池批量执行任务异常失败", e);
}
return new ArrayList<>();
}
/**
* 批量执行并行任务
* @param key 线程池名称
* @param maxTimeout 最大超时时间,没有取默认值,单位2000毫秒
* @param callableList 任务列表
* @return 结果
*/
public static <T> List<Future<T>> executeAll(String key, int maxTimeout, List<Callable<T>> callableList) {
if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
throw new IllegalArgumentException("未配置线程池" + key);
}
try {
return THREAD_POOL_EXECUTOR_MAP.get(key).invokeAll(callableList, maxTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("线程池批量执行任务异常失败", e);
} catch (Exception e) {
log.error("线程池批量执行任务异常失败", e);
}
return new ArrayList<>();
}
/**
* @param key 线程池名称
* @param callableList 任务列表
* @return 结果
*/
public static void executeRunnable(String key, Runnable... r) {
if (!THREAD_POOL_EXECUTOR_MAP.containsKey(key)) {
throw new IllegalArgumentException("未配置线程池" + key);
}
ThreadPoolExecutor executor = THREAD_POOL_EXECUTOR_MAP.get(key);
for (int i = 0; i < r.length; i++) {
executor.submit(r[i]);
}
}
}
@Slf4j
public class ThreadPoolInit implements EnvironmentAware {
/**
* 定时执行任务的单线程, 所有任务公用
* 当前使用的有: 打印线程信息,装运点等定时刷新任务
*/
public static final ScheduledExecutorService SINGLE_POOL = Executors.newSingleThreadScheduledExecutor();
/**
* 线程池容器
*/
public static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR_MAP = new ConcurrentHashMap<>(16);
/**
* 是否打印
*/
private static Boolean printThreadPoolInfoInterval = Boolean.TRUE;
static {
// 初始化线程池
ServiceLoader<ThreadPool> load = ServiceLoader.load(ThreadPool.class);
load.forEach(threadPool -> threadPool.appendThreadPool().forEach(SimpleThreadPool::putThreadPool));
}
@Override
public void setEnvironment(Environment environment) {
String[] activeProfiles = environment.getActiveProfiles();
String pro = "dev";
for (String activeProfile : activeProfiles) {
if (pro.equals(activeProfile)) {
return;
}
}
// 初始化线程池状态信息订单打印
if (printThreadPoolInfoInterval) {
printScheduledThreadStats();
}
}
/**
* 线程池类型, 只是作为标识当前任务是CPU型还是IO型为主
*/
@SuppressWarnings("unused")
public enum PoolModel {
/** io型 */
IO,
/** CPU型 */
CPU,
/** io型,Tomcat扩展的 juc线程池 */
FAST_IO
}
/**
* 线程工厂
*/
static class DefaultThreadFactory implements ThreadFactory {
/**
* 定义线程组
*/
static ThreadGroup threadGroup;
/**
* 定义每个线程池中每个线程的名称后缀数字
*/
static final AtomicInteger THREAD_NUMBER = new AtomicInteger(1);
/**
* 定义每个线程词的名称前缀
*/
static String namePrefix;
public DefaultThreadFactory(String name) {
final SecurityManager securityManager = System.getSecurityManager();
threadGroup = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-thread-";
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(threadGroup, runnable, namePrefix + THREAD_NUMBER.getAndIncrement(), 0);
if (thread.isDaemon()) {
thread.setDaemon(false);
}
if(thread.getPriority() != Thread.NORM_PRIORITY){
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}
/**
* 启动打印线程
*/
private static void printScheduledThreadStats() {
SINGLE_POOL.scheduleAtFixedRate(() -> THREAD_POOL_EXECUTOR_MAP.forEach((name, threadPool) -> {
log.info("{} Pool Size: {}", name, threadPool.getPoolSize());
log.info("{} Active Threads: {}", name, threadPool.getActiveCount());
log.info("{} Number of Tasks Completed: {}", name, threadPool.getCompletedTaskCount());
log.info("{} Number of Tasks in Queue: {}", name, threadPool.getQueue().size());
}), 0, 5, TimeUnit.SECONDS);
}
/**
* 往线程池容器中放入线程池池
* @param threadPoolEntity 线程池定义对象
*/
public static void putThreadPool(ThreadPoolEntity threadPoolEntity) {
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(threadPoolEntity);
log.info("name: {}, threadPoolExecutor = {}", threadPoolEntity.taskName, threadPoolExecutor);
THREAD_POOL_EXECUTOR_MAP.put(threadPoolEntity.taskName, threadPoolExecutor);
}
/**
* 根据枚举获取线程池
* @param entity 线程池枚举
*/
private static ThreadPoolExecutor getThreadPoolExecutor(ThreadPoolEntity entity) {
ThreadPoolExecutor executor;
if (entity.poolModel == PoolModel.FAST_IO) {
if (!(entity.blockingQueue instanceof TaskQueue)) {
throw new RuntimeException("PoolModel.FAST_IO 类型的线程池,只能创建 " + TaskQueue.class.getName() + " 类型的队列!");
}
TaskQueue taskQueue = (TaskQueue)entity.blockingQueue;
ThreadPoolExecutorImpl threadPoolExecutor = new ThreadPoolExecutorImpl(entity.corePoolNum, entity.maxPoolNum, entity.deleteThreadNum,
entity.deleteTreadUnit, taskQueue, new TaskThreadFactory(entity.taskName), entity.rejectedExecutionHandler);
// 设置父类,用于判断线程的对列表是否真的满了
taskQueue.setParent(threadPoolExecutor);
executor = threadPoolExecutor;
log.info("init ThreadPoolExecutorImpl and TaskQueue!");
} else {
executor = new ThreadPoolExecutor(entity.corePoolNum, entity.maxPoolNum, entity.deleteThreadNum, entity.deleteTreadUnit,
entity.blockingQueue, new DefaultThreadFactory(entity.taskName), entity.rejectedExecutionHandler);
// 是否预热核心线程
if (entity.preStartAllCoreThreads) {
executor.prestartAllCoreThreads();
}
log.info("init ThreadPoolExecutor and {}}!", entity.blockingQueue.getClass().getSimpleName());
}
try {
// 是否允许核心线程超时
if (entity.allowsCoreThreadTimeOut) {
executor.allowsCoreThreadTimeOut();
}
} catch (NullPointerException e) {
log.error("初始化线程池错误:" + e);
}
return executor;
}
}
SPI机制使用的接口和定义如下:
@FunctionalInterface
public interface ThreadPool {
/**
* 添加线程池
* @return 线程池定义对象
*/
List<ThreadPoolEntity> appendThreadPool();
}
@AllArgsConstructor
public final class ThreadPoolEntity {
/** 先池名称 */
public final String taskName;
/** 线程池任务类型 */
public final SimpleThreadPool.PoolModel poolModel;
/** 是否允许核心线程超时 */
public final Boolean allowsCoreThreadTimeOut;
/** 是否预热核心线程池 */
public final Boolean preStartAllCoreThreads;
/** 线程池说明 */
public final String detail;
/** 核心线程数 */
public final int corePoolNum;
/** 最大线程数 */
public final int maxPoolNum;
/** 超时时间 */
public final int deleteThreadNum;
/** 超时单位 */
public final TimeUnit deleteTreadUnit;
/** 任务队列,没有特殊理由不能使用无界队列 */
public final BlockingQueue<Runnable> blockingQueue;
/** 拒绝策略 */
public final RejectedExecutionHandler rejectedExecutionHandler;
}
更多推荐
所有评论(0)