一、CompletableFuture 核心概念

1.1 CompletableFuture是甚?

CompletableFuture 是 Java 8 引入的异步编程工具,实现了 FutureCompletionStage 接口。它解决了传统 Future 的局限性,提供了:

  • 异步任务编排:链式组合多个异步操作
  • 非阻塞编程:避免线程阻塞等待
  • 异常处理:完善的错误处理机制
  • 手动控制:主动完成异步操作

1.2 核心特性

CompletableFuture
链式调用
组合操作
异常处理
异步回调
手动控制

二、核心方法总结

2.1 创建CompletableFuture

// 1. 使用 runAsync 执行无返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("Running in: " + Thread.currentThread().getName());
});

// 2. 使用 supplyAsync 执行有返回值的异步任务, 使用的是默认线程池
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "Result from: " + Thread.currentThread().getName();
});

// 3. 使用自定义线程池, 注意【创建线程池的时候,最好将给个名称,一般取业务名称即可】
ExecutorService customPool = Executors.newFixedThreadPool(4);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try { Thread.sleep(1000); } catch (InterruptedException e) {}
    return 42;
}, customPool);

2.2 结果处理

2.2.1 基础转换方法

CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")       // 同步转换
    .thenApplyAsync(String::toUpperCase) // 异步转换
    .thenAccept(System.out::println)     // 消费结果
    .thenRun(() -> System.out.println("Processing complete"));

2.2.2 组合多个 Future

// 组合两个独立任务, 顺序随意
CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> 30);
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> "Alice");

CompletableFuture<String> combined = ageFuture.thenCombine(nameFuture, 
    (age, name) -> name + " is " + age + " years old");

// 顺序组合(前一个结果作为下一个输入), 顺序固定
CompletableFuture<Double> bmiFuture = getUserWeight(userId)
    .thenCompose(weight -> getUserHeight(userId)
    .thenApply(height -> weight / (height * height));

2.3 异常处理

CompletableFuture.supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Simulated error");
        }
        return "Success";
    })
    .exceptionally(ex -> { // 只有异常的时候的才会触发操作
        System.err.println("Exception: " + ex.getMessage());
        return "Fallback value";
    })
    .handle((result, ex) -> { // 不管是否会异常都会触发操作
        if (ex != null) {
            return "Handled error: " + ex.getMessage();
        }
        return "Result: " + result;
    });

2.4 多任务协调

  • allOf
  • anyOf
List<CompletableFuture<String>> futures = Arrays.asList(
    fetchData("API1"),
    fetchData("API2"),
    fetchData("API3")
);

// 1. 所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.thenRun(() -> {
    List<String> results = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    System.out.println("All results: " + results);
});

// 2. 任一任务完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
anyOf.thenAccept(result -> System.out.println("First result: " + result));

三、高级应用场景

3.1 微服务调用编排

Client ServiceA ServiceB ServiceC 请求用户数据 返回用户信息 请求订单数据(依赖用户ID) 返回订单列表 请求产品数据(并行) 返回产品信息 组合用户+订单+产品数据 Client ServiceA ServiceB ServiceC

示例代码

CompletableFuture<User> userFuture = getUserAsync(userId);

CompletableFuture<List<Order>> ordersFuture = userFuture
    .thenCompose(user -> getOrdersAsync(user.getId()));

CompletableFuture<List<Product>> productsFuture = getProductsAsync();

CompletableFuture<UserProfile> profileFuture = userFuture
    .thenCombine(ordersFuture, (user, orders) -> new UserData(user, orders))
    .thenCombine(productsFuture, (data, products) -> {
        data.setRecommendedProducts(products);
        return data.toProfile();
    });

profileFuture.thenAccept(profile -> 
    System.out.println("Generated profile: " + profile));

3.2 超时控制

