CompletableFuture 是 Java 8 引入的一个强大且灵活的异步编程工具类,位于 java.util.concurrent 包中。它同时实现了 Future<T> 和 CompletionStage<T> 两个接口,不仅支持获取异步计算结果,还提供了丰富的链式调用、组合、异常处理等能力,是构建高性能、非阻塞、响应式应用的核心组件之一。

CompletableFuture 常见接口

CompletableFuture 核心接口包括:创建、链式调用、组合、异常处理、聚合、执行策略控制等。

场景 方法
异步生成值 supplyAsync(Supplier<T>)
异步执行无返回 runAsync(Runnable)
转换结果 thenApply, thenApplyAsync
消费结果 thenAccept, thenAcceptAsync
无参动作 thenRun, thenRunAsync
顺序依赖 thenCompose
合并两个结果 thenCombine, thenAcceptBoth
任一完成 applyToEither, acceptEither
多任务全完成 allOf
多任务任一完成 anyOf
异常恢复 exceptionally
统一处理 handle
最终清理 whenComplete
手动完成 complete(T), completeExceptionally(Throwable)

创建 CompletableFuture

手动完成(无异步)

CompletableFuture<String> future = new CompletableFuture<>();
future.complete("Manual result");
System.out.println(future.join()); // Manual result

异步执行(有返回值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "Async result";
});
System.out.println(future.join()); // Async result

异步执行(无返回值)

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Running async task");
});
future.join(); // 等待完成

可传入自定义线程池

ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Custom pool", executor);

链式转换与消费(单阶段)

thenApply – 转换结果(同步)

CompletableFuture<Integer> f = CompletableFuture
    .supplyAsync(() -> "100")
    .thenApply(Integer::parseInt)
    .thenApply(x -> x * 2);
System.out.println(f.join()); // 200

thenApplyAsync – 异步转换(新线程)

CompletableFuture<String> f = CompletableFuture
    .completedFuture("hello")
    .thenApplyAsync(s -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
        return s.toUpperCase();
    });
System.out.println(f.join()); // HELLO(在 ForkJoinPool 线程中执行)

thenAccept – 消费结果(无返回)

CompletableFuture
    .supplyAsync(() -> "World")
    .thenAccept(s -> System.out.println("Hello " + s)); // Hello World

thenRun – 无输入

CompletableFuture
    .supplyAsync(() -> "ignored")
    .thenRun(() -> System.out.println("Task done!"));

扁平化嵌套(依赖另一个 CompletableFuture)

thenCompose – 顺序依赖(类似 flatMap)

thenCompose 适用于第二个异步操作依赖第一个的结果,并且第二个异步操作也希望继续返回 CompletableFuture<T> 的场景。

用 thenCompose 避免 CompletableFuture<CompletableFuture<T>> 嵌套:

CompletableFuture<String> getUser = CompletableFuture.supplyAsync(() -> "Alice");
CompletableFuture<Integer> getLength = getUser.thenCompose(name ->
    CompletableFuture.supplyAsync(() -> name.length())
);

// 1、可以直接通过 getLength.join() 得到5
// System.out.println(getLength.join()); 
// 2、也可以继续更多的链式调用:
CompletableFuture<String> userLevel = getLength.thenCompose(len ->
    CompletableFuture.supplyAsync(() -> {
        if (len >= 5) return "VIP";
        else return "Normal";
    })
);
System.out.println(userLevel.join());  // 直接通过 userLevel.join() 得到: VIP

对比错误写法(产生嵌套 future,难以处理):

CompletableFuture<String> getUser = CompletableFuture.supplyAsync(() -> "Alice");
CompletableFuture<CompletableFuture<Integer>> getLength = getUser.thenApply(name -> 
      CompletableFuture.supplyAsync(() -> name.length()) // CompletableFuture<Integer>
);
// 最终返回类型是 CompletableFuture<CompletableFuture<Integer>>
// 无法直接 .join() 得到 5,也难以增加更多的链式调用

当然,如果第二个异步操作不返回 CompletableFuture ,而是返回 String 等普通类型,那么使用 thenApplyAsync 就可以:

// 转换为普通值,可以不必要使用thenCompose:
CompletableFuture<String> f1 = getUserIdAsync()
    .thenApplyAsync(id -> "User-" + id); // 第二个异步操作里直接返回 String
System.out.println(f1.join());

组合两个 CompletableFuture

thenCombine – 合并两个结果(AND)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = f1.thenCombine(f2, (a, b) -> a + " " + b);
System.out.println(combined.join()); // Hello World

thenAcceptBoth – 消费两个结果

f1.thenAcceptBoth(f2, (a, b) -> System.out.println(a + " " + b));

