CompletableFuture 异步编程基础

1. CompletableFuture 类

public class CompletableFuture implements Future, CompletionStage {
}

1.1 CompletableFuture 工厂方法

public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
允许指定自定义的 Executor 来执行异步任务

public static CompletableFuture allOf(CompletableFuture>... cfs)
allOf() 主要用于并行执行多个异步任务,并等待所有任务都完成

1.2 CompletionStage 接口

public interface CompletionStage {
    // 异步回调
    public  CompletionStage thenApply(Function super T,? extends U> fn);
    public CompletionStage thenAccept(Consumer super T> action);
    public CompletionStage thenRun(Runnable action);
    public  CompletionStage thenCompose
        (Function super T, ? extends CompletionStage> fn);

    // 组合
    public  CompletionStage thenCombine
        (CompletionStage extends U> other,
         BiFunction super T,? super U,? extends V> fn);
}

2. 异步计算任务 supplyAsync()

2.1 supplyAsync() 方法

@Test
public void demo() {
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        return "Hello";
    });

    try {
        // 阻塞,等待 future 完成
        String result = future.get();
        System.out.println(result);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
private final ExecutorService executor = new ThreadPoolExecutor(5, 20, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10));

@Test
public void joinDemo() {
    CompletableFuture future = CompletableFuture.supplyAsync(() -> System.currentTimeMillis(), executor);
    Long time = future.join();
}

2.2 示例

@Test
public void demo() throws Exception {
    ArrayList strings = Lists.newArrayList("111", "222");

    List> futures = strings.stream()
            .map(str -> CompletableFuture.supplyAsync(() -> str))
            .collect(Collectors.toList());

    List collect = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    log.info("result:" + collect);
}

2.3 回调函数 thenAccept()

@Test
public void demo() {
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        return "Hello";
    });

    // 注册回调函数,当异步任务完成时打印结果
    future.thenAccept(result -> {
        System.out.println(result);
    });
}

thenAccept() 注册一个回调函数,当 CompletableFuture 完成时,该函数将接收到结果字符串并将其打印出来

2.4 模拟耗时操作

@Test
public void thenAccept() {
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("supplyAsync: Hello");
        return "Hello";
    });

    log.info("start");
    // 回调函数
    future.thenAccept(str -> log.info(str));
    log.info("end");

    try {
        future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

3. 异步计算任务 runAsync()

@Test
public void runAsync() {
    CompletableFuture future = CompletableFuture.runAsync(() -> {
        log.info("runAsync start...");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("runAsync end...");
    });

    log.info("callback1...");
    // 回调函数
    CompletableFuture callbackFuture = future.thenRun(() -> {
        log.info("thenRun start...");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("thenRun end...");
    });
    log.info("callback2...");

    // future.join();
    callbackFuture.join();
}

打印结果:

09:34:34.881 [main] INFO com.juc.pool.Demo - callback1...
09:34:34.881 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync start...
09:34:34.885 [main] INFO com.pool.Demo - callback2...
09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync end...
09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun start...
09:34:39.890 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun end...

4. allOf()

4.1 等待

@Test
public void demo() {

    CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
        log.info("future1 start");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("future1 end");
        return "future1";
    });

    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
        log.info("future2 start");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("future2 end");
        return "future2";
    });

    CompletableFuture allFuture = CompletableFuture.allOf(future1, future2);
    CompletableFuture thenFuture = allFuture.thenAccept(unused -> {
        String join1 = future1.join();
        String join2 = future2.join();
        System.out.println(join1 + ", " + join2);
    });

    log.info("end....");
    
    // 阻塞,等待回调执行完成
    // thenFuture.join();
    try {
        thenFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

4.1 若改为直接 join()

CompletableFuture.allOf(future1, future2).join();
String join1 = future1.join();
String join2 = future2.join();
System.out.println(join1 + ", " + join2);

思考:
CompletableFuture.allOf().join()CompletableFuture.allOf().thenAccept().join() 区别与联系

阻塞时机:
直接 join() 立即阻塞,直到所有任务完成;
而 thenAccept().join() 先注册回调,异步执行回调,最后阻塞等待回调执行完成

结果处理:
直接 join() 不涉及结果处理;
thenAccept().join() 在所有任务完成后,执行特定的回调逻辑来处理结果

5. join() vs get()

相同点:
两者都是为了获取由CompletableFuture封装的异步操作完成后产生的最终结果

区别:
异常处理,get() 抛出一个受检异常(checked exception 必须处理), join() 抛出未检查异常(unchecked exception)

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

6. 计算

@Test
public void calculateDemo() {
    List values = new ArrayList();
    for (int i = 1; i >> futures = new ArrayList();
    for (Integer value : values) {
        futures.add(CompletableFuture.supplyAsync(() -> getList(value)));
    }

    // List> collect = futures.stream().map(CompletableFuture::join)
    //         .collect(Collectors.toList());

    List list = futures.stream().map(CompletableFuture::join)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    System.out.println("collect.size: " + list.size());
}

public static List getList(Integer value) {
    List values = new ArrayList();
    for (int i = 0; i 

【信息由网络或者个人提供,如有涉及版权请联系COOY资源网邮箱处理】

© 版权声明
THE END
喜欢就支持一下吧
点赞6 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容