public static <T> CompletableFuture<T> withTimeout(
        CompletableFuture<T> future, 
        long timeout, 
        TimeUnit unit) {
    
    CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
    
    // 设置超时
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
        if (!future.isDone()) {
            timeoutFuture.completeExceptionally(new TimeoutException());
        }
    }, timeout, unit);
    
    // 正常完成时取消超时任务
    future.whenComplete((result, ex) -> {
        timeoutTask.cancel(true);
        if (ex != null) {
            timeoutFuture.completeExceptionally(ex);
        } else {
            timeoutFuture.complete(result);
        }
    });
    
    return timeoutFuture;
}

// 使用示例
CompletableFuture<String> future = fetchDataAsync();
withTimeout(future, 2, TimeUnit.SECONDS)
    .exceptionally(ex -> {
        if (ex instanceof TimeoutException) {
            return "Fallback data";
        }
        return "Error: " + ex.getMessage();
    });

3.3 批处理与限流

// 模拟100个异步任务
List<CompletableFuture<String>> tasks = IntStream.range(0, 100)
    .mapToObj(i -> fetchDataAsync("Task-" + i))
    .collect(Collectors.toList());

// 使用Semaphore限流(最大并发10)
Semaphore semaphore = new Semaphore(10);
List<CompletableFuture<String>> rateLimitedTasks = tasks.stream()
    .map(future -> {
        try {
            semaphore.acquire();
            return future.whenComplete((r, e) -> semaphore.release());
        } catch (InterruptedException e) {
            return CompletableFuture.failedFuture(e);
        }
    })
    .collect(Collectors.toList());

// 分批处理(每批20个)
int batchSize = 20;
List<CompletableFuture<Void>> batches = new ArrayList<>();
for (int i = 0; i < rateLimitedTasks.size(); i += batchSize) {
    List<CompletableFuture<String>> batch = rateLimitedTasks.subList(i, 
        Math.min(i + batchSize, rateLimitedTasks.size()));
    
    CompletableFuture<Void> batchFuture = CompletableFuture.allOf(
        batch.toArray(new CompletableFuture[0])
    );
    batches.add(batchFuture);
}

// 所有批次完成后处理
CompletableFuture.allOf(batches.toArray(new CompletableFuture[0]))
    .thenRun(() -> System.out.println("All batches completed"));

四、最佳实践

4.1 线程池管理

// 1. 为不同类型任务使用不同线程池, 【给线程起个名字吧】
ExecutorService ioPool = Executors.newFixedThreadPool(20);   // I/O密集型
ExecutorService cpuPool = Executors.newWorkStealingPool();  // CPU密集型

// 2. 避免使用默认的ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> dbQuery(), ioPool);

// 3. 资源清理
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    ioPool.shutdown();
    cpuPool.shutdown();
}));

4.2 异常处理策略

CompletableFuture.supplyAsync(() -> riskyOperation())
    .exceptionally(ex -> { // 捕获所有异常,返回默认值
        log.error("Operation failed", ex);
        return DEFAULT_VALUE;
    })
    .handle((result, ex) -> { // 同时处理结果和异常
        if (ex != null) {
            return handleException(ex);
        }
        return processResult(result);
    })
    .whenComplete((result, ex) -> { // 最终清理
        cleanupResources();
        if (ex != null) {
            metrics.recordFailure();
        }
    });

4.3 性能优化技巧

// 1. 避免阻塞操作
CompletableFuture.supplyAsync(() -> blockingDBCall())
    .thenApplyAsync(result -> process(result), cpuPool); // 切换到CPU密集型线程池

// 2. 缓存常用Future
Map<String, CompletableFuture<Product>> productCache = new ConcurrentHashMap<>();

public CompletableFuture<Product> getProduct(String id) {
    return productCache.computeIfAbsent(id, 
        key -> fetchProductAsync(key).toCompletableFuture());
}

// 3. 使用完成处理器替代join()
CompletableFuture<String> future = fetchData();
future.thenAccept(result -> {
    // 非阻塞处理结果
    updateUI(result);
});