runAfterBoth – 两者都完成后执行

f1.runAfterBoth(f2, () -> System.out.println("Both done"));

任一完成即响应(OR)

applyToEither – 返回第一个完成的结果(可转换)

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    sleep(2000); return "Slow";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    sleep(500); return "Fast";
});

String result = f1.applyToEither(f2, s -> s + " wins!");
System.out.println(result); // Fast wins!

acceptEither – 消费第一个结果

f1.acceptEither(f2, System.out::println); // Fast

runAfterEither – 任一完成后执行

f1.runAfterEither(f2, () -> System.out.println("One finished"));

多任务聚合

allOf – 所有完成(无返回值)

CompletableFuture<Void> all = CompletableFuture.allOf(
    CompletableFuture.runAsync(() -> sleep(1000)),
    CompletableFuture.runAsync(() -> sleep(1500)),
    CompletableFuture.runAsync(() -> sleep(800))
);
all.join(); // 等待全部完成(约 1500ms)
System.out.println("All tasks done");

若需获取所有结果:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "A");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "B");

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2);
all.join();

System.out.println(f1.join() + f2.join()); // AB

anyOf – 任一完成即返回(返回 Object)

CompletableFuture<Object> any = CompletableFuture.anyOf(
    CompletableFuture.supplyAsync(() -> "First"),
    CompletableFuture.supplyAsync(() -> {
        sleep(1000); return "Second";
    })
);
System.out.println(any.join()); // First(类型为 Object,需强转)

异常处理

exceptionally – 仅处理异常(类似 catch)

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("Error!");
    })
    .exceptionally(ex -> "Fallback: " + ex.getMessage());

System.out.println(f.join()); // Fallback: java.lang.RuntimeException: Error!

handle – 统一处理正常/异常结果

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("Oops!");
    })
    .handle((result, ex) -> {
        // 可以统一处理结果,常用于提供fallback、错误恢复、统一结果格式等
        if (ex != null) {
            return "Default Value"; // 吞掉异常,返回默认值
        }
        return result; // 此处还可以修改返回值
    });

System.out.println(f.join()); // 输出: Default Value(无异常!)

whenComplete – 类似 finally(不改变结果)

CompletableFuture<String> f = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("Oops!");
    })
    .whenComplete((result, ex) -> {
        // 不可干预结果,常用于记录日志、关闭资源、指标统计等副作用操作
        if (ex != null) {
            System.out.println("Logged error: " + ex.getMessage());
        }
    });

// 异常仍然会抛出!
f.join(); // 抛出 CompletionException -> RuntimeException("Oops!")

whenComplete 不改变返回值,即使抛异常也会传播原始异常。

完成状态检查与获取

CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Done");

// 阻塞等待(推荐)
String result = f.join(); // 不抛出受检异常

// 或使用 get(抛出 InterruptedException / ExecutionException)
// String result = f.get();

// 检查状态
System.out.println(f.isDone());     // true
System.out.println(f.isCompletedExceptionally()); // false
System.out.println(f.isCancelled()); // false

推荐使用 join() 而非 get(),避免处理受检异常。

执行策略控制(同步 vs 异步回调)

方法 执行线程
thenApply 前一个任务的完成线程(可能是同步或异步)
thenApplyAsync 总是在另一个线程中执行(默认 ForkJoinPool.commonPool(),可指定 Executor)
CompletableFuture.supplyAsync(() -> {
    System.out.println("Stage1: " + Thread.currentThread().getName());
    return "data";
})
.thenApply(s -> {
    System.out.println("thenApply (sync): " + Thread.currentThread().getName());
    return s;
})
.thenApplyAsync(s -> {
    System.out.println("thenApplyAsync: " + Thread.currentThread().getName());
    return s;
})
.join();

输出示例:

Stage1: ForkJoinPool.commonPool-worker-1
thenApply (sync): ForkJoinPool.commonPool-worker-1
thenApplyAsync: ForkJoinPool.commonPool-worker-2

为了避免阻塞主线程(因为可能不确定前一个任务是同步还是异步), I/O 或耗时操作一般建议都使用 xxxAsync

完整实战示例:电商下单流程

ExecutorService ioPool = Executors.newFixedThreadPool(3);

CompletableFuture<String> order = CompletableFuture.supplyAsync(() -> {
    // 1. 创建订单
    return "ORDER-1001";
}, ioPool);

CompletableFuture<String> payment = order.thenCompose(ordId ->
    CompletableFuture.supplyAsync(() -> {
        // 2. 支付(依赖订单ID)
        return "PAID-" + ordId;
    }, ioPool)
);

