CompletableFuture 深度解析

本文将探讨 Java 8 引入的 CompletableFuture,一个在异步编程中实现非阻塞、可组合操作的强大工具。我们将从 CompletableFuture 的基本概念、与传统 Future 的区别、核心 API 用法,到复杂的链式调用、组合操作以及异常处理进行全面解析,并通过丰富的代码示例,帮助 Java 后端开发者更好地理解和应用 CompletableFuture,提升系统性能和响应能力。

1. 为什么需要 CompletableFuture

在现代后端开发中,高并发和低延迟是衡量系统性能的重要指标。传统的同步编程模型在处理耗时操作(如网络请求、数据库查询)时,会阻塞当前线程,导致系统吞吐量下降,用户体验变差。为了解决这一问题,异步编程应运而生。Java 5 引入的 Future 接口是异步编程的初步尝试,它代表了一个异步计算的结果,但其局限性也日益凸显。

1.1 传统 Future 的局限性

Future 接口虽然提供了异步执行任务的能力,但其设计存在以下几个主要局限性:

  • 阻塞式获取结果Future.get() 方法会阻塞当前线程,直到异步任务完成并返回结果。这意味着,如果任务执行时间过长,调用线程将被长时间阻塞,无法执行其他操作,从而降低了系统的响应能力和资源利用率。
  • 无法方便地进行链式操作和组合Future 接口没有提供直接的方法来将多个异步操作串联起来,或者将多个异步操作的结果进行组合。当需要执行一系列相互依赖的异步任务时,开发者往往需要手动管理线程和回调,代码变得复杂且难以维护,容易出现“回调地狱”(Callback Hell)。
  • 异常处理不便Future 接口的异常处理机制相对简单。当异步任务抛出异常时,只有在调用 get() 方法时才能捕获到 ExecutionException,这使得异常的传播和处理变得不灵活,难以在异步流程中进行细粒度的错误控制。

1.2 CompletableFuture 的优势

为了克服 Future 的这些局限性,Java 8 引入了 CompletableFuture 类。CompletableFuture 不仅实现了 Future 接口,还实现了 CompletionStage 接口,这使得它在异步编程方面拥有了前所未有的灵活性和强大功能。CompletableFuture 的主要优势体现在:

  • 非阻塞CompletableFuture 通过回调机制实现了非阻塞操作。它允许你注册一个回调函数,当异步任务完成时,该回调函数会被自动执行,而不会阻塞当前线程。这极大地提高了系统的并发能力和响应速度。
  • 可组合CompletableFuture 提供了丰富的 API,支持将多个异步操作进行链式调用和组合。你可以轻松地将一个异步任务的结果作为另一个异步任务的输入,或者等待多个异步任务都完成后再执行某个操作。这种可组合性使得复杂的异步流程能够以声明式的方式清晰地表达,代码结构更加简洁。
  • 更灵活的异常处理CompletableFuture 提供了 exceptionally()handle() 等方法,允许开发者在异步任务的任何阶段捕获和处理异常。这使得异常处理变得更加灵活和可控,避免了传统 Future 中异常处理的痛点。
  • 更强大的并发控制CompletableFuture 内部使用了 ForkJoinPool 作为默认的异步执行线程池,也可以自定义线程池。它能够更好地利用多核处理器的优势,实现高效的并发控制。

总之,CompletableFuture 是 Java 异步编程领域的一个里程碑式的改进,它为开发者提供了构建高性能、高响应、易于维护的并发应用程序的强大工具。

2. CompletableFuture 核心概念与基本用法

CompletableFuture 是 Java 8 中引入的一个强大的并发工具,它位于 java.util.concurrent 包中。

2.1 CompletableFuture 是什么

CompletableFuture<T> 是一个类,它实现了 Future<T>CompletionStage<T> 两个接口。这意味着它既可以作为传统 Future 的替代品,用于获取异步计算的结果,又具备了 CompletionStage 接口提供的强大功能,支持链式操作和组合多个异步计算步骤。

  • Future<T> 接口:代表一个异步计算的结果。通过 get() 方法可以阻塞地获取结果,或者通过 cancel() 方法取消任务。
  • CompletionStage<T> 接口:定义了一系列用于描述异步计算阶段的方法。这些方法允许你在一个异步操作完成后执行另一个操作,而无需阻塞当前线程。这是 CompletableFuture 强大之处的核心。