// 避免这样使用(会阻塞线程):
// String result = future.join();

五、常见陷阱与解决方案

5.1 回调地狱

// 错误示例:深度嵌套回调
getUserAsync(userId).thenAccept(user -> {
    getOrdersAsync(user.getId()).thenAccept(orders -> {
        getProductsAsync().thenAccept(products -> {
            // 三层嵌套...
        });
    });
});

// 正确方案:使用组合方法
CompletableFuture<Void> chained = getUserAsync(userId)
    .thenCompose(user -> getOrdersAsync(user.getId()))
    .thenCompose(orders -> getProductsAsync())
    .thenAccept(products -> processAll(products));

5.2 线程泄漏

// 错误:未关闭自定义线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> longRunningTask(), pool);

// 正确:管理线程池生命周期
try (ExecutorService pool = Executors.newFixedThreadPool(5)) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 任务逻辑
    }, pool);
    future.join(); // 等待完成
} // 自动关闭线程池

5.3 丢失异常

// 错误:未处理异常
CompletableFuture.runAsync(() -> {
    throw new RuntimeException("Oops!");
});

// 正确:显式处理异常
CompletableFuture.runAsync(() -> {
    throw new RuntimeException("Handled!");
}).exceptionally(ex -> {
    System.err.println("Caught exception: " + ex.getMessage());
    return null;
});

六、CompletableFuture 与其他技术对比

特性 CompletableFuture FutureTask RxJava Project Reactor
链式调用
异步组合
背压支持
热/冷发布
操作符丰富度 中等 丰富 丰富
Java 版本要求 8+ 5+ 6+ 8+
学习曲线 中等 简单 陡峭 陡峭
线程模型灵活性

七、典型应用场景总结

7.1 微服务编排:组合多个服务调用

CompletableFuture<Response> response = userService.getUser(userId)
    .thenCompose(user -> orderService.getOrders(user.getId()))
    .thenApply(orders -> buildResponse(orders));

7.2 并行数据处理:同时执行多个独立任务

List<CompletableFuture<Data>> futures = dataSources.stream()
    .map(source -> fetchDataAsync(source))
    .collect(Collectors.toList());

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .thenApply(v -> futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList()));

7.3 异步IO操作:非阻塞网络/文件操作

CompletableFuture<String> fileContent = CompletableFuture.supplyAsync(() -> {
    try {
        return new String(Files.readAllBytes(Paths.get("data.txt")));
    } catch (IOException e) {
        throw new CompletionException(e);
    }
}, ioPool);

7.4 事件驱动架构:响应式事件处理

eventBus.register(OrderEvent.class, event -> {
    CompletableFuture.runAsync(() -> {
        processOrder(event.getOrder());
        notifyWarehouse(event.getOrder());
    }, processingPool);
});

7.5 定时/延迟任务:结合ScheduledExecutorService

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

CompletableFuture<Result> delayedFuture = new CompletableFuture<>();
scheduler.schedule(() -> {
    Result result = computeResult();
    delayedFuture.complete(result);
}, 5, TimeUnit.SECONDS);

八、总结

8.1 核心内容

CompletableFuture 是 Java 异步编程的核心工具,提供了强大的异步操作组合能力:

  • 使用 supplyAsync/runAsync 创建异步任务
  • 通过 thenApply/thenCompose/thenCombine 构建处理链
  • 使用 allOf/anyOf 协调多个任务
  • 通过 exceptionally/handle 处理错误
  • 结合自定义线程池优化资源使用

8.2 最佳实践建议

  1. 始终为长时间操作指定专用线程池
  2. 优先使用组合方法而非嵌套回调
  3. 为每个 CompletableFuture 链添加异常处理
  4. 避免在异步链中使用阻塞操作
  5. 使用完成处理器(thenAccept/thenRun)替代 get()/join()

8.3 thenApply/thenCompose/thenAccept/thenRun/thenCombine

