并发编程工具 - CompletableFuture的API和项目使用
目录一、CompletableFuture的API1、CompletableFuture整体结构2、按照类型分类3、按照关系分类二、CompletableFuture的demo、项目使用1、使用Demo2、自己项目上的使用并发编程时的可以并行的应用场景非常多,比如C依赖于A、B并行的结果,但是整体可以和D并行等,如果是Java8之前的话,则需要将任务封装成每个Runnable(或者Callable
目录
并发编程时的可以并行的应用场景非常多,比如C依赖于A、B并行的结果,但是整体可以和D并行等,如果是Java8之前的话,则需要将任务封装成每个Runnable(或者Callable)也可以实现。但是Java8增加了CompletableFuture丰富的Api完全满足我们各种场景或者模型。在处理任务时会交给线程池,如果我们外部传入ThreadPoolExecutor则使用给线程池处理任务,否则是否Java8公共的ForkJoinPool线程池(Stream等都会公用该线程池),所以最好使用自己的线程池。
一、CompletableFuture的API
1、CompletableFuture整体结构
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// Either the result or boxed AltResult 执行结果或者异常,所以用Object表示
volatile Object result;
// Top of Treiber stack of dependent actions 栈顶部的动作,因为将其分为CompletionStage
volatile Completion stack;
}
可以看出实现了Future接口但是没有实现TaskFuture(后面专门分析Future的实现原理),实现了CompletionStage接口(定义了常用的API关系动作,如:thenApply等)。其所有类型的API都提供了大致三类:
1、同步API
2、异步 可以传入我们自己的ThreadPoolExecutor线程池
3、异步 使用内部默认的ForkJoinPool.commonPool(),传入的队列模式是LIFO【后进先出队列】
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinPool.ForkJoinWorkerThreadFactory factory = null;
Thread.UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinPool.ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new ForkJoinPool.InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
提供了两种构造函数:
/**
* Creates a new incomplete CompletableFuture.
*/
public CompletableFuture() {}
/**
* Creates a new complete CompletableFuture with given encoded result.
*/
private CompletableFuture(Object r) {
this.result = r;
}
或者使用静态方法创建对象,可以允许我们传入Callable或者Runnable:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
2、按照类型分类
1)、中间操作API
首先,Runnable类型的参数会忽略计算结果;Consumer是纯消费型计算结果;BiConsumer会组合另外一个CompletionStage纯消费;Function会对计算结果做转换;BiFunction会组合另一个CompletionStage的结果做转换。
2)、终止操作API
3)、阻塞或轮训获取结果
3、按照关系分类
1)、串行
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
2)、AND 汇聚关系
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
3)、OR 汇聚关系
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
4)、异常处理
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
二、CompletableFuture的demo、项目使用
1、使用Demo
使用CompletableFuture实现泡茶例子【步骤1和2可以并行;3需要等待1和2执行完成】
1、洗水壶,烧开水
2、洗茶壶,洗茶杯,那茶叶
3、泡茶
/**
* {@link java.util.concurrent.CompletableFuture} 实现泡茶例子
* 1、洗水壶,烧开水
* 2、洗茶壶,洗茶杯,那茶叶
* 3、泡茶
*
* 步骤1和2可以并行;3需要等待1和2执行完成
*
* <p>
* CompletableFuture方法基本都提供了两个,在没有传入线程池Executor的情况下,默认会使用ForkJoin的公共线程池,否则使用传入的线程池
* CompletableFuture继承自CompletableStage,其中定义了很多的并发执行的 AND、OR、分支合并等接口,并且该接口也分成两类,如果没有传入线程池Executor
* 则使用上面传入非线程池,否则使用传入的【后续可以打印一下线程名称】
*
* @author lihongmin
* @date 2020/8/4 15:32
* @since 1.0.0
*/
public class CompletableFutureDemo {
public static void main(String[] args) {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
try {
long id = Thread.currentThread().getId();
System.out.println(id + "洗水壶");
Thread.sleep(300);
System.out.println(id + "烧开水");
Thread.sleep(500);
} catch (InterruptedException e) {}
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
try {
long id = Thread.currentThread().getId();
System.out.println(id + "洗茶壶");
Thread.sleep(500);
System.out.println(id + "洗茶杯");
Thread.sleep(500);
System.out.println(id + "拿茶叶");
Thread.sleep(500);
} catch (InterruptedException e) {}
return "龙井茶";
});
CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (cf1Obj, cf2Obj) -> {
long id = Thread.currentThread().getId();
System.out.println(id + "拿到茶叶:" + cf2Obj);
System.out.println(id + "泡茶。。。");
return "上茶" +cf2Obj;
});
System.out.println(cf3.join());
}
}
2、自己项目上的使用
自己在项目上需要获取一段字符串进行返回,而分析完业务模型后认为 字符串分三段进行处理,而第三段本身是可以并行的任务最后选择用ThreadPoolExecutor#invokeAll进行处理,但是第三段依赖第一段的结果(可能需要在第三段增加两个并行任务,可能返回默认值),最后所以结果都在第三段的最后拼装。所以把第二段也与第一段并行。
用到了CompletableFuture的supplyAsync、thenCombineAsync、join方法
private void purchaseSendPay(StringBuilder orderCode) {
final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));
// 根据创建订单码阶段,获取销售订单类型、销售订单订单码
CompletableFuture<DefSaleOrderDTO> cf1 = CompletableFuture.supplyAsync(() -> {
DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTO();
resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
return defSaleOrderDTO;
}, executor);
// 获取销售开单(即货源安排订单码)
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
String sendPay = confSupplyService.sendPayStage();
resultMap.put(SALE_CREATE, sendPay);
return sendPay;
}, executor);
// 初始化确认的任务
List<Callable<String>> taskList = Lists.<Callable<String>>newArrayList(
() -> resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage()),
() -> resultMap.put(PURCHASE_DEFINITION, defPurOrderService.sendPayStage()),
() -> resultMap.put(PURCHASE_AUDIT, confPurOrderAuditService.sendPayStage()),
() -> resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage())
);
// 合并任务
cf1.thenCombineAsync(cf2, (cf1Obj, cf2Obj) -> {
String saleType = cf1Obj.getSoCode();
String saleCreate = resultMap.get(SALE_CREATE);
// 重新设置销售订单类型,之前可能为空, SHIPPING_CONDITION也需要依赖
getInstance().get().sendPayDTO.setSoTypeCode(saleType);
// getInstance().set(param);
// 是否转Vso控制,是则订单码填充00
Boolean isVso = isControl(saleCreate, TRANSFER_VSO_CONTROL);
if (isVso) {
resultMap.put(VSO_TO_SO, VSO_TO_SO_DEFAULT);
resultMap.put(PRE_SELL_AUDIT, VSO_DEFAULT);
} else {
taskList.add(() -> resultMap.put(VSO_TO_SO, confVsoToSoService.sendPayStage()));
taskList.add(() -> resultMap.put(PRE_SELL_AUDIT, presellOrderService.sendPayStage()));
}
// 阻塞获取结果
SimpleThreadPool.executeAll(executor, taskList).forEach(this::getFuture);
for (StateConfigEnum configEnum : values()) {
String sendPay = resultMap.get(configEnum);
if (StringUtil.isNotBlank(sendPay)) {
orderCode.append(sendPay);
}
}
return null;
}, executor)
// 阻塞获取结果
.join();
}
更多推荐
所有评论(0)