简单来说,CompletableFuture 代表了一个可能在未来某个时间点完成的异步计算的结果。这个结果可以是成功的值,也可以是计算过程中抛出的异常。

2.2 创建 CompletableFuture

CompletableFuture 提供了多种静态方法来创建不同类型的异步任务:

2.2.1 CompletableFuture.runAsync(Runnable runnable)

用于执行一个没有返回值的异步任务。它接受一个 Runnable 类型的参数,并在 ForkJoinPool.commonPool() 中异步执行该任务。如果需要指定线程池,可以使用 CompletableFuture.runAsync(Runnable runnable, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws InterruptedException {System.out.println("主线程开始");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("异步任务执行完成,无返回值");} catch (InterruptedException e) {e.printStackTrace();}});// 主线程可以继续执行其他任务,无需等待异步任务完成System.out.println("主线程继续执行其他任务");// 等待异步任务完成(非阻塞方式,通过回调)future.thenRun(() -> System.out.println("异步任务真正完成后的回调"));// 为了演示效果,让主线程等待一段时间,确保异步任务有时间执行TimeUnit.SECONDS.sleep(3);System.out.println("主线程结束");}
}

2.2.2 CompletableFuture.supplyAsync(Supplier supplier)

用于执行一个有返回值的异步任务。它接受一个 Supplier<T> 类型的参数,并在 ForkJoinPool.commonPool() 中异步执行该任务,返回一个 CompletableFuture<T>。同样,也可以指定线程池:CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("异步任务执行完成,有返回值");return "Hello, CompletableFuture!";} catch (InterruptedException e) {throw new IllegalStateException(e);}});System.out.println("主线程继续执行其他任务");// 阻塞式获取结果(仅为演示,实际应用中应避免长时间阻塞)String result = future.get(); System.out.println("异步任务返回结果: " + result);System.out.println("主线程结束");}
}

2.2.3 new CompletableFuture()

你可以手动创建一个 CompletableFuture 实例,并在后续通过 complete()completeExceptionally() 方法来手动完成它。这在某些场景下非常有用,例如当你需要将一个非 CompletableFuture 风格的异步操作包装成 CompletableFuture 时。

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = new CompletableFuture<>();// 在另一个线程中执行耗时操作,并手动完成 CompletableFuturenew Thread(() -> {try {TimeUnit.SECONDS.sleep(2);future.complete("手动完成的 CompletableFuture");System.out.println("手动 CompletableFuture 已完成");} catch (InterruptedException e) {future.completeExceptionally(e);}}).start();System.out.println("主线程继续执行其他任务");String result = future.get(); // 阻塞等待结果System.out.println("获取到手动 CompletableFuture 的结果: " + result);System.out.println("主线程结束");}
}

2.2.4 CompletableFuture.completedFuture(T value)

如果你已经知道一个异步操作的结果,可以直接使用 completedFuture() 方法创建一个已经完成的 CompletableFuture。这对于测试或者某些特定场景非常方便,可以避免不必要的异步执行。

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主线程开始");CompletableFuture<String> future = CompletableFuture.completedFuture("这是一个已完成的 CompletableFuture");System.out.println("主线程继续执行其他任务");String result = future.get(); // 不会阻塞,立即返回结果System.out.println("获取到已完成 CompletableFuture 的结果: " + result);System.out.println("主线程结束");}
}

2.3 获取结果

CompletableFuture 完成后,你可以通过以下方法获取其结果:

2.3.1 get()

get() 方法是 Future 接口中定义的方法,它会阻塞当前线程,直到 CompletableFuture 完成并返回结果。如果任务在完成时抛出异常,get() 方法会抛出 ExecutionException,其 getCause() 方法可以获取到原始异常。

注意:在实际应用中,应尽量避免长时间阻塞主线程,get() 方法通常用于测试或在确定异步任务很快完成的场景。

2.3.2 join()

join() 方法与 get() 方法类似,也会阻塞当前线程并等待 CompletableFuture 完成。但不同的是,join() 方法不会抛出受检异常 ExecutionException,而是将原始异常包装成非受检异常 CompletionException 抛出。这使得在链式调用中处理异常更加方便,无需在每个 get() 调用处都进行 try-catch