核心区别一句话概括:

  • thenApply: 同步变换。对上一个阶段的结果进行同步转换,输入输出都是普通值。
  • thenCompose: 异步变换。对上一个阶段的结果进行异步转换,输入是值,输出是一个新的 CompletableFuture(用于“展平”嵌套的 Future)。
  • thenCombine: 组合汇聚。将两个独立的 CompletableFuture 的结果进行同步组合。
  • thenAccept: 消费者 - Consumer, 在当前阶段完成后,将其结果作为输入,执行一个有输入、无输出的消费操作。它接收一个 Consumer<T> 函数式接口
  • thenRun: (动作执行者), 在当前阶段完成后,执行一个既不需要输入、也不需要输出的操作。它接收一个 Runnable 函数式接口。

8.3.1 thenApply

核心概念:在当前阶段完成后,同步地对其结果进行转换处理,并返回一个新的值。它接收一个 Function<T, U> 函数式接口。

类比:类似于 Stream API 中的 map() 操作。你拿到一个输入,经过一个函数处理,直接返回一个输出。

flowchart LR
    A[Stage 1<br>CompletableFuture&lt;T&gt;] --> B["thenApply(Function<T, U>)"]
    B --> C[Stage 2<br>CompletableFuture&lt;U&gt;]
    subgraph B [同步操作]
        direction LR
        B1[输入 T] --> B2[Function T->U] --> B3[输出 U]
    end
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");

// thenApply: 同步地将字符串转换为大写
CompletableFuture<String> upperCaseFuture = future.thenApply(s -> {
    // 这是一个同步操作,立即执行
    return s.toUpperCase();
});

// 输出: HELLO
upperCaseFuture.thenAccept(System.out::println); 

典型场景:数据格式转换、计算、提取字段等所有不需要再发起异步操作的处理。

8.3.2 thenCompose (异步变换 / 展平)

核心概念:在当前阶段完成后,异步地对其结果进行转换处理。它接收一个 Function<T, CompletableFuture<U>> 函数。它用于解决嵌套的 CompletableFuture(即 CompletableFuture<CompletableFuture<U>>)问题,将其“展平”为单一的 CompletableFuture<U>

flowchart LR
    A[Stage 1<br>CompletableFuture&lt;T&gt;] --> B["thenCompose(Function<T, CompletableFuture<U>>)"]
    B --> C[启动新的异步任务]
    C --> D[Stage 2<br>CompletableFuture&lt;U&gt;]
// 模拟一个异步服务:根据用户ID获取用户详情
CompletableFuture<User> getUserDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        // 模拟网络IO
        return new User(userId, "Alice");
    });
}

CompletableFuture<String> userIdFuture = CompletableFuture.supplyAsync(() -> "123");

// 错误做法:使用 thenApply 会导致嵌套的 Future: CompletableFuture<CompletableFuture<User>>
CompletableFuture<CompletableFuture<User>> badFuture = userIdFuture.thenApply(userId -> getUserDetail(userId));

// 正确做法:使用 thenCompose 将嵌套的 Future “展平” 成单一的 CompletableFuture<User>
CompletableFuture<User> userFuture = userIdFuture.thenCompose(userId -> getUserDetail(userId));

// 现在可以直接操作 User 对象
userFuture.thenAccept(user -> System.out.println("User name: " + user.getName())); // Output: User name: Alice

典型场景异步链式调用。当一个异步操作(如调用A服务)的结果,是另一个异步操作(如调用B服务)的输入时,必须使用 thenCompose

8.3.3 thenCombine (组合汇聚)

核心概念:等待两个独立的 CompletableFuture 都完成后,将它们的结果作为输入,同步地进行组合处理,并返回一个新的结果。它接收另一个 CompletionStage 和一个 BiFunction<T, U, V>

类比:等待两个并行任务都完工,然后把它们的产出物组装成一个新产品。

