CompletableFuture对象的使用

Callable、Future、FutureTask原理一文中,我们介绍了Future的使用以及原理。Future虽然可以实现获取异步执行结果的需求,但是它有着显而易见的缺点:

  1. Future没有提供通知机制,我们无法得知Future什么时候完成
  2. 想要获得执行结果,要么使用阻塞,在future.get()的地方等待future返回的结果,这时又变成同步操作。要么使用isDone()轮询地判断Future是否完成,这样会耗费CPU的资源。

在JDK8之前,我们可以使用第三方库来解决这个问题。Netty、Guava分别扩展了Java的Future接口,方便做异步编程。

JDK8新增了一个CompletableFuture类。CompletableFuture类吸收了所有Guava中ListenableFutureSettableFuture的特征,还提供了其他强大的功能,让Java拥有了完整的非阻塞编程模型:Future、Promise和Callback

CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。

CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAcceptthenApplythenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

创建CompletableFuture对象

public static <U> CompletableFuture<U> completedFuture(U value),是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture

completedFuture方法的使用方式如下所示:

1
2
3
4
5
6
7
@Test
@Test
public void test01() {
CompletableFuture<String> cf = CompletableFuture.completedFuture("message");
logger.info("isDone {}", cf.isDone());
logger.info(cf.getNow(null));
}

其中getNow方法返回CompletableFuture当前的执行结果,如果没有执行完成则返回默认值。

执行结果:

1
2
17:10:52.318 [main] INFO completable_future.p3.CompletableFutureTest - isDone true
17:10:52.322 [main] INFO completable_future.p3.CompletableFutureTest - message

以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

runAsync方法以Runnable函数式接口参数类型为参数,所以CompletableFuture的计算结果为空。

supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U

runAsync方法的使用方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void test02() throws InterruptedException {
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

logger.info("isDone 1 " + cf.isDone());
Thread.sleep(2000);
logger.info("isDone 2 " + cf.isDone());
logger.info("result {}", cf.join());
}

执行结果:

1
2
3
17:11:43.143 [main] INFO completable_future.p3.CompletableFutureTest - isDone 1 false
17:11:45.150 [main] INFO completable_future.p3.CompletableFutureTest - isDone 2 true
17:11:45.150 [main] INFO completable_future.p3.CompletableFutureTest - result null

supplyAsync方法的使用方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void test02_1() throws InterruptedException {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});

logger.info("isDone 1 " + cf.isDone());
Thread.sleep(2000);
logger.info("isDone 2 " + cf.isDone());
logger.info("result {}", cf.join());
}

执行结果:

1
2
3
17:11:52.902 [main] INFO completable_future.p3.CompletableFutureTest - isDone 1 false
17:11:54.907 [main] INFO completable_future.p3.CompletableFutureTest - isDone 2 true
17:11:54.907 [main] INFO completable_future.p3.CompletableFutureTest - result hello

从结果上我们也可以看到,runAsyncsupplyAsync的区别就是:supplyAsync能够返回执行结果,而runAsync不会。

计算结果完成时的处理

CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

1
2
3
4
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

可以看到Action的类型是BiConsumer<? super T, ? super Throwable>,它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

whenComplete方法的使用方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void test03() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = cf.whenComplete((s, throwable) -> {
if (throwable == null) {
logger.info(s);
}
});

logger.info(cf.join());
logger.info(cf1.join());
}

执行结果如下:

1
2
3
4
18:54:03.213 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
18:54:04.220 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello
18:54:04.220 [main] INFO completable_future.p3.CompletableFutureTest - hello
18:54:04.220 [main] INFO completable_future.p3.CompletableFutureTest - hello

可以看到,正常情况下whenComplete返回supplyAsync执行的结果。

如果执行过程中抛出异常,whenComplete也可以接收到异常然后处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void test03_1() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (true) {
throw new RuntimeException("exception");
}
return "hello";
});
cf.whenComplete((s, throwable) -> {
if (throwable == null) {
logger.info(s);
} else {
logger.error(throwable.getMessage());
}
});

while (!cf.isDone()) {}
}

执行结果如下:

1
2
18:15:30.632 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
18:15:31.635 [ForkJoinPool.commonPool-worker-1] ERROR completable_future.p3.CompletableFutureTest - java.lang.RuntimeException: exception

exceptionally方法返回一个新的CompletableFuture,当原始的CompletableFuture抛出异常的时候,就会触发这个CompletableFuture的计算,调用function计算值,否则如果原始的CompletableFuture正常计算完后,这个新的CompletableFuture也计算完成,它的值和原始的CompletableFuture的计算的值相同。也就是这个exceptionally方法用来处理异常的情况。

exceptionally方法的使用方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void test05() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (true) {
throw new RuntimeException("exception");
}
return "hello";
});
cf.whenComplete((s, throwable) -> {
if (throwable == null) {
logger.info(s);
} else {
logger.error(throwable.getMessage());
}
});
CompletableFuture<String> cf1 = cf.exceptionally(throwable -> {
logger.error(throwable.getMessage());
return "exception happened";
});