2.3.3 complete(T value)

complete() 方法用于手动完成 CompletableFuture,并为其设置一个结果值。如果 CompletableFuture 已经完成(无论是正常完成还是异常完成),再次调用 complete() 将无效。

2.3.4 completeExceptionally(Throwable ex)

completeExceptionally() 方法用于手动使 CompletableFuture 异常完成,并为其设置一个异常。这在异步任务执行过程中发生错误时非常有用,可以将异常信息传递给 CompletableFuture 的消费者。

示例代码(get() vs join()):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureGetJoin {public static void main(String[] args) {// 正常完成的 CompletableFutureCompletableFuture<String> successFuture = CompletableFuture.supplyAsync(() -> "Success");try {String resultGet = successFuture.get();System.out.println("get() 正常结果: " + resultGet);String resultJoin = successFuture.join();System.out.println("join() 正常结果: " + resultJoin);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 异常完成的 CompletableFutureCompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Something went wrong!");});try {exceptionFuture.get(); // 抛出 ExecutionException} catch (InterruptedException | ExecutionException e) {System.out.println("get() 捕获到异常: " + e.getCause().getMessage());}try {exceptionFuture.join(); // 抛出 CompletionException} catch (Exception e) {System.out.println("join() 捕获到异常: " + e.getCause().getMessage());}}
}

3. 链式操作:构建异步任务流

CompletableFuture 最强大的特性之一是其支持链式操作,允许我们将多个异步任务串联起来,形成一个有向无环图(DAG),从而构建复杂的异步任务流。这极大地简化了异步编程的复杂性,避免了传统回调模式带来的“回调地狱”。

3.1 结果转换:thenApply()

thenApply() 方法用于对上一个 CompletableFuture 的结果进行转换,并返回一个新的 CompletableFuture。它接受一个 Function 函数式接口作为参数,该函数接收上一个 CompletableFuture 的结果作为输入,并返回一个转换后的新结果。thenApply() 是同步执行的,即转换操作会在完成上一个 CompletableFuture 的线程中执行。如果需要异步执行转换操作,可以使用 thenApplyAsync()

方法签名:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> initialFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> transformedFuture = initialFuture.thenApply(s -> {System.out.println("thenApply 在 " + Thread.currentThread().getName() + " 线程中执行");return s + " World";});System.out.println("结果: " + transformedFuture.get()); // 输出: 结果: Hello World}
}

3.2 消费结果:thenAccept()

thenAccept() 方法用于消费上一个 CompletableFuture 的结果,但不会返回新的结果(即返回 CompletableFuture<Void>)。它接受一个 Consumer 函数式接口作为参数,该函数接收上一个 CompletableFuture 的结果作为输入,但没有返回值。thenAccept() 同样有异步版本 thenAcceptAsync()

方法签名:

CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenAccept(s -> {System.out.println("thenAccept 在 " + Thread.currentThread().getName() + " 线程中执行");System.out.println("消费结果: " + s + ", 无返回值");});voidFuture.get(); // 等待消费完成}
}

3.3 任务完成:thenRun()

thenRun() 方法用于在上一个 CompletableFuture 完成后执行一个不关心结果且没有返回值的任务。它接受一个 Runnable 函数式接口作为参数。thenRun() 同样有异步版本 thenRunAsync()

方法签名:

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenRun(() -> {System.out.println("thenRun 在 " + Thread.currentThread().getName() + " 线程中执行");System.out.println("上一个任务已完成,执行不关心结果的任务");});voidFuture.get(); // 等待任务完成}
}

3.4 异步转换:thenCompose()

thenCompose() 方法是 CompletableFuture 中非常重要的一个方法,它用于将上一个 CompletableFuture 的结果作为参数,创建并返回一个新的 CompletableFuture。这与 thenApply() 的区别在于,thenApply() 返回的是一个包含转换后结果的 CompletableFuture,而 thenCompose() 返回的是一个扁平化的 CompletableFuture。当你的转换函数本身也返回一个 CompletableFuture 时,thenCompose() 可以避免出现 CompletableFuture<CompletableFuture<T>> 这种嵌套结构,从而保持链的扁平化。

方法签名:

