目录

一、CompletableFuture的API

    1、CompletableFuture整体结构

    2、按照类型分类

    3、按照关系分类

二、CompletableFuture的demo、项目使用

    1、使用Demo

    2、自己项目上的使用


    并发编程时的可以并行的应用场景非常多,比如C依赖于A、B并行的结果,但是整体可以和D并行等,如果是Java8之前的话,则需要将任务封装成每个Runnable(或者Callable)也可以实现。但是Java8增加了CompletableFuture丰富的Api完全满足我们各种场景或者模型。在处理任务时会交给线程池,如果我们外部传入ThreadPoolExecutor则使用给线程池处理任务,否则是否Java8公共的ForkJoinPool线程池(Stream等都会公用该线程池),所以最好使用自己的线程池。

一、CompletableFuture的API

    1、CompletableFuture整体结构

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

    // Either the result or boxed AltResult 执行结果或者异常,所以用Object表示
    volatile Object result;     

    // Top of Treiber stack of dependent actions 栈顶部的动作,因为将其分为CompletionStage
    volatile Completion stack;    
}

    可以看出实现了Future接口但是没有实现TaskFuture(后面专门分析Future的实现原理),实现了CompletionStage接口(定义了常用的API关系动作,如:thenApply等)。其所有类型的API都提供了大致三类:

1、同步API

2、异步 可以传入我们自己的ThreadPoolExecutor线程池

3、异步 使用内部默认的ForkJoinPool.commonPool(),传入的队列模式是LIFO【后进先出队列】

private static ForkJoinPool makeCommonPool() {
	int parallelism = -1;
	ForkJoinPool.ForkJoinWorkerThreadFactory factory = null;
	Thread.UncaughtExceptionHandler handler = null;
	try {  // ignore exceptions in accessing/parsing properties
		String pp = System.getProperty
				("java.util.concurrent.ForkJoinPool.common.parallelism");
		String fp = System.getProperty
				("java.util.concurrent.ForkJoinPool.common.threadFactory");
		String hp = System.getProperty
				("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
		if (pp != null)
			parallelism = Integer.parseInt(pp);
		if (fp != null)
			factory = ((ForkJoinPool.ForkJoinWorkerThreadFactory)ClassLoader.
					getSystemClassLoader().loadClass(fp).newInstance());
		if (hp != null)
			handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
					getSystemClassLoader().loadClass(hp).newInstance());
	} catch (Exception ignore) {
	}
	if (factory == null) {
		if (System.getSecurityManager() == null)
			factory = defaultForkJoinWorkerThreadFactory;
		else // use security-managed default
			factory = new ForkJoinPool.InnocuousForkJoinWorkerThreadFactory();
	}
	if (parallelism < 0 && // default 1 less than #cores
			(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
		parallelism = 1;
	if (parallelism > MAX_CAP)
		parallelism = MAX_CAP;
	return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
			"ForkJoinPool.commonPool-worker-");
}

    提供了两种构造函数

/**
 * Creates a new incomplete CompletableFuture.
 */
public CompletableFuture() {}

/**
 * Creates a new complete CompletableFuture with given encoded result.
 */
private CompletableFuture(Object r) {
	this.result = r;
}

或者使用静态方法创建对象,可以允许我们传入Callable或者Runnable

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
	return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
	return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
	return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
	return asyncRunStage(screenExecutor(executor), runnable);
}

 

    2、按照类型分类

1)、中间操作API

    首先,Runnable类型的参数会忽略计算结果;Consumer是纯消费型计算结果;BiConsumer会组合另外一个CompletionStage纯消费;Function会对计算结果做转换;BiFunction会组合另一个CompletionStage的结果做转换。

 

2)、终止操作API

3)、阻塞或轮训获取结果

 

 

    3、按照关系分类

1)、串行

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

2)、AND 汇聚关系

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3)、OR 汇聚关系

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

4)、异常处理

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

二、CompletableFuture的demo、项目使用

    1、使用Demo

使用CompletableFuture实现泡茶例子【步骤1和2可以并行;3需要等待1和2执行完成】
 1、洗水壶,烧开水
 2、洗茶壶,洗茶杯,那茶叶
 3、泡茶

/**
 *  {@link java.util.concurrent.CompletableFuture} 实现泡茶例子
 *  1、洗水壶,烧开水
 *  2、洗茶壶,洗茶杯,那茶叶
 *  3、泡茶
 *
 *  步骤1和2可以并行;3需要等待1和2执行完成
 *
 *  <p>
 *      CompletableFuture方法基本都提供了两个,在没有传入线程池Executor的情况下,默认会使用ForkJoin的公共线程池,否则使用传入的线程池
 *      CompletableFuture继承自CompletableStage,其中定义了很多的并发执行的 AND、OR、分支合并等接口,并且该接口也分成两类,如果没有传入线程池Executor
 *          则使用上面传入非线程池,否则使用传入的【后续可以打印一下线程名称】
 *
 * @author lihongmin
 * @date 2020/8/4 15:32
 * @since 1.0.0
 */