logger.info(cf1.join());
}

执行结果如下:

1
2
3
4
18:38:31.461 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
18:38:32.467 [ForkJoinPool.commonPool-worker-1] ERROR completable_future.p3.CompletableFutureTest - java.lang.RuntimeException: exception
18:38:32.467 [ForkJoinPool.commonPool-worker-1] ERROR completable_future.p3.CompletableFutureTest - java.lang.RuntimeException: exception
18:38:32.467 [main] INFO completable_future.p3.CompletableFutureTest - exception happened

可以看到,当执行过程抛出异常时,会触发exceptionally的执行,并返回exceptionally的返回值。

如果执行过程中没有抛出异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void test05() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
cf.whenComplete((s, throwable) -> {
if (throwable == null) {
logger.info(s);
} else {
logger.error(throwable.getMessage());
}
});
CompletableFuture<String> cf1 = cf.exceptionally(throwable -> {
logger.error(throwable.getMessage());
return "exception happened";
});

logger.info(cf1.join());
}

执行结果如下:

1
2
3
18:42:55.469 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
18:42:56.476 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello
18:42:56.476 [main] INFO completable_future.p3.CompletableFutureTest - hello

可以看到,如果执行过程中没有抛出异常exceptionally不会触发,它返回的值就是supplyAsync执行返回的原始值。

下面一组方法虽然也返回CompletableFuture对象,但是对象的值和原来的CompletableFuture计算的值不同。当原先的CompletableFuture的值计算完成或者抛出异常的时候,会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。因此这组方法兼有whenComplete和转换的两个功能。

1
2
3
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

同样,不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。

handle方法的使用方式如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void test06() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = cf.handle((s, throwable) -> {
if (throwable == null) {
logger.info(s);
return s + " world";
} else {
logger.error(throwable.getMessage());
return "exception happened";
}

});

logger.info(cf1.join());
}

执行结果如下:

1
2
3
18:47:03.524 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
18:47:04.529 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello
18:47:04.530 [main] INFO completable_future.p3.CompletableFutureTest - hello world

如果执行过程抛出异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void test06() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (true) {
throw new RuntimeException("exception");
}
return "hello";
});
CompletableFuture<String> cf1 = cf.handle((s, throwable) -> {
if (throwable == null) {
logger.info(s);
return s + " world";
} else {
logger.error(throwable.getMessage());
return "exception happened";
}

});

logger.info(cf1.join());
}

执行结果如下:

1
2
3
18:48:25.769 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
18:48:26.775 [ForkJoinPool.commonPool-worker-1] ERROR completable_future.p3.CompletableFutureTest - java.lang.RuntimeException: exception
18:48:26.776 [main] INFO completable_future.p3.CompletableFutureTest - exception happened

可以看到,handle方法接收执行结果和异常,处理之后返回新的结果。

转换

CompletableFuture可以作为monad和functor。由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉CompletableFuture当计算完成的时候请执行某个function。而且我们还可以将这些操作串联起来,或者将CompletableFunction组合起来。

1
2
3
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

这一组函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn,将fn的结果作为新的CompletableFuture计算结果。因此它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>

需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。

它们与handle方法的区别在于handle方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。而thenApply方法只是用来处理正常值,因此一旦有异常就会抛出。

thenApply方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void test07() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = cf.thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
logger.info(s);
return s + " world";
}
});

logger.info(cf1.join());
}

执行结果如下:

1
2
3
20:22:24.537 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:22:25.542 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello
20:22:25.543 [main] INFO completable_future.p3.CompletableFutureTest - hello world

纯消费(执行Action)

上面的方法是当计算完成的时候,会生成新的计算结果(thenApply, handle),或者返回同样的计算结果whenCompleteCompletableFuture还提供了一种处理结果的方法,只对结果执行Action,而不返回新的计算值。因此计算值为Void

1
2
3
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

看它的参数类型也就明白了,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

thenAccept方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void test08() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
cf.thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
logger.info(s);
}
});

cf.join();
}

执行结果如下:

1
2
20:28:30.580 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:28:31.588 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello

thenAcceptBoth

1
2
3
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

thenAcceptBoth接收另一个CompletionStageaction,当两个CompletionStage都正常完成计算后,就会执行提供的action,它用来组合另外一个异步的结果。

thenAcceptBoth方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void test09() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

cf.thenAcceptBoth(cf1, (s, s2) -> {
logger.info(s + " " + s2);
}).join();
}

执行结果如下:

1
2
3
20:31:40.335 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
20:31:40.335 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:31:42.344 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - hello world

runAfterBoth

1
2
3
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

runAfterBoth是当两个CompletionStage都正常完成计算的时候,执行一个Runnable,这个Runnable并不使用计算的结果。