<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "User ID: 123";} catch (InterruptedException e) {throw new IllegalStateException(e);}});// thenApply 示例 (会产生嵌套)CompletableFuture<CompletableFuture<String>> nestedFuture = future1.thenApply(userId -> {System.out.println("thenApply 内部获取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Alice";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenApply 结果 (嵌套): " + nestedFuture.get().get()); // 需要两次 get()// thenCompose 示例 (扁平化)CompletableFuture<String> flatFuture = future1.thenCompose(userId -> {System.out.println("thenCompose 内部获取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Bob";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenCompose 结果 (扁平化): " + flatFuture.get()); // 只需一次 get()}
}

4. 组合操作:处理多个异步任务

在实际应用中,我们经常需要处理多个独立的异步任务,并在它们全部完成或其中任意一个完成时执行后续操作。CompletableFuture 提供了强大的组合方法,使得这些场景的处理变得非常优雅和高效。

4.1 组合两个结果:thenCombine()

thenCombine() 方法用于当两个 CompletableFuture 都完成后,将它们的结果组合起来,并返回一个新的 CompletableFuture。它接受另一个 CompletionStage 和一个 BiFunction 作为参数,BiFunction 接收两个 CompletableFuture 的结果作为输入,并返回一个组合后的新结果。

方法签名:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "World";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> {System.out.println("thenCombine 在 " + Thread.currentThread().getName() + " 线程中执行");return s1 + " " + s2;});System.out.println("组合结果: " + combinedFuture.get()); // 输出: 组合结果: Hello World}
}

4.2 所有任务完成:allOf()

allOf() 方法用于等待所有给定的 CompletableFuture 都完成。它接受一个 CompletableFuture 数组作为参数,并返回一个 CompletableFuture<Void>。当所有输入的 CompletableFuture 都成功完成时,返回的 CompletableFuture 才会完成。如果其中任何一个 CompletableFuture 异常完成,那么 allOf() 返回的 CompletableFuture 也会异常完成。

方法签名:

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);// 等待所有任务完成allFutures.get(); System.out.println("所有任务都已完成!");// 可以通过各自的 future.get() 获取结果System.out.println(future1.get());System.out.println(future2.get());System.out.println(future3.get());}
}

4.3 任意任务完成:anyOf()

anyOf() 方法用于当任何一个给定的 CompletableFuture 完成时,就完成当前的 CompletableFuture。它接受一个 CompletableFuture 数组作为参数,并返回一个 CompletableFuture<Object>。返回的 CompletableFuture 的结果将是第一个完成的 CompletableFuture 的结果。

方法签名:

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一个完成的任务结果: " + anyOfFuture.get()); // 输出: 第一个完成的任务结果: Result from Future 2}
}

5. 异常处理

在异步编程中,异常处理是一个非常重要的环节。CompletableFuture 提供了多种机制来优雅地处理异步任务中可能出现的异常,避免了传统 Future 中异常处理的繁琐和不便。

5.1 异常处理:exceptionally()

exceptionally() 方法用于当 CompletableFuture 出现异常时,提供一个替代结果。它接受一个 Function<Throwable, ? extends T> 作为参数,当上一个 CompletableFuture 异常完成时,该函数会被调用,并接收异常作为输入,然后返回一个替代值作为当前 CompletableFuture 的结果。如果上一个 CompletableFuture 正常完成,exceptionally() 不会执行。

方法签名:

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("模拟异常");}return "正常结果";}).exceptionally(ex -> {System.out.println("捕获到异常: " + ex.getMessage());return "从异常中恢复的默认结果";});System.out.println("最终结果: " + future.get());}
}

5.2 统一处理:handle()

handle() 方法是一个更通用的处理方法,无论 CompletableFuture 是正常完成还是异常完成,它都会执行。它接受一个 BiFunction<? super T, Throwable, ? extends U> 作为参数,该函数接收上一个 CompletableFuture 的结果和可能发生的异常作为输入。如果正常完成,异常参数为 null;如果异常完成,结果参数为 nullhandle() 的返回值将作为当前 CompletableFuture 的结果。

方法签名:

