异步编排CompletebleFuture

特点

在传统future上,只能异步调用get方法,获取计算结果,能做的事情十分有限,比如想要异步执行完任务之后再去执行另一个任务。传统future做这个事情就显得力不从心了。这就是CompletebleFuture解决的问题。CompletebleFuture提供了丰富的接口用来添加异步任务的回调函数。

简单使用

创建异步任务

创建异步任务主要有以下三种

1
CompletableFuture.supplyAsync(Supplier<U> supplier)
1
CompletableFuture.runAsync(Runnable runnable)
1
2
3
4
5
CompletableFuture<Object> future=new CompletableFuture<>();
new Thread(()->{
Object o=getO();
future.complete(o);
}).start();

添加回调函数

1
thenAccept(Consumer<? super T> action)

消费异步任务结果回调函数传入参数是一个 Consumer

1
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)

同过上一个异步任务的结果 转换为下个异步任务对象

1
thenApply(Function<? super T,? extends U> fn)

对异步任务的结果进行转换

1
2
thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)

组合当前异步任务和另一个异步任务(other)的结果。

1
thenRun(Runnable action)

异步任务完成后的回调函数

1
2
3
thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)

当两个异步任务都完成回调消费函数

1
2
acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)

当两个异步任务的一个完成时 回调消费函数

1
2
runAfterBoth(CompletionStage<?> other,
Runnable action)

当两个任务完成时 回调action

1
2
runAfterEither(CompletionStage<?> other,
Runnable action)

当两个异步任务的一个完成时 回调action

上面的函数都有一个异步版本 *Async()可以自己传入线程池,默认为ForkJoinPool.commonPool()

还有两个重要的回调函数

当完成时回调

a
1
2
whenComplete(
BiConsumer<? super T, ? super Throwable> action

抛出异常时回调

1
2
3
4
exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}

这两个函数有一个地方需要注意以下,使用whenComplete时,如果在异步任务中有异常抛出,在异步转同步的时候同样会抛出异常,而使用了exceptionally,在异步转同步的时候不会抛出异常。

注意2点 否则出了异常情况下调试起来很麻烦:

  1. 这里如果没有在exceptionally 中打出异常堆栈,转同步的时候,也不会抛异常,异步任务没有正常执行的的信息就会被吃掉。
  2. 没有调用转同步方法,也没有使用exceptionally ,异常信息也会被吃掉。

异步转同步

get() 阻塞直到获取任务结果,或者超时, 可以设置超时时间,抛出受检查异常,

join()阻塞直到获取任务结果,不抛出受检查异常

除了通过get,join来同步任务以外还可以通过下面两个方法

1
2
CompletableFuture.allOf(CompletableFuture<?>... cfs).join()//阻塞直到所有任务完成
CompletableFuture.anyOf(CompletableFuture<?>... cfs).join()//阻塞直到任一个任务完成

示例

1
2
3
4
5
6
7
8
9
10
11
12
List<CompletableFuture<IndicatorResultEntity>> futures = resultModelList.stream().map(x ->
CompletableFuture.supplyAsync(() -> {
return doSomething();
}, executorService)thenApply(x->{
return doSomething(x)
}).exceptionally(e -> {
IndicatorResultEntity resultEntity = new IndicatorResultEntity();
log.error("计算指标出错:{}", resultEntity.getExpressionStatement(), e);
return resultEntity;
})).collect(Collectors.toList());
#等待所有任务完成
CompletableFuture.anyOf(futures.stream().toArray(size -> new CompletableFuture[size])).join();