JUC之CompletableFuture总结与补充
本文介绍了Java 8引入的CompletableFuture异步编程工具。CompletableFuture实现了Future和CompletionStage接口,支持异步任务编排、非阻塞编程、异常处理和手动控制等特性。文章详细讲解了其核心方法:创建方式(runAsync/supplyAsync)、结果处理(thenApply/thenAccept)、任务组合(thenCombine/thenC
文章目录
一、CompletableFuture 核心概念
1.1 CompletableFuture是甚?
CompletableFuture 是 Java 8 引入的异步编程工具,实现了 Future
和 CompletionStage
接口。它解决了传统 Future 的局限性,提供了:
- 异步任务编排:链式组合多个异步操作
- 非阻塞编程:避免线程阻塞等待
- 异常处理:完善的错误处理机制
- 手动控制:主动完成异步操作
1.2 核心特性
二、核心方法总结
2.1 创建CompletableFuture
// 1. 使用 runAsync 执行无返回值的异步任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("Running in: " + Thread.currentThread().getName());
});
// 2. 使用 supplyAsync 执行有返回值的异步任务, 使用的是默认线程池
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Result from: " + Thread.currentThread().getName();
});
// 3. 使用自定义线程池, 注意【创建线程池的时候,最好将给个名称,一般取业务名称即可】
ExecutorService customPool = Executors.newFixedThreadPool(4);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return 42;
}, customPool);
2.2 结果处理
2.2.1 基础转换方法
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 同步转换
.thenApplyAsync(String::toUpperCase) // 异步转换
.thenAccept(System.out::println) // 消费结果
.thenRun(() -> System.out.println("Processing complete"));
2.2.2 组合多个 Future
// 组合两个独立任务, 顺序随意
CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> 30);
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> "Alice");
CompletableFuture<String> combined = ageFuture.thenCombine(nameFuture,
(age, name) -> name + " is " + age + " years old");
// 顺序组合(前一个结果作为下一个输入), 顺序固定
CompletableFuture<Double> bmiFuture = getUserWeight(userId)
.thenCompose(weight -> getUserHeight(userId)
.thenApply(height -> weight / (height * height));
2.3 异常处理
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Simulated error");
}
return "Success";
})
.exceptionally(ex -> { // 只有异常的时候的才会触发操作
System.err.println("Exception: " + ex.getMessage());
return "Fallback value";
})
.handle((result, ex) -> { // 不管是否会异常都会触发操作
if (ex != null) {
return "Handled error: " + ex.getMessage();
}
return "Result: " + result;
});
2.4 多任务协调
- allOf
- anyOf
List<CompletableFuture<String>> futures = Arrays.asList(
fetchData("API1"),
fetchData("API2"),
fetchData("API3")
);
// 1. 所有任务完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOf.thenRun(() -> {
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println("All results: " + results);
});
// 2. 任一任务完成
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
anyOf.thenAccept(result -> System.out.println("First result: " + result));
三、高级应用场景
3.1 微服务调用编排
示例代码
CompletableFuture<User> userFuture = getUserAsync(userId);
CompletableFuture<List<Order>> ordersFuture = userFuture
.thenCompose(user -> getOrdersAsync(user.getId()));
CompletableFuture<List<Product>> productsFuture = getProductsAsync();
CompletableFuture<UserProfile> profileFuture = userFuture
.thenCombine(ordersFuture, (user, orders) -> new UserData(user, orders))
.thenCombine(productsFuture, (data, products) -> {
data.setRecommendedProducts(products);
return data.toProfile();
});
profileFuture.thenAccept(profile ->
System.out.println("Generated profile: " + profile));
3.2 超时控制
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit unit) {
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
// 设置超时
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> timeoutTask = scheduler.schedule(() -> {
if (!future.isDone()) {
timeoutFuture.completeExceptionally(new TimeoutException());
}
}, timeout, unit);
// 正常完成时取消超时任务
future.whenComplete((result, ex) -> {
timeoutTask.cancel(true);
if (ex != null) {
timeoutFuture.completeExceptionally(ex);
} else {
timeoutFuture.complete(result);
}
});
return timeoutFuture;
}
// 使用示例
CompletableFuture<String> future = fetchDataAsync();
withTimeout(future, 2, TimeUnit.SECONDS)
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
return "Fallback data";
}
return "Error: " + ex.getMessage();
});
3.3 批处理与限流
// 模拟100个异步任务
List<CompletableFuture<String>> tasks = IntStream.range(0, 100)
.mapToObj(i -> fetchDataAsync("Task-" + i))
.collect(Collectors.toList());
// 使用Semaphore限流(最大并发10)
Semaphore semaphore = new Semaphore(10);
List<CompletableFuture<String>> rateLimitedTasks = tasks.stream()
.map(future -> {
try {
semaphore.acquire();
return future.whenComplete((r, e) -> semaphore.release());
} catch (InterruptedException e) {
return CompletableFuture.failedFuture(e);
}
})
.collect(Collectors.toList());
// 分批处理(每批20个)
int batchSize = 20;
List<CompletableFuture<Void>> batches = new ArrayList<>();
for (int i = 0; i < rateLimitedTasks.size(); i += batchSize) {
List<CompletableFuture<String>> batch = rateLimitedTasks.subList(i,
Math.min(i + batchSize, rateLimitedTasks.size()));
CompletableFuture<Void> batchFuture = CompletableFuture.allOf(
batch.toArray(new CompletableFuture[0])
);
batches.add(batchFuture);
}
// 所有批次完成后处理
CompletableFuture.allOf(batches.toArray(new CompletableFuture[0]))
.thenRun(() -> System.out.println("All batches completed"));
四、最佳实践
4.1 线程池管理
// 1. 为不同类型任务使用不同线程池, 【给线程起个名字吧】
ExecutorService ioPool = Executors.newFixedThreadPool(20); // I/O密集型
ExecutorService cpuPool = Executors.newWorkStealingPool(); // CPU密集型
// 2. 避免使用默认的ForkJoinPool.commonPool()
CompletableFuture.supplyAsync(() -> dbQuery(), ioPool);
// 3. 资源清理
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
ioPool.shutdown();
cpuPool.shutdown();
}));
4.2 异常处理策略
CompletableFuture.supplyAsync(() -> riskyOperation())
.exceptionally(ex -> { // 捕获所有异常,返回默认值
log.error("Operation failed", ex);
return DEFAULT_VALUE;
})
.handle((result, ex) -> { // 同时处理结果和异常
if (ex != null) {
return handleException(ex);
}
return processResult(result);
})
.whenComplete((result, ex) -> { // 最终清理
cleanupResources();
if (ex != null) {
metrics.recordFailure();
}
});
4.3 性能优化技巧
// 1. 避免阻塞操作
CompletableFuture.supplyAsync(() -> blockingDBCall())
.thenApplyAsync(result -> process(result), cpuPool); // 切换到CPU密集型线程池
// 2. 缓存常用Future
Map<String, CompletableFuture<Product>> productCache = new ConcurrentHashMap<>();
public CompletableFuture<Product> getProduct(String id) {
return productCache.computeIfAbsent(id,
key -> fetchProductAsync(key).toCompletableFuture());
}
// 3. 使用完成处理器替代join()
CompletableFuture<String> future = fetchData();
future.thenAccept(result -> {
// 非阻塞处理结果
updateUI(result);
});
// 避免这样使用(会阻塞线程):
// String result = future.join();
五、常见陷阱与解决方案
5.1 回调地狱
// 错误示例:深度嵌套回调
getUserAsync(userId).thenAccept(user -> {
getOrdersAsync(user.getId()).thenAccept(orders -> {
getProductsAsync().thenAccept(products -> {
// 三层嵌套...
});
});
});
// 正确方案:使用组合方法
CompletableFuture<Void> chained = getUserAsync(userId)
.thenCompose(user -> getOrdersAsync(user.getId()))
.thenCompose(orders -> getProductsAsync())
.thenAccept(products -> processAll(products));
5.2 线程泄漏
// 错误:未关闭自定义线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> longRunningTask(), pool);
// 正确:管理线程池生命周期
try (ExecutorService pool = Executors.newFixedThreadPool(5)) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 任务逻辑
}, pool);
future.join(); // 等待完成
} // 自动关闭线程池
5.3 丢失异常
// 错误:未处理异常
CompletableFuture.runAsync(() -> {
throw new RuntimeException("Oops!");
});
// 正确:显式处理异常
CompletableFuture.runAsync(() -> {
throw new RuntimeException("Handled!");
}).exceptionally(ex -> {
System.err.println("Caught exception: " + ex.getMessage());
return null;
});
六、CompletableFuture 与其他技术对比
特性 | CompletableFuture | FutureTask | RxJava | Project Reactor |
---|---|---|---|---|
链式调用 | ✓ | ✗ | ✓ | ✓ |
异步组合 | ✓ | ✗ | ✓ | ✓ |
背压支持 | ✗ | ✗ | ✓ | ✓ |
热/冷发布 | ✗ | ✗ | ✓ | ✓ |
操作符丰富度 | 中等 | 低 | 丰富 | 丰富 |
Java 版本要求 | 8+ | 5+ | 6+ | 8+ |
学习曲线 | 中等 | 简单 | 陡峭 | 陡峭 |
线程模型灵活性 | 高 | 低 | 高 | 高 |
七、典型应用场景总结
7.1 微服务编排:组合多个服务调用
CompletableFuture<Response> response = userService.getUser(userId)
.thenCompose(user -> orderService.getOrders(user.getId()))
.thenApply(orders -> buildResponse(orders));
7.2 并行数据处理:同时执行多个独立任务
List<CompletableFuture<Data>> futures = dataSources.stream()
.map(source -> fetchDataAsync(source))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
7.3 异步IO操作:非阻塞网络/文件操作
CompletableFuture<String> fileContent = CompletableFuture.supplyAsync(() -> {
try {
return new String(Files.readAllBytes(Paths.get("data.txt")));
} catch (IOException e) {
throw new CompletionException(e);
}
}, ioPool);
7.4 事件驱动架构:响应式事件处理
eventBus.register(OrderEvent.class, event -> {
CompletableFuture.runAsync(() -> {
processOrder(event.getOrder());
notifyWarehouse(event.getOrder());
}, processingPool);
});
7.5 定时/延迟任务:结合ScheduledExecutorService
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
CompletableFuture<Result> delayedFuture = new CompletableFuture<>();
scheduler.schedule(() -> {
Result result = computeResult();
delayedFuture.complete(result);
}, 5, TimeUnit.SECONDS);
八、总结
8.1 核心内容
CompletableFuture 是 Java 异步编程的核心工具,提供了强大的异步操作组合能力:
- 使用
supplyAsync
/runAsync
创建异步任务 - 通过
thenApply
/thenCompose
/thenCombine
构建处理链 - 使用
allOf
/anyOf
协调多个任务 - 通过
exceptionally
/handle
处理错误 - 结合自定义线程池优化资源使用
8.2 最佳实践建议
- 始终为长时间操作指定专用线程池
- 优先使用组合方法而非嵌套回调
- 为每个 CompletableFuture 链添加异常处理
- 避免在异步链中使用阻塞操作
- 使用完成处理器(thenAccept/thenRun)替代 get()/join()
8.3 thenApply/thenCompose/thenAccept/thenRun/thenCombine
核心区别一句话概括:
thenApply
: 同步变换。对上一个阶段的结果进行同步转换,输入输出都是普通值。thenCompose
: 异步变换。对上一个阶段的结果进行异步转换,输入是值,输出是一个新的CompletableFuture
(用于“展平”嵌套的 Future)。thenCombine
: 组合汇聚。将两个独立的CompletableFuture
的结果进行同步组合。- thenAccept: 消费者 - Consumer, 在当前阶段完成后,将其结果作为输入,执行一个有输入、无输出的消费操作。它接收一个
Consumer<T>
函数式接口 - thenRun: (动作执行者), 在当前阶段完成后,执行一个既不需要输入、也不需要输出的操作。它接收一个
Runnable
函数式接口。
8.3.1 thenApply
核心概念:在当前阶段完成后,同步地对其结果进行转换处理,并返回一个新的值。它接收一个 Function<T, U>
函数式接口。
类比:类似于 Stream API 中的 map()
操作。你拿到一个输入,经过一个函数处理,直接返回一个输出。
flowchart LR
A[Stage 1<br>CompletableFuture<T>] --> B["thenApply(Function<T, U>)"]
B --> C[Stage 2<br>CompletableFuture<U>]
subgraph B [同步操作]
direction LR
B1[输入 T] --> B2[Function T->U] --> B3[输出 U]
end
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello");
// thenApply: 同步地将字符串转换为大写
CompletableFuture<String> upperCaseFuture = future.thenApply(s -> {
// 这是一个同步操作,立即执行
return s.toUpperCase();
});
// 输出: HELLO
upperCaseFuture.thenAccept(System.out::println);
典型场景:数据格式转换、计算、提取字段等所有不需要再发起异步操作的处理。
8.3.2 thenCompose (异步变换 / 展平)
核心概念:在当前阶段完成后,异步地对其结果进行转换处理。它接收一个 Function<T, CompletableFuture<U>>
函数。它用于解决嵌套的 CompletableFuture
(即 CompletableFuture<CompletableFuture<U>>
)问题,将其“展平”为单一的 CompletableFuture<U>
。
flowchart LR
A[Stage 1<br>CompletableFuture<T>] --> B["thenCompose(Function<T, CompletableFuture<U>>)"]
B --> C[启动新的异步任务]
C --> D[Stage 2<br>CompletableFuture<U>]
// 模拟一个异步服务:根据用户ID获取用户详情
CompletableFuture<User> getUserDetail(String userId) {
return CompletableFuture.supplyAsync(() -> {
// 模拟网络IO
return new User(userId, "Alice");
});
}
CompletableFuture<String> userIdFuture = CompletableFuture.supplyAsync(() -> "123");
// 错误做法:使用 thenApply 会导致嵌套的 Future: CompletableFuture<CompletableFuture<User>>
CompletableFuture<CompletableFuture<User>> badFuture = userIdFuture.thenApply(userId -> getUserDetail(userId));
// 正确做法:使用 thenCompose 将嵌套的 Future “展平” 成单一的 CompletableFuture<User>
CompletableFuture<User> userFuture = userIdFuture.thenCompose(userId -> getUserDetail(userId));
// 现在可以直接操作 User 对象
userFuture.thenAccept(user -> System.out.println("User name: " + user.getName())); // Output: User name: Alice
典型场景:异步链式调用。当一个异步操作(如调用A服务)的结果,是另一个异步操作(如调用B服务)的输入时,必须使用 thenCompose
。
8.3.3 thenCombine (组合汇聚)
核心概念:等待两个独立的 CompletableFuture
都完成后,将它们的结果作为输入,同步地进行组合处理,并返回一个新的结果。它接收另一个 CompletionStage
和一个 BiFunction<T, U, V>
。
类比:等待两个并行任务都完工,然后把它们的产出物组装成一个新产品。
flowchart TD
A[Future 1<br>CompletableFuture<T>] --> C["thenCombine(Future2, BiFunction<T, U, V>)"]
B[Future 2<br>CompletableFuture<U>] --> C
C --> D[组合结果 Future<br>CompletableFuture<V>]
subgraph C [同步组合操作]
direction LR
C1[输入 T] --> C2[BiFunction T,U->V]
C3[输入 U] --> C2
C2 --> C4[输出 V]
end
// 两个完全独立的异步任务
CompletableFuture<Integer> weightFuture = CompletableFuture.supplyAsync(() -> 70); // 获取体重
CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 1.75); // 获取身高
// thenCombine: 等待两个独立任务都完成,然后组合它们的计算结果(BMI)
CompletableFuture<Double> bmiFuture = weightFuture.thenCombine(heightFuture, (weight, height) -> {
// 这是一个同步计算函数
double heightInMeter = height;
return weight / (heightInMeter * heightInMeter);
});
// 输出: BMI: 22.857...
bmiFuture.thenAccept(bmi -> System.out.println("BMI: " + bmi));
典型场景:聚合多个独立异步任务的结果。例如,从商品服务和库存服务分别获取信息,然后组合成一个完整的商品详情VO。
8.3.4 thenAccept (消费者 - Consumer)
核心概念:在当前阶段完成后,将其结果作为输入,执行一个有输入、无输出的消费操作。它接收一个 Consumer<T>
函数式接口。
语义:“有了这个结果之后,我要拿它做点什么…”(比如打印它、保存它、发送它)
flowchart LR
A[Stage 1<br>CompletableFuture<T>] --> B["thenAccept(Consumer<T>)"]
B --> C[Stage 2<br>CompletableFuture<Void>]
subgraph B [消费操作]
direction LR
B1[输入 T] --> B2[Consumer 消费 T] --> B3[输出 Void]
end
CompletableFuture<String> asyncTask = CompletableFuture.supplyAsync(() -> {
// 模拟耗时计算
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "Processed_Data_123";
});
// thenAccept: 接收上一步的字符串结果,并消费它(例如保存到数据库或发送消息)
CompletableFuture<Void> saveFuture = asyncTask.thenAccept(result -> {
System.out.println("成功获取到结果,正在保存: " + result);
// repository.save(result); // 假设的保存操作
// kafkaTemplate.send("topic", result); // 假设的发送消息操作
});
// 因为 thenAccept 返回的是 CompletableFuture<Void>,
// 所以你不能继续链式地转换数据,但可以链式地执行后续操作。
saveFuture.thenRun(() -> System.out.println("数据保存完毕!"));
典型场景:
- 将异步计算的结果保存到数据库。
- 将结果发送到消息队列(Kafka, RabbitMQ)。
- 打印日志或更新UI。
8.3.5 thenRun (动作执行者 - Runnable)
核心概念:在当前阶段完成后,执行一个既不需要输入、也不需要输出的操作。它接收一个 Runnable
函数式接口。
语义:“不管之前的结果是什么,现在这个阶段完成后,我就要做某件事…”(比如发通知、清理资源)
flowchart LR
A[Stage 1<br>CompletableFuture<T>] --> B["thenRun(Runnable)"]
B --> C[Stage 2<br>CompletableFuture<Void>]
subgraph B [执行操作]
direction LR
B2[Runnable 执行] --> B3[输出 Void]
end
CompletableFuture<String> asyncTask = CompletableFuture.supplyAsync(() -> {
// 模拟一个复杂的文件处理任务
try { Thread.sleep(2000); } catch (InterruptedException e) {}
return "large_file.pdf";
});
// thenRun: 不关心上一步的结果是什么,只关心它“完成”了这个动作。
// 例如,在上面的文件处理完后,发送一个通知或清理临时资源。
CompletableFuture<Void> notificationFuture = asyncTask.thenRun(() -> {
// 这里无法访问 asyncTask 的结果 "large_file.pdf"
System.out.println("之前的异步处理任务已经完成,发送通知...");
// notificationService.send("Task completed!");
// tempFileCleaner.cleanup(); // 清理临时文件
});
notificationFuture.thenRun(() -> System.out.println("通知发送完毕!"));
典型场景:
- 发送完成通知(邮件、短信)。
- 释放资源、清理临时文件。
- 记录一个不依赖于具体结果的操作日志(例如“流程执行完毕”)。
- 作为异步链的最终结束步骤。
8.3.6 图例
thenApply |
thenCompose |
thenCombine |
|
---|---|---|---|
核心作用 | 同步变换 | 异步变换(展平) | 组合汇聚 |
输入 | 上一个阶段的结果 T |
上一个阶段的结果 T |
两个独立的 Future 的结果 T 和 U |
操作函数 | Function<T, U> |
Function<T, CompletableFuture<U>> |
BiFunction<T, U, V> |
输出 | CompletableFuture<U> |
CompletableFuture<U> |
CompletableFuture<V> |
解决痛点 | 简单的值转换 | 嵌套的 Future (CF<CF<U>> ) |
合并两个独立任务的结果 |
类比 | Stream map |
Stream flatMap |
等待两个并行任务并组装 |
特性 | thenAccept |
thenRun |
---|---|---|
核心作用 | 消费前一个阶段的结果 | 在前一个阶段完成后执行一个动作 |
是否需要输入 | 需要(前一个阶段的结果 T ) |
不需要 |
操作函数接口 | Consumer<T> |
Runnable |
返回值 | CompletableFuture<Void> |
CompletableFuture<Void> |
语义 | “有了这个,然后…” | “做完这个之后,然后…” |
类比 | 顾客拿到菜(结果)后开始吃(消费) | 厨师做完菜(完成)后洗手(动作),不关心菜被谁吃了 |
8.4 exceptionally/handle/whenComplete
核心区别一句话概括:
exceptionally
: 恢复。只在发生异常时被调用,相当于catch
块,用于提供降级值或恢复业务。handle
: 转换。无论成功与否都会被调用,相当于finally
+ 结果转换,可以同时访问结果和异常,并返回一个新结果。whenComplete
: 回调。无论成功与否都会被调用,相当于finally
,可以访问结果或异常,但无法改变原有结果或异常。
8.4.1 exceptionally (异常恢复 - Catch Block)
核心概念:仅在前一阶段抛出异常时被触发
。它接收一个 Function<Throwable, T>
函数,用于计算一个降级(fallback)值来恢复流程,使链式调用可以继续下去。
语义:“如果上一步出错了,我就用这个值来代替…”
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Data fetch failed!");
}
return "Data";
});
// 仅在异常时执行,提供降级值
CompletableFuture<String> safeFuture = future.exceptionally(ex -> {
System.err.println("Operation failed, using fallback. Cause: " + ex.getMessage());
return "Fallback Data"; // 提供一个降级值,让流程继续
});
// 输出可能是 "Data" 或 "Fallback Data"
safeFuture.thenAccept(System.out::println);
典型场景:服务降级、默认值返回、异常转换(将系统异常转换为业务异常)。
8.4.2 handle (结果和异常转换 - Finally + Transform)
核心概念:无论前一阶段成功还是失败,都会被调用。它接收一个 BiFunction<T, Throwable, U>
函数,可以同时访问结果和异常(其中一个必为 null
),并必须返回一个新结果,从而可以统一转换成功和失败两种情况的结果。
语义:“不管上一步是成功还是失败,我都要处理一下,并返回一个新的结果…”
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Calculation error!");
}
return 10;
});
// 无论成功失败都执行,并返回一个新结果
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
return "Computation failed: " + ex.getMessage(); // 异常时返回字符串
} else {
return "Computation succeeded: " + (result * 2); // 成功时也返回字符串
}
// 注意:必须返回一个值,这个值会成为新的 CompletableFuture 的结果
});
// 输出是统一的字符串格式
handledFuture.thenAccept(System.out::println);
典型场景:统一封装响应体(将成功结果和异常信息都封装成固定的API响应格式)、结果日志记录、监控指标上报。
8.4.3 whenComplete (最终回调 - Finally Block)
核心概念:无论前一阶段成功还是失败,都会被调用。它接收一个 BiConsumer<T, Throwable>
函数,可以访问结果或异常,但无法改变它们。它执行完后,原来的结果(或异常)会原封不动地传递给下一阶段。
语义:“不管上一步是成功还是失败,我都要看一下,做点事(比如日志、清理),但不会改变最终结果…”
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Network issue!");
}
return "Important Result";
});
// 无论成功失败都执行,但不会改变结果
CompletableFuture<String> loggedFuture = future.whenComplete((result, ex) -> {
if (ex != null) {
// 记录错误日志,但异常依然会传播
System.err.println("Operation completed with exception: " + ex.getMessage());
} else {
// 记录成功日志,结果依然是 "Important Result"
System.out.println("Operation completed successfully: " + result);
}
// 注意:这是一个 Consumer,没有返回值
});
// 尝试获取结果,如果上面有异常,这里依然会抛出
try {
System.out.println("Final result: " + loggedFuture.get());
} catch (Exception e) {
System.out.println("Still got exception: " + e.getCause().getMessage());
}
典型场景:资源清理、日志记录、指标监控(如记录耗时)、发送通知。它不会阻止异常传播。
特性 | exceptionally |
handle |
whenComplete |
---|---|---|---|
调用时机 | 仅异常时 | 总是 | 总是 |
输入 | Throwable (异常对象) |
(T result, Throwable ex) (一个非null) |
(T result, Throwable ex) (一个非null) |
函数接口 | Function<Throwable, T> |
BiFunction<T, Throwable, U> |
BiConsumer<T, Throwable> |
返回值 | 有(降级值 T ) |
有(新结果 U ) |
无 |
是否改变结果/异常 | 是(用返回值替代异常) | 是(用返回值替代原有结果和异常) | 否(观察者,原样传递) |
类比 | catch 块 |
finally + 结果转换 |
finally 块 |
核心用途 | 服务降级,提供备选值 | 统一转换,格式化成功/失败输出 | 副作用操作,如日志、清理 |
8.4.4 组合使用与最佳实践
CompletableFuture.supplyAsync(() -> queryDatabase(userId))
.thenApply(data -> transformData(data)) // 转换数据
.whenComplete((result, ex) -> { // 记录日志或监控,不影响结果
metrics.record("dbQuery", ex == null);
log.debug("Query finished for user {}", userId);
})
.exceptionally(ex -> { // 如果上述任何一步出错,提供降级
log.error("Full operation failed", ex);
return getCachedData(userId); // 尝试返回缓存数据
})
.handle((result, ex) -> { // 最终统一响应格式
if (ex != null) {
return new ApiResponse<>("FAILURE", null, ex.getMessage());
} else {
return new ApiResponse<>("SUCCESS", result, null);
}
})
.thenAccept(response -> sendResponse(response)); // 发送最终响应
总结一下选择策略:
- 想吃掉异常并提供一个备用结果 -> 用
exceptionally
- 想统一处理成功和失败两种情况,并返回一个新的统一类型 -> 用
handle
- 只是想看一下结果或异常(记录日志、清理),但不想改变它们 -> 用
whenComplete
往期资料获取地址:
- https://pan.baidu.com/s/1FbUp2xFFidxCfeXbzYjmOw 提取码: jzm9
更多推荐
所有评论(0)