runAfterBoth方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void test10() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

cf.runAfterBoth(cf1, () -> {
logger.info("end");
}).join();
}

执行结果如下:

1
2
3
20:34:07.085 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
20:34:07.085 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:34:09.094 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - end

thenRun

1
2
3
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

thenRun当计算完成的时候会执行一个Runnable,与thenAccept不同,Runnable并不使用CompletableFuture计算的结果。

因此先前的CompletableFuture计算的结果被忽略,返回Completable<Void>类型的对象。

thenRun方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void test11() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});

cf.thenRun(() -> logger.info("end")).join();
}

执行结果如下:

1
2
20:36:05.833 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:36:06.843 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - end

组合

1
2
3
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。因此它的功能类似于:

1
A +--> B +---> C

thenCompose返回的对象并不一定是函数fn返回的对象,如果原来的CompletableFuture还没有计算出来,它就会生成一个新的组合后的CompletableFuture

thenCompose方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void test12() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf2 = cf.thenCompose(new Function<String, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s) {
return CompletableFuture.supplyAsync(() -> {
logger.info(s);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + " world";
});
}
});
logger.info(cf2.join());
}

执行结果如下:

1
2
3
20:41:39.242 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:41:40.253 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello
20:41:42.255 [main] INFO completable_future.p3.CompletableFutureTest - hello world

thenCombine

1
2
3
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

thenCombine用来复合另外一个CompletionStage的结果,它的功能类似:

1
2
3
4
5
A +
|
+------> C
+------^
B +

两个CompletionStage是并行执行的,他们之间没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。

从功能上来讲,它们的功能更类似thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值。

thenCombine方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void test13() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

CompletableFuture<String> cf2 = cf.thenCombine(cf1, (s, s2) -> s + " " + s2);
logger.info(cf2.join());
}

执行结果如下:

1
2
3
20:45:01.018 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
20:45:01.018 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
20:45:03.028 [main] INFO completable_future.p3.CompletableFutureTest - hello world

Either

thenAcceptBothrunAfterBoth是当两个CompletableFuture都计算完成,而下面的方法是当任意一个CompletableFuture计算完成的时候就会执行。

1
2
3
4
5
6
7
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

acceptEither方法是当任意一个CompletionStage完成的时候,action这个消费者就会被执行。这个方法返回CompletableFuture<Void>

applyToEither方法是当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当做新的CompletableFuture<U>的计算结果

acceptEither方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void test14() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

cf.acceptEither(cf1, s -> logger.info(s)).join();
}

执行结果如下:

1
2
3
21:21:36.031 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
21:21:36.031 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
21:21:37.039 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello

可以看到,当cf执行完毕后,acceptEither方法就被触发执行了

applyToEither方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void test15() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

CompletableFuture<String> cf2 = cf.applyToEither(cf1, s -> {
logger.info(s);
return s + " end";
});
logger.info(cf2.join());
}

执行结果如下:

1
2
3
4
21:25:43.441 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
21:25:43.441 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
21:25:44.447 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - hello
21:25:44.448 [main] INFO completable_future.p3.CompletableFutureTest - hello end

allOf 和 anyOf

1
2
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf方法是当所有的CompletableFuture都执行完后执行计算

anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同

anyOfapplyToEither不同,anyOf接受任意多的CompletableFuture但是applyToEither只是判断两个CompletableFutureanyOf返回值的计算结果是参数中其中一个CompletableFuture的计算结果,applyToEither返回值的计算结果却是要经过fn处理的。当然还有静态方法的区别,线程池的选择等。

allOf方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void test16() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

CompletableFuture.allOf(cf, cf1).whenComplete((v, throwable) -> {
List<String> list = new ArrayList<>();
list.add(cf.join());
list.add(cf1.join());
logger.info("result {}", list);
}).join();
}

执行结果如下:

1
2
3
21:36:36.938 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
21:36:36.938 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
21:36:38.951 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - result [hello, world]

anyOf方法的使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Test
public void test17() {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
logger.info("start");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});

CompletableFuture<Object> cf2 = CompletableFuture.anyOf(cf, cf1).whenComplete((o, throwable) -> logger.info("result {}", o));
logger.info("result {}", cf2.join());
}

执行结果如下:

1
2
3
4
21:43:12.562 [ForkJoinPool.commonPool-worker-2] INFO completable_future.p3.CompletableFutureTest - start
21:43:12.562 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - start
21:43:13.573 [ForkJoinPool.commonPool-worker-1] INFO completable_future.p3.CompletableFutureTest - result hello
21:43:13.576 [main] INFO completable_future.p3.CompletableFutureTest - result hello

https://colobu.com/2016/02/29/Java-CompletableFuture/
https://www.ibm.com/developerworks/cn/java/j-cf-of-jdk8/index.html
https://www.jianshu.com/p/dff9063e1ab6