flowchart TD
    A[Future 1<br>CompletableFuture&lt;T&gt;] --> C["thenCombine(Future2, BiFunction<T, U, V>)"]
    B[Future 2<br>CompletableFuture&lt;U&gt;] --> C
    C --> D[组合结果 Future<br>CompletableFuture&lt;V&gt;]
    subgraph C [同步组合操作]
        direction LR
        C1[输入 T] --> C2[BiFunction T,U->V]
        C3[输入 U] --> C2
        C2 --> C4[输出 V]
    end
// 两个完全独立的异步任务
CompletableFuture<Integer> weightFuture = CompletableFuture.supplyAsync(() -> 70); // 获取体重
CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 1.75); // 获取身高

// thenCombine: 等待两个独立任务都完成,然后组合它们的计算结果(BMI)
CompletableFuture<Double> bmiFuture = weightFuture.thenCombine(heightFuture, (weight, height) -> {
    // 这是一个同步计算函数
    double heightInMeter = height;
    return weight / (heightInMeter * heightInMeter);
});

// 输出: BMI: 22.857...
bmiFuture.thenAccept(bmi -> System.out.println("BMI: " + bmi)); 

典型场景:聚合多个独立异步任务的结果。例如,从商品服务和库存服务分别获取信息,然后组合成一个完整的商品详情VO。

8.3.4 thenAccept (消费者 - Consumer)

核心概念:在当前阶段完成后,将其结果作为输入,执行一个有输入、无输出的消费操作。它接收一个 Consumer<T> 函数式接口。

语义:“有了这个结果之后,我要拿它做点什么…”(比如打印它、保存它、发送它)

flowchart LR
    A[Stage 1<br>CompletableFuture&lt;T&gt;] --> B["thenAccept(Consumer<T>)"]
    B --> C[Stage 2<br>CompletableFuture&lt;Void&gt;]
    subgraph B [消费操作]
        direction LR
        B1[输入 T] --> B2[Consumer 消费 T] --> B3[输出 Void]
    end
CompletableFuture<String> asyncTask = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时计算
    try { Thread.sleep(1000); } catch (InterruptedException e) {}
    return "Processed_Data_123";
});

// thenAccept: 接收上一步的字符串结果,并消费它(例如保存到数据库或发送消息)
CompletableFuture<Void> saveFuture = asyncTask.thenAccept(result -> {
    System.out.println("成功获取到结果,正在保存: " + result);
    // repository.save(result); // 假设的保存操作
    // kafkaTemplate.send("topic", result); // 假设的发送消息操作
});

// 因为 thenAccept 返回的是 CompletableFuture<Void>,
// 所以你不能继续链式地转换数据,但可以链式地执行后续操作。
saveFuture.thenRun(() -> System.out.println("数据保存完毕!"));

典型场景

  • 将异步计算的结果保存到数据库。
  • 将结果发送到消息队列(Kafka, RabbitMQ)。
  • 打印日志或更新UI。

8.3.5 thenRun (动作执行者 - Runnable)

核心概念:在当前阶段完成后,执行一个既不需要输入、也不需要输出的操作。它接收一个 Runnable 函数式接口。

语义:“不管之前的结果是什么,现在这个阶段完成后,我就要做某件事…”(比如发通知、清理资源)

flowchart LR
    A[Stage 1<br>CompletableFuture&lt;T&gt;] --> B["thenRun(Runnable)"]
    B --> C[Stage 2<br>CompletableFuture&lt;Void&gt;]
    subgraph B [执行操作]
        direction LR
        B2[Runnable 执行] --> B3[输出 Void]
    end
CompletableFuture<String> asyncTask = CompletableFuture.supplyAsync(() -> {
    // 模拟一个复杂的文件处理任务
    try { Thread.sleep(2000); } catch (InterruptedException e) {}
    return "large_file.pdf";
});