<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情况CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常数据").handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕获到异常: " + ex.getMessage());return "处理异常后的结果";} else {System.out.println("handle 正常处理结果: " + result);return result + " - 处理完成";}});System.out.println("正常情况最终结果: " + normalFuture.get());// 异常情况CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模拟异常");}).handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕获到异常: " + ex.getMessage());return "处理异常后的结果";} else {System.out.println("handle 正常处理结果: " + result);return result + " - 处理完成";}});System.out.println("异常情况最终结果: " + exceptionFuture.get());}
}

5.3 完成时回调:whenComplete()

whenComplete() 方法用于当 CompletableFuture 完成时执行一个回调,无论它是正常完成还是异常完成。它接受一个 BiConsumer<? super T, ? super Throwable> 作为参数,该函数接收结果和异常作为输入。与 handle() 不同的是,whenComplete() 不会修改 CompletableFuture 的结果,主要用于日志记录、资源清理或触发后续不依赖结果的操作。如果 whenComplete() 内部抛出异常,该异常会覆盖原始的异常(如果存在)。

方法签名:

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

示例代码:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情况CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常数据").whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕获到异常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,结果: " + result);}});System.out.println("正常情况最终结果: " + normalFuture.get());// 异常情况CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模拟异常");}).whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕获到异常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,结果: " + result);}});try {exceptionFuture.get(); // 原始异常会被重新抛出} catch (Exception e) {System.out.println("get() 捕获到原始异常: " + e.getCause().getMessage());}}
}

6. 实际应用场景与最佳实践

CompletableFuture 在 Java 后端开发中有着广泛的应用,尤其是在需要处理大量并发请求、优化系统响应时间以及构建高吞吐量服务的场景下。合理地运用 CompletableFuture 可以显著提升系统性能和用户体验。

6.1 实际应用场景

6.1.1 并行调用多个微服务

在微服务架构中,一个业务请求可能需要调用多个下游微服务来获取数据。如果这些调用是串行的,那么总的响应时间将是所有微服务响应时间之和。通过 CompletableFuture,我们可以并行地发起对多个微服务的调用,然后使用 allOf()thenCombine() 等方法等待所有结果或组合结果,从而大大缩短响应时间。

示例场景: 用户下单时,需要同时查询商品库存、用户积分和优惠券信息。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class MicroserviceParallelCall {// 模拟查询商品库存的微服务public static CompletableFuture<Integer> getProductStock(String productId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟网络延迟System.out.println("查询商品 " + productId + " 库存完成");return 100; // 假设库存100} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模拟查询用户积分的微服务public static CompletableFuture<Integer> getUserPoints(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1.5); // 模拟网络延迟System.out.println("查询用户 " + userId + " 积分完成");return 2000; // 假设积分2000} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模拟查询优惠券信息的微服务public static CompletableFuture<String> getCouponInfo(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(0.8); // 模拟网络延迟System.out.println("查询用户 " + userId + " 优惠券完成");return "满100减10"; // 假设优惠券信息} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) throws Exception {long start = System.currentTimeMillis();CompletableFuture<Integer> stockFuture = getProductStock("P001");CompletableFuture<Integer> pointsFuture = getUserPoints("U001");CompletableFuture<String> couponFuture = getCouponInfo("U001");// 等待所有异步任务完成CompletableFuture.allOf(stockFuture, pointsFuture, couponFuture).join();// 获取结果并处理Integer stock = stockFuture.get();Integer points = pointsFuture.get();String coupon = couponFuture.get();System.out.println("\n所有信息查询完成:");System.out.println("商品库存: " + stock);System.out.println("用户积分: " + points);System.out.println("优惠券信息: " + coupon);long end = System.currentTimeMillis();System.out.println("总耗时: " + (end - start) + " ms");}
}

6.1.2 异步发送通知(邮件、短信)

在用户注册、订单支付成功等场景下,系统通常需要发送邮件、短信或站内信等通知。这些通知操作通常不影响主业务流程,但如果同步执行,可能会增加主业务的响应时间。使用 CompletableFuture 可以将这些通知操作异步化,提升主业务的响应速度。