CompletableFuture<String> inventory = CompletableFuture.supplyAsync(() -> {
    // 3. 扣减库存(并行)
    return "INVENTORY-OK";
}, ioPool);

CompletableFuture<String> shipping = payment.thenCombine(inventory, (pay, inv) -> {
    // 4. 发货(需支付成功 + 库存扣减)
    return "SHIPPED-" + pay;
});

// 异常兜底
CompletableFuture<String> finalResult = shipping.exceptionally(ex -> {
    System.err.println("Order failed: " + ex.getMessage());
    return "FAILED";
});

System.out.println(finalResult.join()); // SHIPPED-PAID-ORDER-1001

ioPool.shutdown();

CompletableFuture 实现原理分析

核心数据结构

CompletableFuture 的核心是非阻塞式异步计算,通过注册回调函数(如 thenApplythenAccept 等)在结果就绪时自动触发后续操作。

关键字段与结构:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    volatile Object result; // 存储结果或异常(BiRecord/AltResult)
    volatile Completion stack; // 指向依赖的 Completion 链表(栈结构)的头指针
}
  • result 字段
    • 若为 null:未完成。
    • 若为 AltResult:表示异常或 null 值。
    • 若为普通对象:表示成功完成的结果。
  • stack 字段
    • 类型为 Completion(抽象类),是一个栈式单向链表(LIFO,后进先出),记录所有依赖当前 CompletableFuture 的后续操作(即“依赖图”),其注册顺序是后注册的靠前(栈顶),执行顺序是先执行栈顶(后注册的)。
    • 所有 thenApplythenCompose 等方法都会创建一个 Completion 子类实例(如 UniApplyBiAcceptThenCompose 等),并压入此栈。

Completion (抽象类)是所有回调动作的基类,代表“当某个 future 完成后要做的事”。常见子类包括:

子类 作用
UniApply 对应 thenApply
UniAccept 对应 thenAccept
BiApply 对应 thenCombine
ThenCompose 对应 thenCompose
AsyncRun 对应 runAsync

核心流程源码分析

CompletableFuture 生命周期的核心流程是:

  • 注册回调(构建依赖)
  • 完成任务(设置结果)
  • 触发依赖(传播完成)

一个典型流程如下(JDK 8):

CompletableFuture<Integer> future1 = new CompletableFuture<>();
// 注册回调(构建依赖)
CompletableFuture<String> future2 = future1.thenApplyAsync(x -> "val=" + x);
CompletableFuture<Void> future3 = future2.thenAccept(System.out::println);

// 完成任务(设置结果),complete 内部的 postComplete 会触发依赖(传播完成)
future1.complete(42); // 触发 complete() → completeValue() → postComplete()
// postComplete() 会依次触发 f2(UniApply),然后 f3(UniAccept)

“注册回调”源码分析:

public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
    // 当当前 CompletableFuture 完成后,在指定线程池(这里是 asyncPool)中异步执行 fn 函数,
    // 并返回一个新的 CompletableFuture<U> 来表示这个转换的结果。
    return uniApplyStage(asyncPool, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    // 创建一个新的 CompletableFuture<V> 对象 d,作为最终返回值(即 thenApplyAsync 返回的那个 future)
    CompletableFuture<V> d =  new CompletableFuture<V>();
    // 只要需要异步执行(e != null),或者无法立即完成(当前 future 未完成),就进入后续的“注册回调”逻辑。
    if (e != null || !d.uniApply(this, f, null)) {
        // 创建一个 UniApply 对象 c ,代表一个“单输入应用操作”(unary apply)
        // UniApply 是 Completion 的一种具体实现
        CompletableFuture.UniApply<T,V> c = new CompletableFuture.UniApply<T,V>(e, d, this, f);
        // 将 c(即这个依赖任务)压入当前 CompletableFuture(this)的栈式依赖链表中,
        // 这样,当 this 完成时,会遍历所有注册的依赖任务(如 c)并触发它们
        push(c);
        // 尝试立即触发这个依赖任务(一种优化,避免“刚注册就完成”时的延迟)
        c.tryFire(SYNC);
    }
    return d;
}

“完成任务”与“触发依赖”源码分析:

public boolean complete(T value) {
    // 尝试以正常值完成 future:返回 true 表示本次 CAS 成功,即我们是第一个完成者。
    boolean triggered = completeValue(value);
    // 无论是否由本线程完成,只要当前 CompletableFuture 已完成(包括刚被我们完成),
    // 就调用 postComplete(),触发所有已注册的依赖任务(如 thenApply, thenAccept 等)。
    // postComplete() 是一个非递归的、线程安全的、广度+深度混合遍历器,
    // 用于在 future 完成时,可靠地唤醒整个依赖网络,且不会栈溢出、不会死锁、不会漏任务。
    postComplete();
    return triggered;
}

