在Callable、Future、FutureTask原理一文中,我们介绍了Future
的使用以及原理。Future
虽然可以实现获取异步执行结果的需求,但是它有着显而易见的缺点:
Future
没有提供通知机制,我们无法得知Future什么时候完成- 想要获得执行结果,要么使用阻塞,在
future.get()
的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()
轮询地判断Future
是否完成,这样会耗费CPU的资源。
在JDK8之前,我们可以使用第三方库来解决这个问题。Netty、Guava分别扩展了Java的Future
接口,方便做异步编程。
JDK8新增了一个CompletableFuture
类。CompletableFuture
类吸收了所有Guava中ListenableFuture
和SettableFuture
的特征,还提供了其他强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise和Callback
CompletableFuture
能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。
CompletableFuture
弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept
、thenApply
、thenCompose
等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。
创建CompletableFuture对象
public static <U> CompletableFuture<U> completedFuture(U value)
,是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture
。
completedFuture
方法的使用方式如下所示:
1 |
|
其中getNow
方法返回CompletableFuture
当前的执行结果,如果没有执行完成则返回默认值。
执行结果:
1 | 17:10:52.318 [main] INFO completable_future.p3.CompletableFutureTest - isDone true |
以下四个静态方法用来为一段异步执行的代码创建CompletableFuture
对象:
1 | public static CompletableFuture<Void> runAsync(Runnable runnable) |
以Async
结尾并且没有指定Executor
的方法会使用ForkJoinPool.commonPool()
作为它的线程池执行异步代码。
runAsync
方法以Runnable
函数式接口参数类型为参数,所以CompletableFuture
的计算结果为空。
supplyAsync
方法以Supplier<U>
函数式接口类型为参数,CompletableFuture
的计算结果类型为U
runAsync
方法的使用方式如下所示:
1 |
|
执行结果:
1 | 17:11:43.143 [main] INFO completable_future.p3.CompletableFutureTest - isDone 1 false |
supplyAsync
方法的使用方式如下所示:
1 |
|
执行结果:
1 | 17:11:52.902 [main] INFO completable_future.p3.CompletableFutureTest - isDone 1 false |
从结果上我们也可以看到,runAsync
和supplyAsync
的区别就是:supplyAsync
能够返回执行结果,而runAsync
不会。
计算结果完成时的处理
当CompletableFuture
的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action
。主要是下面的方法:
1 | public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) |
可以看到Action
的类型是BiConsumer<? super T, ? super Throwable>
,它可以处理正常的计算结果,或者异常情况。
方法不以Async
结尾,意味着Action
使用相同的线程执行,而Async
可能会使用其他的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
注意这几个方法都会返回CompletableFuture
,当Action
执行完毕后它的结果返回原始的CompletableFuture
的计算结果或者返回异常。
whenComplete
方法的使用方式如下所示:
1 |
|
执行结果如下:
1 | 18:54:03.213 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
可以看到,正常情况下whenComplete
返回supplyAsync
执行的结果。
如果执行过程中抛出异常,whenComplete
也可以接收到异常然后处理:
1 |
|
执行结果如下:
1 | 18:15:30.632 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
exceptionally
方法返回一个新的CompletableFuture
,当原始的CompletableFuture
抛出异常的时候,就会触发这个CompletableFuture
的计算,调用function
计算值,否则如果原始的CompletableFuture
正常计算完后,这个新的CompletableFuture
也计算完成,它的值和原始的CompletableFuture
的计算的值相同。也就是这个exceptionally
方法用来处理异常的情况。
exceptionally
方法的使用方式如下所示:
1 |
|
执行结果如下:
1 | 18:38:31.461 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
可以看到,当执行过程抛出异常时,会触发exceptionally
的执行,并返回exceptionally
的返回值。
如果执行过程中没有抛出异常:
1 |
|
执行结果如下:
1 | 18:42:55.469 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
可以看到,如果执行过程中没有抛出异常exceptionally
不会触发,它返回的值就是supplyAsync
执行返回的原始值。
下面一组方法虽然也返回CompletableFuture
对象,但是对象的值和原来的CompletableFuture
计算的值不同。当原先的CompletableFuture
的值计算完成或者抛出异常的时候,会触发这个CompletableFuture
对象的计算,结果由BiFunction
参数计算而得。因此这组方法兼有whenComplete
和转换的两个功能。
1 | public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) |
同样,不以Async
结尾的方法由原来的线程计算,以Async
结尾的方法由默认的线程池ForkJoinPool.commonPool()
或者指定的线程池executor
运行。
handle
方法的使用方式如下所示:
1 |
|
执行结果如下:
1 | 18:47:03.524 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
如果执行过程抛出异常:
1 |
|
执行结果如下:
1 | 18:48:25.769 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
可以看到,handle
方法接收执行结果和异常,处理之后返回新的结果。
转换
CompletableFuture
可以作为monad和functor。由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture
当计算完成的时候请执行某个function
。而且我们还可以将这些操作串联起来,或者将CompletableFunction
组合起来。
1 | public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) |
这一组函数的功能是当原来的CompletableFuture
计算完后,将结果传递给函数fn
,将fn
的结果作为新的CompletableFuture
计算结果。因此它的功能相当于将CompletableFuture<T>
转换成CompletableFuture<U>
。
需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。
它们与handle
方法的区别在于handle
方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。而thenApply
方法只是用来处理正常值,因此一旦有异常就会抛出。
thenApply
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:22:24.537 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
纯消费(执行Action)
上面的方法是当计算完成的时候,会生成新的计算结果(thenApply
, handle
),或者返回同样的计算结果whenComplete
。CompletableFuture
还提供了一种处理结果的方法,只对结果执行Action
,而不返回新的计算值。因此计算值为Void
:
1 | public CompletableFuture<Void> thenAccept(Consumer<? super T> action) |
看它的参数类型也就明白了,它们是函数式接口Consumer
,这个接口只有输入,没有返回值。
thenAccept
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:28:30.580 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
thenAcceptBoth
1 | public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) |
thenAcceptBoth
接收另一个CompletionStage
和action
,当两个CompletionStage
都正常完成计算后,就会执行提供的action
,它用来组合另外一个异步的结果。
thenAcceptBoth
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:31:40.335 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start |
runAfterBoth
1 | public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) |
runAfterBoth
是当两个CompletionStage
都正常完成计算的时候,执行一个Runnable
,这个Runnable并不使用计算的结果。
runAfterBoth
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:34:07.085 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start |
thenRun
1 | public CompletableFuture<Void> thenRun(Runnable action) |
thenRun
当计算完成的时候会执行一个Runnable
,与thenAccept
不同,Runnable
并不使用CompletableFuture
计算的结果。
因此先前的CompletableFuture
计算的结果被忽略,返回Completable<Void>
类型的对象。
thenRun
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:36:05.833 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
组合
1 | public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) |
这一组方法接受一个Function
作为参数,这个Function
的输入是当前的CompletableFuture
的计算值,返回结果将是一个新的CompletableFuture
,这个新的CompletableFuture
会组合原来的CompletableFuture
和函数返回的CompletableFuture
。因此它的功能类似于:
1 | A +--> B +---> C |
thenCompose
返回的对象并不一定是函数fn
返回的对象,如果原来的CompletableFuture
还没有计算出来,它就会生成一个新的组合后的CompletableFuture
。
thenCompose
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:41:39.242 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
thenCombine
1 | public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) |
thenCombine
用来复合另外一个CompletionStage
的结果,它的功能类似:
1 | A + |
两个CompletionStage
是并行执行的,他们之间没有先后依赖顺序,other
并不会等待先前的CompletableFuture
执行完毕后再执行。
从功能上来讲,它们的功能更类似thenAcceptBoth
,只不过thenAcceptBoth
是纯消费,它的函数参数没有返回值,而thenCombine
的函数参数fn
有返回值。
thenCombine
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 20:45:01.018 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
Either
thenAcceptBoth
和runAfterBoth
是当两个CompletableFuture
都计算完成,而下面的方法是当任意一个CompletableFuture
计算完成的时候就会执行。
1 | public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
acceptEither
方法是当任意一个CompletionStage
完成的时候,action
这个消费者就会被执行。这个方法返回CompletableFuture<Void>
applyToEither
方法是当任意一个CompletionStage
完成的时候,fn
会被执行,它的返回值会当做新的CompletableFuture<U>
的计算结果
acceptEither
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 21:21:36.031 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
可以看到,当cf
执行完毕后,acceptEither
方法就被触发执行了
applyToEither
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 21:25:43.441 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
allOf 和 anyOf
1 | public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) |
allOf
方法是当所有的CompletableFuture
都执行完后执行计算
anyOf
方法是当任意一个CompletableFuture
执行完后就会执行计算,计算的结果相同
anyOf
和applyToEither
不同,anyOf
接受任意多的CompletableFuture
但是applyToEither
只是判断两个CompletableFuture
。anyOf
返回值的计算结果是参数中其中一个CompletableFuture
的计算结果,applyToEither
返回值的计算结果却是要经过fn
处理的。当然还有静态方法的区别,线程池的选择等。
allOf
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 21:36:36.938 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start |
anyOf
方法的使用方式如下:
1 |
|
执行结果如下:
1 | 21:43:12.562 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start |
https://colobu.com/2016/02/29/Java-CompletableFuture/
https://www.ibm.com/developerworks/cn/java/j-cf-of-jdk8/index.html
https://www.jianshu.com/p/dff9063e1ab6