public class CompletableFutureDemo {

    public static void main(String[] args) {
        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
            try {
                long id = Thread.currentThread().getId();
                System.out.println(id + "洗水壶");
                Thread.sleep(300);

                System.out.println(id + "烧开水");
                Thread.sleep(500);
            } catch (InterruptedException e) {}
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                long id = Thread.currentThread().getId();
                System.out.println(id + "洗茶壶");
                Thread.sleep(500);

                System.out.println(id + "洗茶杯");
                Thread.sleep(500);

                System.out.println(id + "拿茶叶");
                Thread.sleep(500);

            } catch (InterruptedException e) {}
            return "龙井茶";
        });

        CompletableFuture<String> cf3 = cf1.thenCombineAsync(cf2, (cf1Obj, cf2Obj) -> {
            long id = Thread.currentThread().getId();
            System.out.println(id + "拿到茶叶:" + cf2Obj);
            System.out.println(id + "泡茶。。。");
            return "上茶" +cf2Obj;
        });

        System.out.println(cf3.join());
    }
}

 

    2、自己项目上的使用

自己在项目上需要获取一段字符串进行返回,而分析完业务模型后认为 字符串分三段进行处理,而第三段本身是可以并行的任务最后选择用ThreadPoolExecutor#invokeAll进行处理,但是第三段依赖第一段的结果(可能需要在第三段增加两个并行任务,可能返回默认值),最后所以结果都在第三段的最后拼装。所以把第二段也与第一段并行。

用到了CompletableFuturesupplyAsync、thenCombineAsync、join方法

private void purchaseSendPay(StringBuilder orderCode) {
    final ConcurrentHashMap<StateConfigEnum, String> resultMap = new ConcurrentHashMap<>(16);
    ExecutorService executor = TtlExecutors.getTtlExecutorService(SimpleThreadPool.THREAD_POOL_EXECUTOR_MAP.get(SEND_PAY.name()));

    // 根据创建订单码阶段,获取销售订单类型、销售订单订单码
    CompletableFuture<DefSaleOrderDTO> cf1 = CompletableFuture.supplyAsync(() -> {
        DefSaleOrderDTO defSaleOrderDTO = defSaleOrderService.sendPayDTO();
        resultMap.put(SALE_DEFINITION, defSaleOrderDTO.getCursorCode());
        return defSaleOrderDTO;
    }, executor);
    // 获取销售开单(即货源安排订单码)
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        String sendPay = confSupplyService.sendPayStage();
        resultMap.put(SALE_CREATE, sendPay);
        return sendPay;
    }, executor);
    // 初始化确认的任务
    List<Callable<String>> taskList = Lists.<Callable<String>>newArrayList(
            () -> resultMap.put(SHIPPING_CONDITION, confShippingConditionService.sendPayStage()),
            () -> resultMap.put(PURCHASE_DEFINITION, defPurOrderService.sendPayStage()),
            () -> resultMap.put(PURCHASE_AUDIT, confPurOrderAuditService.sendPayStage()),
            () -> resultMap.put(SALE_AUDIT, confSaleOrderAuditService.sendPayStage())
    );
    // 合并任务
    cf1.thenCombineAsync(cf2, (cf1Obj, cf2Obj) -> {
        String saleType = cf1Obj.getSoCode();
        String saleCreate = resultMap.get(SALE_CREATE);
        // 重新设置销售订单类型,之前可能为空, SHIPPING_CONDITION也需要依赖
        getInstance().get().sendPayDTO.setSoTypeCode(saleType);
//            getInstance().set(param);
        // 是否转Vso控制,是则订单码填充00
        Boolean isVso = isControl(saleCreate, TRANSFER_VSO_CONTROL);
        if (isVso) {
            resultMap.put(VSO_TO_SO, VSO_TO_SO_DEFAULT);
            resultMap.put(PRE_SELL_AUDIT, VSO_DEFAULT);
        } else {
            taskList.add(() -> resultMap.put(VSO_TO_SO, confVsoToSoService.sendPayStage()));
            taskList.add(() -> resultMap.put(PRE_SELL_AUDIT, presellOrderService.sendPayStage()));
        }

        // 阻塞获取结果
        SimpleThreadPool.executeAll(executor, taskList).forEach(this::getFuture);
        for (StateConfigEnum configEnum : values()) {
            String sendPay = resultMap.get(configEnum);
            if (StringUtil.isNotBlank(sendPay)) {
                orderCode.append(sendPay);
            }
        }
        return null;
        
    }, executor)
    // 阻塞获取结果
    .join();
}

 

 

 

 

 

 

 

 

 

 

 

 

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