// 以非异常结果完成 future,除非它已经完成
final boolean completeValue(T t) {
    return UNSAFE.compareAndSwapObject(this, RESULT, null,
            (t == null) ? NIL : t);
}

// 当确定当前 future 已完成时,弹出并尝试触发所有可达的依赖任务
// 此方法应在 future 完成后调用(如 complete()、obtrudeValue()、内部完成逻辑等)
final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; CompletableFuture.Completion h;
    while ((h = f.stack) != null ||
            (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; CompletableFuture.Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

注册 → 完成 → 传播的流程总结:

  • 当调用 thenApplyAsync 等方法时,会创建一个表示后续操作的 Completion(如 UniApply),若当前任务未完成,则将其压入自身的 stack 依赖栈中(注册
  • 当任务通过 complete(value) 被完成时,使用 CAS 原子地设置 result 字段(完成);
  • 随后立即调用 postComplete(),从 stack 中逐个弹出并执行所有已注册的 Completion,每个 Completion 在执行时会消费当前结果、计算新值,并完成其关联的下游 CompletableFuture,从而递归触发整个依赖链的级联执行(传播)。
  • 整个过程无锁、非阻塞,依靠 volatile + CAS + 回调栈实现高效异步流水线。

任务依赖结构图解

CompletableFuture 的依赖关系可从两个层面理解:

  • Future 层的依赖(逻辑关系):不同 CompletableFuture 实例之间的依赖关系。具体来说,当一个 future 完成后,它会触发另一个 future 的完成。这种依赖关系是由 Completion 对象来管理的。
  • Completion 链表层的依赖(存储关系):每个 CompletableFuture 内部维护了一个单向链表,用于存储所有依赖于该 future 的 Completion 对象。这些 Completion 对象代表了“当这个 future 完成后要执行的操作”。
层级 名称 结构 作用
第一层 Future 依赖图 DAG(有向无环图) 描述“哪个 future 依赖哪个”的逻辑关系
第二层 Completion 链表 每个 future 内部的单向链表(栈) 存储“当这个 future 完成后要执行哪些具体操作”

以下面代码为例:

CompletableFuture<String> f1 = new CompletableFuture<>();

// 第一层:f1 完成后,触发两个独立的后续 future
CompletableFuture<Integer> f2 = f1.thenApply(s -> s.length());          // 分支 A
CompletableFuture<String> f3 = f1.thenApply(s -> s.toUpperCase());      // 分支 B

// 第二层:f2 和 f3 各自又有多个下游
CompletableFuture<Void> f4 = f2.thenAccept(x -> System.out.println("Len: " + x));   // f2 → f4
CompletableFuture<Void> f5 = f2.thenAccept(x -> System.out.println("Double: " + x * 2)); // f2 → f5

CompletableFuture<Void> f6 = f3.thenAccept(s -> System.out.println("Upper: " + s)); // f3 → f6

这个例子中,f1 是源头;f2 和 f3 并行依赖于 f1f2 有两个下游 f4f5f3 有一个下游 f6

逻辑层面的依赖:Future 之间的依赖关系(DAG 图)

存储层面的依赖每个 Future 内部的 Completion 链表

  • Completion 链表是实现 DAG 依赖的底层机制:每个“依赖边”都对应一个 Completion 对象。
  • 整个系统是一个由 Completion 链表组成的网络,通过 postComplete() 动态传播完成状态。

每个 Completion 都持有:

  • src: 源 CompletionFuture(当前这个 completion 所属的 future
  • dep: 目标 CompletionFuture(要被完成的那个 future
  • fn: 要执行的函数

执行流程(当 f1.complete("hello") 被调用):

  1. f1 完成,值为 "hello"
  2. f1.postComplete() 开始处理 f1.stack
  • 先弹出 c2f3 的任务):
    • 执行 toUpperCase("hello") → "HELLO"
    • 完成 f3(设置其 result)
    • 触发 f3.postComplete()
      • 执行 c5:打印 "Upper: HELLO"
  • 再弹出 c1f2 的任务):
    • 执行 length("hello") → 5
    • 完成 f2
    • 触发 f2.postComplete()
      • 先执行 c4:打印 "Double: 10"
      • 再执行 c3:打印 "Len: 5"

注意:虽然 f2 和 f3 是并行分支,但在这个单线程完成场景下,它们是串行执行的(因为 postComplete 是循环处理)。但在异步或并发场景中,它们可能真正并行(如果用了不同线程池)。

Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