1. CompletableFuture 类
public class CompletableFuture implements Future, CompletionStage {
}
1.1 CompletableFuture 工厂方法
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
public static CompletableFuture
允许指定自定义的 Executor 来执行异步任务
public static CompletableFuture
allOf() 主要用于并行执行多个异步任务,并等待所有任务都完成
1.2 CompletionStage 接口
public interface CompletionStage {
// 异步回调
public CompletionStage thenApply(Function super T,? extends U> fn);
public CompletionStage thenAccept(Consumer super T> action);
public CompletionStage thenRun(Runnable action);
public CompletionStage thenCompose
(Function super T, ? extends CompletionStage> fn);
// 组合
public CompletionStage thenCombine
(CompletionStage extends U> other,
BiFunction super T,? super U,? extends V> fn);
}
2. 异步计算任务 supplyAsync()
2.1 supplyAsync() 方法
@Test
public void demo() {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
try {
// 阻塞,等待 future 完成
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private final ExecutorService executor = new ThreadPoolExecutor(5, 20, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10));
@Test
public void joinDemo() {
CompletableFuture future = CompletableFuture.supplyAsync(() -> System.currentTimeMillis(), executor);
Long time = future.join();
}
2.2 示例
@Test
public void demo() throws Exception {
ArrayList strings = Lists.newArrayList("111", "222");
List> futures = strings.stream()
.map(str -> CompletableFuture.supplyAsync(() -> str))
.collect(Collectors.toList());
List collect = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
log.info("result:" + collect);
}
2.3 回调函数 thenAccept()
@Test
public void demo() {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 注册回调函数,当异步任务完成时打印结果
future.thenAccept(result -> {
System.out.println(result);
});
}
thenAccept() 注册一个回调函数,当 CompletableFuture 完成时,该函数将接收到结果字符串并将其打印出来
2.4 模拟耗时操作
@Test
public void thenAccept() {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("supplyAsync: Hello");
return "Hello";
});
log.info("start");
// 回调函数
future.thenAccept(str -> log.info(str));
log.info("end");
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
3. 异步计算任务 runAsync()
@Test
public void runAsync() {
CompletableFuture future = CompletableFuture.runAsync(() -> {
log.info("runAsync start...");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("runAsync end...");
});
log.info("callback1...");
// 回调函数
CompletableFuture callbackFuture = future.thenRun(() -> {
log.info("thenRun start...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("thenRun end...");
});
log.info("callback2...");
// future.join();
callbackFuture.join();
}
打印结果:
09:34:34.881 [main] INFO com.juc.pool.Demo - callback1...
09:34:34.881 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync start...
09:34:34.885 [main] INFO com.pool.Demo - callback2...
09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync end...
09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun start...
09:34:39.890 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun end...
4. allOf()
4.1 等待
@Test
public void demo() {
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
log.info("future1 start");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("future1 end");
return "future1";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
log.info("future2 start");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("future2 end");
return "future2";
});
CompletableFuture allFuture = CompletableFuture.allOf(future1, future2);
CompletableFuture thenFuture = allFuture.thenAccept(unused -> {
String join1 = future1.join();
String join2 = future2.join();
System.out.println(join1 + ", " + join2);
});
log.info("end....");
// 阻塞,等待回调执行完成
// thenFuture.join();
try {
thenFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
4.1 若改为直接 join()
CompletableFuture.allOf(future1, future2).join();
String join1 = future1.join();
String join2 = future2.join();
System.out.println(join1 + ", " + join2);
思考:CompletableFuture.allOf().join()
与 CompletableFuture.allOf().thenAccept().join()
区别与联系
阻塞时机:
直接 join() 立即阻塞,直到所有任务完成;
而 thenAccept().join() 先注册回调,异步执行回调,最后阻塞等待回调执行完成
结果处理:
直接 join() 不涉及结果处理;
thenAccept().join() 在所有任务完成后,执行特定的回调逻辑来处理结果
5. join() vs get()
相同点:
两者都是为了获取由CompletableFuture封装的异步操作完成后产生的最终结果
区别:
异常处理,get() 抛出一个受检异常(checked exception 必须处理), join() 抛出未检查异常(unchecked exception)
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
6. 计算
@Test
public void calculateDemo() {
List values = new ArrayList();
for (int i = 1; i >> futures = new ArrayList();
for (Integer value : values) {
futures.add(CompletableFuture.supplyAsync(() -> getList(value)));
}
// List> collect = futures.stream().map(CompletableFuture::join)
// .collect(Collectors.toList());
List list = futures.stream().map(CompletableFuture::join)
.flatMap(Collection::stream)
.collect(Collectors.toList());
System.out.println("collect.size: " + list.size());
}
public static List getList(Integer value) {
List values = new ArrayList();
for (int i = 0; i
【信息由网络或者个人提供,如有涉及版权请联系COOY资源网邮箱处理】
暂无评论内容