示例场景: 用户注册成功后,异步发送欢迎邮件和短信。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class AsyncNotification {public static CompletableFuture<Void> sendEmail(String email, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模拟邮件发送耗时System.out.println("邮件发送成功到: " + email + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static CompletableFuture<Void> sendSms(String phoneNumber, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(0.5); // 模拟短信发送耗时System.out.println("短信发送成功到: " + phoneNumber + ", 内容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static void main(String[] args) {System.out.println("用户注册成功,开始处理通知...");CompletableFuture<Void> emailFuture = sendEmail("test@example.com", "欢迎注册!");CompletableFuture<Void> smsFuture = sendSms("13800138000", "欢迎注册!");// 主业务流程可以继续,无需等待通知发送完成System.out.println("主业务流程继续执行...");// 可以选择等待所有通知发送完成,或者不等待CompletableFuture.allOf(emailFuture, smsFuture).join();System.out.println("所有通知任务已完成。");}
}

6.1.3 批量数据处理

当需要处理大量数据,并且每个数据的处理是独立的时,可以使用 CompletableFuture 将数据分成小批次并行处理,从而提高整体处理效率。

示例场景: 批量处理用户数据,对每个用户进行数据清洗和存储。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class BatchDataProcessing {public static CompletableFuture<String> processUserData(String user) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(200); // 模拟数据处理耗时System.out.println("处理用户数据: " + user + " 完成");return user.toUpperCase(); // 模拟数据清洗} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) {List<String> users = Arrays.asList("user1", "user2", "user3", "user4", "user5");long start = System.currentTimeMillis();List<CompletableFuture<String>> futures = users.stream().map(BatchDataProcessing::processUserData).collect(Collectors.toList());// 等待所有用户数据处理完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 获取所有处理结果List<String> processedUsers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println("\n所有用户数据处理完成,结果: " + processedUsers);long end = System.currentTimeMillis();System.out.println("总耗时: " + (end - start) + " ms");}
}

6.2 最佳实践

  • 合理选择线程池CompletableFuture 默认使用 ForkJoinPool.commonPool()。对于 CPU 密集型任务,默认线程池通常是合适的。但对于 I/O 密集型任务,建议自定义线程池,并根据实际情况调整线程数量,避免线程饥饿或资源浪费。例如,可以使用 Executors.newFixedThreadPool()ThreadPoolExecutor

    // 自定义线程池示例
    ExecutorService customExecutor = Executors.newFixedThreadPool(10);
    CompletableFuture.supplyAsync(() -> {// 耗时操作return "Result";
    }, customExecutor);
    
  • 避免过度使用 get()join():虽然 get()join() 可以获取 CompletableFuture 的结果,但它们是阻塞的。过度使用会导致异步优势丧失,甚至引入死锁。应尽量使用 thenApply()thenAccept()thenCompose() 等非阻塞的回调方法来构建异步链。

  • 善用异常处理机制exceptionally()handle()whenComplete() 提供了灵活的异常处理方式。根据业务需求选择合适的异常处理策略,确保异步任务的健壮性。exceptionally() 适用于从异常中恢复并提供替代结果的场景,handle() 适用于无论成功失败都需要统一处理的场景,而 whenComplete() 适用于执行一些副作用操作(如日志记录、资源清理)而不改变结果的场景。

  • 链式调用与组合的合理运用:充分利用 CompletableFuture 提供的链式调用和组合方法,将复杂的异步逻辑拆解成更小、更易管理的部分。这不仅使代码更具可读性,也更容易进行测试和维护。特别注意 thenApply()thenCompose() 的区别,避免不必要的嵌套。

  • 超时处理:对于可能长时间运行的异步任务,考虑添加超时机制,避免资源无限期占用。虽然 CompletableFuture 本身没有直接的超时方法,但可以通过 CompletableFuture.orTimeout() (Java 9+) 或结合 CompletableFuture.runAfter() 等方法实现。

    // Java 9+ 超时处理示例
    CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(5); // 模拟长时间任务return "Task Completed";} catch (InterruptedException e) {throw new IllegalStateException(e);}
    }).orTimeout(2, TimeUnit.SECONDS); // 2秒后超时try {System.out.println(futureWithTimeout.get());
    } catch (Exception e) {System.out.println("任务超时或异常: " + e.getMessage());
    }
    
  • 日志记录:在异步任务的关键节点添加日志,便于追踪任务执行状态和排查问题。可以使用 whenComplete()handle() 来记录任务的成功或失败。

7. 总结