// thenRun: 不关心上一步的结果是什么,只关心它“完成”了这个动作。
// 例如,在上面的文件处理完后,发送一个通知或清理临时资源。
CompletableFuture<Void> notificationFuture = asyncTask.thenRun(() -> {
    // 这里无法访问 asyncTask 的结果 "large_file.pdf"
    System.out.println("之前的异步处理任务已经完成,发送通知...");
    // notificationService.send("Task completed!");
    // tempFileCleaner.cleanup(); // 清理临时文件
});

notificationFuture.thenRun(() -> System.out.println("通知发送完毕!"));

典型场景

  • 发送完成通知(邮件、短信)。
  • 释放资源、清理临时文件。
  • 记录一个不依赖于具体结果的操作日志(例如“流程执行完毕”)。
  • 作为异步链的最终结束步骤。

8.3.6 图例

thenApply thenCompose thenCombine
核心作用 同步变换 异步变换(展平) 组合汇聚
输入 上一个阶段的结果 T 上一个阶段的结果 T 两个独立的 Future 的结果 TU
操作函数 Function<T, U> Function<T, CompletableFuture<U>> BiFunction<T, U, V>
输出 CompletableFuture<U> CompletableFuture<U> CompletableFuture<V>
解决痛点 简单的值转换 嵌套的 Future (CF<CF<U>>) 合并两个独立任务的结果
类比 Stream map Stream flatMap 等待两个并行任务并组装
特性 thenAccept thenRun
核心作用 消费前一个阶段的结果 在前一个阶段完成后执行一个动作
是否需要输入 需要(前一个阶段的结果 T 不需要
操作函数接口 Consumer<T> Runnable
返回值 CompletableFuture<Void> CompletableFuture<Void>
语义 有了这个,然后… 做完这个之后,然后…
类比 顾客拿到菜(结果)后开始吃(消费) 厨师做完菜(完成)后洗手(动作),不关心菜被谁吃了

8.4 exceptionally/handle/whenComplete

核心区别一句话概括:

  • exceptionally: 恢复。只在发生异常时被调用,相当于 catch 块,用于提供降级值或恢复业务。
  • handle: 转换。无论成功与否都会被调用,相当于 finally + 结果转换,可以同时访问结果和异常,并返回一个新结果。
  • whenComplete: 回调。无论成功与否都会被调用,相当于 finally,可以访问结果或异常,但无法改变原有结果或异常。

8.4.1 exceptionally (异常恢复 - Catch Block)

核心概念仅在前一阶段抛出异常时被触发。它接收一个 Function<Throwable, T> 函数,用于计算一个降级(fallback)值来恢复流程,使链式调用可以继续下去。

语义:“如果上一步出错了,我就用这个值来代替…


异常 ex
上一阶段
成功?
传递结果 (T)
exceptionally()
Function
返回降级值 (T)
下一阶段
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Data fetch failed!");
    }
    return "Data";
});

// 仅在异常时执行,提供降级值
CompletableFuture<String> safeFuture = future.exceptionally(ex -> {
    System.err.println("Operation failed, using fallback. Cause: " + ex.getMessage());
    return "Fallback Data"; // 提供一个降级值,让流程继续
});

// 输出可能是 "Data" 或 "Fallback Data"
safeFuture.thenAccept(System.out::println);

典型场景:服务降级、默认值返回、异常转换(将系统异常转换为业务异常)。

8.4.2 handle (结果和异常转换 - Finally + Transform)

核心概念无论前一阶段成功还是失败,都会被调用。它接收一个 BiFunction<T, Throwable, U> 函数,可以同时访问结果和异常(其中一个必为 null),并必须返回一个新结果,从而可以统一转换成功和失败两种情况的结果。

语义:“不管上一步是成功还是失败,我都要处理一下,并返回一个新的结果…


结果 result

异常 ex
上一阶段
成功?
handle()
BiFunction
返回新结果 (U)
下一阶段
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Calculation error!");
    }
    return 10;
});

