CompletableFuture.allOf() Asynchronous Programming Examples

1. Examples

1.1 Defining the thread pool

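import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CompletableFutureDemo {

    /**
     * Number of available processors:
     * int core = Runtime.getRuntime().availableProcessors();
     */
    private static final AtomicInteger threadId = new AtomicInteger(1);
    private final ExecutorService pool = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),      // core pool size
            Runtime.getRuntime().availableProcessors() * 3,  // maximum pool size
            1000L, TimeUnit.MILLISECONDS,                    // keep-alive for idle threads above the core size
            new LinkedBlockingQueue<>(50),                   // bounded work queue
            runnable -> {
                Thread thread = new Thread(runnable);
                // Threads are named pool-1, pool-2, ...
                thread.setName("pool-" + threadId.getAndIncrement());
                thread.setPriority(Thread.NORM_PRIORITY);
                thread.setDaemon(true);
                return thread;
            },
            new ThreadPoolExecutor.AbortPolicy());
}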

1.1 No return value: runAsync()

@Test
public void allOf() {
    // Written from several pool threads, so a thread-safe map is required
    Map<String, String> resultMap = new ConcurrentHashMap<>();

    SqlRequest request = new SqlRequest();
    request.setTradeId("20240410");

    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        SqlRequest newRequest = new SqlRequest();
        // Argument order (source, target) assumes Spring's BeanUtils
        BeanUtils.copyProperties(request, newRequest);
        newRequest.setTpCode("SF");
        resultMap.put("future1", newRequest.toString());
    }, pool);

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        SqlRequest newRequest = new SqlRequest();
        BeanUtils.copyProperties(request, newRequest);
        newRequest.setTpCode("ZTO");
        resultMap.put("future2", newRequest.toString());
    }, pool);

    // Block the main thread until both future1 and future2 have completed
    CompletableFuture.allOf(future1, future2).join();
    System.out.println(resultMap);
}

@Data
public class SqlRequest implements Serializable {
    private static final long serialVersionUID = -5480600656586378430L;

    private String tradeId;
    private String mailNo;
    private String tpCode;
}

1.2 With a return value: supplyAsync()

@Test
public void allOf2() {
    SqlRequest request = new SqlRequest();
    request.setTradeId("20240410");

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        SqlRequest newRequest = new SqlRequest();
        BeanUtils.copyProperties(request, newRequest);
        newRequest.setTpCode("SF");
        return newRequest.toString();
    }, pool);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        SqlRequest newRequest = new SqlRequest();
        BeanUtils.copyProperties(request, newRequest);
        newRequest.setTpCode("ZTO");
        return newRequest.toString();
    }, pool);

    // Block until both futures have completed
    CompletableFuture.allOf(future1, future2).join();
    System.out.println("All tasks completed!");
    // Both futures are already complete, so these join() calls return immediately
    String join1 = future1.join();
    String join2 = future2.join();

    System.out.println(join1 + ", " + join2);
}

1.3 Non-blocking approach

@Test
public void allOf3() {

    CompletableFuture<String> future1 = ...;
    CompletableFuture<String> future2 = ...;

    // thenAccept() registers a callback and does not block the current thread
    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
    combinedFuture.thenAccept(aVoid -> {
        // Runs only after both futures have completed, so join() returns immediately
        String join1 = future1.join();
        String join2 = future2.join();
    });
}

1.4 Using thenApply()

@Test
public void allOf4() {
    CompletableFuture<String> future1 = ...;
    CompletableFuture<String> future2 = ...;

    List<CompletableFuture<String>> futures = Lists.newArrayList(future1, future2);
    List<String> list = allRun(futures);
    System.out.println(list);
}

public static <T> List<T> allRun(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    // Once allDoneFuture has completed, every join() below returns immediately
    CompletableFuture<List<T>> resultFuture = allDoneFuture
            .thenApply(unused -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));

    try {
        // Block until all results have been collected
        return resultFuture.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return new ArrayList<>();
    }
}

Another variant, which returns the combined future instead of blocking:

public static <T> CompletableFuture<List<T>> allRun(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    return allDoneFuture.thenApply(
            unused -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
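
To make the non-blocking variant concrete, here is a minimal runnable sketch. The class name AllRunSketch and the use of the default executor (no custom pool) are assumptions made for illustration; the carrier codes are borrowed from the earlier examples.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllRunSketch {

    // Completes once every input future has completed, carrying the results in input order.
    static <T> CompletableFuture<List<T>> allRun(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // join() does not block here: this callback only runs after all inputs are done.
        return allDone.thenApply(unused ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "SF"),
                CompletableFuture.supplyAsync(() -> "ZTO"));

        // The caller decides when (and whether) to block on the combined future.
        List<String> results = allRun(futures).join();
        System.out.println(results); // [SF, ZTO]
    }
}
```

Because the results are collected by streaming over the input list, their order follows the list order, not the order in which the tasks happen to finish.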

2. Analysis

2.1 The CompletableFuture class

// No return value
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

// With a return value
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
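
The difference between the two factory methods can be seen in a short sketch. The class and method names below are hypothetical, and the overloads without an Executor argument are used, which run the task on the default asynchronous executor (normally ForkJoinPool.commonPool()).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncFactorySketch {

    // runAsync: the task returns nothing, so the future is a CompletableFuture<Void>
    // and any outcome has to be observed through a side effect.
    static int runSideEffect() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<Void> done = CompletableFuture.runAsync(counter::incrementAndGet);
        done.join(); // wait for the task to finish
        return counter.get();
    }

    // supplyAsync: the task returns a value, which the future carries.
    static String supplyValue() {
        CompletableFuture<String> value = CompletableFuture.supplyAsync(() -> "SF");
        return value.join();
    }

    public static void main(String[] args) {
        System.out.println(runSideEffect()); // 1
        System.out.println(supplyValue());   // SF
    }
}
```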

2.2 Asynchronous callbacks

Callbacks avoid blocking while waiting for the result:

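public CompletableFuture<Void> thenAccept(Consumer<? super T> action)

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

Method       | How the result is handled           | Return type             | Use case
thenAccept() | Consumes the result (returns none)  | CompletableFuture<Void> | Side effects on the result; no new value needed
thenApply()  | Computes a new value (returns it)   | CompletableFuture<U>    | Transforming the result into a new value for later stages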
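
The contrast in the table can be sketched as follows. The class and method names are hypothetical; the trade ID is the one used in the earlier examples.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CallbackSketch {

    // thenApply: transforms the result and yields a future of the new value.
    static int lengthOf(String tradeId) {
        CompletableFuture<Integer> length =
                CompletableFuture.supplyAsync(() -> tradeId).thenApply(String::length);
        return length.join();
    }

    // thenAccept: consumes the result for its side effect; the returned future is
    // a CompletableFuture<Void> and carries no value.
    static String record(String tradeId) {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
                CompletableFuture.supplyAsync(() -> tradeId).thenAccept(seen::set);
        done.join(); // wait only so the side effect is visible before returning
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(lengthOf("20240410")); // 8
        System.out.println(record("20240410"));   // 20240410
    }
}
```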

3. The ThreadFactory interface

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

Example:

// Atomic counter used to number the threads
private final AtomicInteger threadId = new AtomicInteger(1);

ThreadFactory factory = new ThreadFactory() {
    @Override
    public Thread newThread(@NotNull Runnable r) {
        Thread thread = new Thread(r);
        // Threads are named pool-1, pool-2, ...
        thread.setName("pool-" + threadId.getAndIncrement());
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(true);
        return thread;
    }
};
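
As a rough illustration of how such a factory is plugged into an executor, the sketch below wraps the same idea in a hypothetical helper named named(prefix) and passes it to Executors.newFixedThreadPool. The helper and class name are assumptions, not part of the original code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactorySketch {

    // Hypothetical helper: builds a factory whose threads are named "<prefix>-<n>".
    static ThreadFactory named(String prefix) {
        AtomicInteger threadId = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(prefix + "-" + threadId.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2, named("pool"));
        // The task reports the name of the worker thread it runs on.
        String name = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println(name); // pool-1
        pool.shutdown();
    }
}
```

Meaningful thread names mainly pay off in logs and thread dumps, where they show which pool a task ran on.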

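4. Thread pool rejection policies

Policy              | Description                                                                                                                          | Notes
AbortPolicy         | Default. Throws RejectedExecutionException immediately, interrupting the normal flow of the submitter                                | -
CallerRunsPolicy    | "Caller runs": neither drops the task nor throws; when the pool cannot accept a task, the submitting (caller) thread runs it itself   | Avoids losing tasks, but delays the caller
DiscardOldestPolicy | Discards the task that has waited longest in the queue, then retries submitting the new task                                          | Sacrifices the longest-waiting task
DiscardPolicy       | Silently drops the rejected task: no handling and no exception                                                                        | -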
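
The four policies can be compared with a small experiment. The sketch below is an illustration under stated assumptions: the class name, the helper submitToFullPool, and the deliberately tiny pool (one thread, one queue slot) are all hypothetical choices made so that the third submission is always rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicySketch {

    // Saturates a 1-thread, 1-slot pool, submits one more task,
    // and reports what happened to that extra task.
    static String submitToFullPool(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread caller = Thread.currentThread();
        String[] outcome = {"dropped"};
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only queue slot
            pool.execute(() -> outcome[0] =
                    Thread.currentThread() == caller ? "ran in caller" : "ran in pool");
        } catch (RejectedExecutionException e) {
            outcome[0] = "rejected";
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        try {
            pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(submitToFullPool(new ThreadPoolExecutor.AbortPolicy()));         // rejected
        System.out.println(submitToFullPool(new ThreadPoolExecutor.CallerRunsPolicy()));    // ran in caller
        System.out.println(submitToFullPool(new ThreadPoolExecutor.DiscardPolicy()));       // dropped
        System.out.println(submitToFullPool(new ThreadPoolExecutor.DiscardOldestPolicy())); // ran in pool
    }
}
```

With DiscardOldestPolicy the extra task runs in the pool because the queued blocker is evicted to make room for it; that evicted task is the one that is lost.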