CompletableFuture 是 Java 8 引入的异步编程利器,它通过提供非阻塞、可组合、灵活的异常处理机制,极大地提升了 Java 在并发编程领域的表现力。本文从 CompletableFuture 的基本概念、创建方式、链式操作、组合操作以及异常处理等方面进行了深入解析,并通过丰富的代码示例展示了其在实际应用中的强大功能。

掌握 CompletableFuture 的核心 API 和最佳实践,能够帮助 Java 后端开发者更好地应对高并发、低延迟的挑战,构建出高性能、高响应、易于维护的现代后端服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/diannao/89003.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

给自己网站增加一个免费的AI助手,纯HTML

助手效果图 看完这篇文章&#xff0c;你将免费拥有你自己的Ai助手&#xff0c;全程干货&#xff0c;先到先得 获取免费的AI大模型接口 访问这个地址 生成key https://openrouter.ai/mistralai/mistral-small-3.2-24b-instruct:free/api 或者调用其他的免费大模型&#xff0c;这…

ASProxy64.dll导致jetbrains家的IDE都无法打开。

在Windows11中,无法打开jetbrains的IDE的软件,经过排查,发现与ASProxy64.dll有关。 E:\idea\IntelliJ IDEA 2024.1.7\bin>idea.bat CompileCommand: exclude com/intellij/openapi/vfs/impl/FilePartNodeRoot.trieDescend bool exclude = true # # A fatal error has bee…

springboot+Vue逍遥大药房管理系统

概述 基于springbootVue开发的逍遥大药房管理系统。该系统功能完善&#xff0c;既包含强大的后台管理模块&#xff0c;又具备用户友好的前台展示界面。 主要内容 一、后台管理系统功能 ​​核心管理模块​​&#xff1a; 用户管理&#xff1a;管理员与普通用户权限分级药品分…

探索阿里云智能媒体管理IMM:解锁媒体处理新境界

一、引言&#xff1a;开启智能媒体管理新时代 在数字化浪潮的席卷下&#xff0c;媒体行业正经历着前所未有的变革。从传统媒体到新媒体的转型&#xff0c;从内容生产到传播分发&#xff0c;每一个环节都在寻求更高效、更智能的解决方案。而云计算&#xff0c;作为推动这一变革…

[附源码+数据库+毕业论文]基于Spring+MyBatis+MySQL+Maven+jsp实现的新生报道管理系统,推荐!

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了新生报道管理系统的开发全过程。通过分析高校新生入学报到信息管理的不足&#xff0c;创建了一个计算机管理高校新生入学报到信息的方案。文章介绍了新生报道管…

给定一个整型矩阵map,求最大的矩形区域为1的数量

题目: 给定一个整型矩阵map,其中的值只有0和1两种,求其中全是1的 所有矩形区域中,最大的矩形区域为1的数量。 例如: 1 1 1 0 其中,最大的矩形区域有3个1,所以返回3。 再如: 1 0 1 1 1 1 1 1 1 1 1 0 其中,最大的矩形区域有6个1,所以返回6。 解题思…

第8章-财务数据

get_fund # 查看股票代码000001.XSHE在2022年9月1日的总市值 q query( valuation ).filter( valuation.code 000001.XSHE ) df get_fundamentals(q, 2022-09-01) print(df[market_cap][0]) # 获取第一行的market_cap值 这段代码看起来是用于查询股票在特定日期的总…

SQL关键字三分钟入门:ROW_NUMBER() —— 窗口函数为每一行编号

在进行数据分析时&#xff0c;我们常常需要为查询结果集中的每条记录生成一个唯一的序号或行号。例如&#xff1a; 为每位员工按照入职时间排序并编号&#xff1b;按照订单金额对订单进行排序&#xff0c;并给每个订单分配一个顺序编号&#xff1b;在分组数据内为每条记录编号…

微信小程序如何实现通过邮箱验证修改密码功能

基于腾讯云开发&#xff08;Tencent Cloud Base&#xff09;实现小程序邮箱验证找回密码功能的完整逻辑说明及关键代码实现。结合安全性和开发效率&#xff0c;方案采用 ​​云函数 小程序前端​​ 的架构&#xff0c;使用 ​​Nodemailer​​ 发送邮件。Nodemailer 是一个专为…

C# VB.NET中Tuple轻量级数据结构和固定长度数组