// 无论成功失败都执行,并返回一个新结果
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
    if (ex != null) {
        return "Computation failed: " + ex.getMessage(); // 异常时返回字符串
    } else {
        return "Computation succeeded: " + (result * 2); // 成功时也返回字符串
    }
    // 注意:必须返回一个值,这个值会成为新的 CompletableFuture 的结果
});

// 输出是统一的字符串格式
handledFuture.thenAccept(System.out::println);

典型场景:统一封装响应体(将成功结果和异常信息都封装成固定的API响应格式)、结果日志记录、监控指标上报。

8.4.3 whenComplete (最终回调 - Finally Block)

核心概念无论前一阶段成功还是失败,都会被调用。它接收一个 BiConsumer<T, Throwable> 函数,可以访问结果或异常,但无法改变它们。它执行完后,原来的结果(或异常)会原封不动地传递给下一阶段。

语义:“不管上一步是成功还是失败,我都要看一下,做点事(比如日志、清理),但不会改变最终结果…


结果 result

异常 ex
上一阶段
成功?
whenComplete()
BiConsumer
传递原结果或异常
下一阶段
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Network issue!");
    }
    return "Important Result";
});

// 无论成功失败都执行,但不会改变结果
CompletableFuture<String> loggedFuture = future.whenComplete((result, ex) -> {
    if (ex != null) {
        // 记录错误日志,但异常依然会传播
        System.err.println("Operation completed with exception: " + ex.getMessage());
    } else {
        // 记录成功日志,结果依然是 "Important Result"
        System.out.println("Operation completed successfully: " + result);
    }
    // 注意:这是一个 Consumer,没有返回值
});

// 尝试获取结果,如果上面有异常,这里依然会抛出
try {
    System.out.println("Final result: " + loggedFuture.get());
} catch (Exception e) {
    System.out.println("Still got exception: " + e.getCause().getMessage());
}

典型场景:资源清理、日志记录、指标监控(如记录耗时)、发送通知。它不会阻止异常传播

特性 exceptionally handle whenComplete
调用时机 仅异常时 总是 总是
输入 Throwable(异常对象) (T result, Throwable ex) (一个非null) (T result, Throwable ex) (一个非null)
函数接口 Function<Throwable, T> BiFunction<T, Throwable, U> BiConsumer<T, Throwable>
返回值 (降级值 T (新结果 U
是否改变结果/异常 (用返回值替代异常) (用返回值替代原有结果和异常) (观察者,原样传递)
类比 catch finally + 结果转换 finally
核心用途 服务降级,提供备选值 统一转换,格式化成功/失败输出 副作用操作,如日志、清理

8.4.4 组合使用与最佳实践

CompletableFuture.supplyAsync(() -> queryDatabase(userId))
    .thenApply(data -> transformData(data)) // 转换数据
    .whenComplete((result, ex) -> { // 记录日志或监控,不影响结果
        metrics.record("dbQuery", ex == null);
        log.debug("Query finished for user {}", userId);
    })
    .exceptionally(ex -> { // 如果上述任何一步出错,提供降级
        log.error("Full operation failed", ex);
        return getCachedData(userId); // 尝试返回缓存数据
    })
    .handle((result, ex) -> { // 最终统一响应格式
        if (ex != null) {
            return new ApiResponse<>("FAILURE", null, ex.getMessage());
        } else {
            return new ApiResponse<>("SUCCESS", result, null);
        }
    })
    .thenAccept(response -> sendResponse(response)); // 发送最终响应

总结一下选择策略

  • 吃掉异常并提供一个备用结果 -> 用 exceptionally
  • 统一处理成功和失败两种情况,并返回一个新的统一类型 -> 用 handle
  • 只是想看一下结果或异常(记录日志、清理),但不想改变它们 -> 用 whenComplete

往期资料获取地址:

  • https://pan.baidu.com/s/1FbUp2xFFidxCfeXbzYjmOw 提取码: jzm9
    在这里插入图片描述
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