C# VB.NET取字符串中全角字符数量和半角字符数量-CSDN博客 https://blog.csdn.net/xiaoyao961/article/details/148871910 在VB.NET中&#xff0c;使用Tuple和固定长度数组在性能上有细微差异&#xff0c;以下是详细分析&#xff1a; 性能对比测试 通过测试 100 万次调用&am…

建筑物年代预测与空间异质性分析解决方案

建筑物年代预测与空间异质性分析解决方案 1. 问题分析与创新点设计 核心任务:预测建筑物建造年代,并分析空间异质性对预测的影响 创新点设计: 空间权重矩阵集成:构建空间邻接矩阵量化地理邻近效应多尺度特征提取:融合建筑物微观特征与街区宏观特征异质性分区建模:基于…

FOUPK3system5XOS

Foupk3systemX5OS系统19.60内测版&#xff08;X9&#xff09;2023年4月16日正式发布 1.0Foupk3systemX5OS系统19.60&#xff08;X9&#xff09;2024年10月6日发布 Foupk3systemX5OS系统19.60增强版&#xff08;X9X5&#xff09;2024年10月6日发布Foupk3systemX5OS系统19.60正…

随机生成的乱码域名”常由**域名生成算法(DGA)** 产生

“随机生成的乱码域名”常由**域名生成算法&#xff08;DGA&#xff09;** 产生&#xff0c;是网络攻击&#xff08;尤其是僵尸网络、恶意软件控制场景 &#xff09;中躲避检测的手段&#xff0c;以下是关键解析&#xff1a; ### 一、本质与产生逻辑 乱码域名是攻击者利用 **DG…

Solidity学习 - 继承

文章目录 前言继承的基本概念继承的基本用法单继承实现函数重写&#xff08;overriding&#xff09; 构造函数的继承处理多重继承抽象合约 前言 继承是面向对象编程中的核心概念之一&#xff0c;Solidity作为一种面向对象的智能合约语言&#xff0c;同样支持继承机制。通过继承…

依赖注入(Dependency Injection, DI)的核心概念和解决的核心问题

核心概念&#xff1a; 依赖注入是一种设计模式&#xff0c;也是实现控制反转&#xff08;Inversion of Control, IoC&#xff09; 原则的一种具体技术。其核心思想是&#xff1a; 解耦&#xff1a; 将一个类&#xff08;客户端&#xff09;所依赖的其他类或服务&#xff08;依…

Reactor Schedulers

Reactor 是一个基于响应式编程的库&#xff0c;它提供了丰富的调度器&#xff08;Schedulers&#xff09;机制&#xff0c;用于管理异步操作的执行环境。Schedulers 是 Reactor 中的核心组件之一&#xff0c;它们允许开发者灵活地控制操作符和订阅操作在哪个线程上执行&#xf…

设备树引入

一、设备树的基本知识 1、什么是设备树&#xff1f;为什么会有设备树&#xff1f; 2011年&#xff0c;Linux之父Linus Torvalds发现这个问题后&#xff0c;就通过邮件向ARM-Linux开发社区发了一封邮件&#xff0c;不禁的发出了一句“This whole ARM thing is a f*cking pain i…

【数据标注师】3D标注

目录 一、 **3D标注知识体系框架**二、 **五阶能力培养体系**▶ **阶段1&#xff1a;空间认知筑基&#xff08;2-3周&#xff09;**▶ **阶段2&#xff1a;核心标注技能深化**▶ **阶段3&#xff1a;复杂场景解决方案**▶ **阶段4&#xff1a;领域深度专精▶ **阶段5&#xff1…

华为HN8145V光猫改华为蓝色公版界面,三网通用,xgpon公版光猫

咸鱼只卖20多元一个&#xff0c;还是xgpon的万兆猫&#xff0c;性价比不错哦 除了没有2.5G网口&#xff0c;其他还行。 改成公版光猫后&#xff0c;运营商是无法纳管光猫&#xff0c;无法后台修改光猫数据及超密。 华为 HN8145V 光猫具有以下特点&#xff1a; 性能方面 高速接…

【LeetCode 热题 100】438. 找到字符串中所有字母异位词——(解法二)定长滑动窗口+数组

Problem: 438. 找到字符串中所有字母异位词 题目&#xff1a;给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 【LeetCode 热题 100】438. 找到字符串中所有字母异位词——&#xff08;解法一&…