1. 示例
1.1 定义线程池
@Slf4j
public class CompletableFutureDemo {
/**
* 获取处理器数量
* int core = Runtime.getRuntime().availableProcessors();
*/
private static AtomicInteger threadId = new AtomicInteger(1);
private final ExecutorService pool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 3,
1000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(50),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("pool-%d" + threadId.getAndIncrement());
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.AbortPolicy());
}
1.1 无返回值 runAsync()
@Test
public void allOf() {
Map resultMap = new HashMap();
SqlRequest request = new SqlRequest();
request.setTradeId("20240410");
CompletableFuture future1 = CompletableFuture.runAsync(() -> {
SqlRequest newRequest = new SqlRequest();
BeanUtils.copyProperties(request, newRequest);
newRequest.setTpCode("SF");
resultMap.put("future1", newRequest.toString());
}, pool);
CompletableFuture future2 = CompletableFuture.runAsync(() -> {
SqlRequest newRequest = new SqlRequest();
BeanUtils.copyProperties(request, newRequest);
newRequest.setTpCode("ZTO");
resultMap.put("future2", newRequest.toString());
}, pool);
// 阻塞主线程,直到 future1, future2 都完成
CompletableFuture.allOf(future1, future2).join();
System.out.println(resultMap);
}
@Data
public class SqlRequest implements Serializable {
private static final long serialVersionUID = -5480600656586378430L;
private String tradeId;
private String mailNo;
private String tpCode;
}
1.2 有返回值 supplyAsync()
@Test
public void allOf2() {
SqlRequest request = new SqlRequest();
request.setTradeId("20240410");
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
SqlRequest newRequest = new SqlRequest();
BeanUtils.copyProperties(request, newRequest);
newRequest.setTpCode("SF");
return newRequest.toString();
}, pool);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
SqlRequest newRequest = new SqlRequest();
BeanUtils.copyProperties(request, newRequest);
newRequest.setTpCode("ZTO");
return newRequest.toString();
}, pool);
CompletableFuture.allOf(future1, future2).join();
System.out.println("All tasks completed!");
String join1 = future1.join();
String join2 = future2.join();
System.out.println(join1 + ", " + join2);
}
1.3 非阻塞操作方式
@Test
public void allOf2() {
CompletableFuture future1 = ...;
CompletableFuture future2 = ...;
// 调用 thenAccept() 方法注册一个回调函数,不会阻塞当前线程
CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.thenAccept(aVoid -> {
String join1 = future1.join();
String join2 = future2.join();
});
}
1.4 thenApply() 方式
@Test
public void allOf2() {
CompletableFuture future1 = ...;
CompletableFuture future2 = ...;
List> futures = Lists.newArrayList(future1, future2);
List list = allRun(futures);
System.out.println(list);
}
}
public static List allRun(List> futures) {
CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
CompletableFuture> resultFuture = allDoneFuture
.thenApply(unused -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
try {
List res = resultFuture.get();
return res;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return new ArrayList();
}
}
其他:
public static CompletableFuture> allRun(List> futures) {
CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(
unused -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
2. 分析
2.1 CompletableFuture 类
// 无返回值
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
// 有返回值
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
2.2 异步回调
无需阻塞等待
public CompletableFuture thenAccept(Consumer super T> action)
public CompletableFuture thenApply(Function super T,? extends U> fn)
方法 | 处理结果的方式 | 返回值类型 | 使用场景 |
---|---|---|---|
thenAccept() | 消费结果(无返回值) | CompletableFuture | 对结果进行副作用操作,无需新值 |
thenApply() | 计算新值(有返回值) | CompletableFuture | 对结果进行计算,生成新值供后续使用 |
3. ThreadFactory 接口
public interface ThreadFactory {
Thread newThread(Runnable r);
}
示例:
// 原子整数
private final AtomicInteger threadId = new AtomicInteger(1);
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r);
thread.setName("pool-%d" + threadId.getAndIncrement());
thread.setPriority(Thread.NORM_PRIORITY);
thread.setDaemon(true);
return thread;
}
};
4. 线程池拒绝策略
拒绝策略 | 描述 | 特点 |
---|---|---|
AbortPolicy | 默认,直接抛出 RejectedExecutionException,阻止系统正常运行 | |
CallerRunsPolicy | “调用者运行”策略,不抛弃任务,也不抛出异常(当线程池无法接受新任务时,由提交该任务的线程(即调用者线程)负责运行这个被拒绝的任务) | 避免任务丢失,但会延迟响应 |
DiscardOldestPolicy | 抛弃队列中等待时间最长的任务 | 牺牲等待时间最长的任务 |
DiscardPolicy | 直接丢弃任务,不做任何处理,也不抛出异常 |
【信息由网络或者个人提供,如有涉及版权请联系COOY资源网邮箱处理】
© 版权声明
部分内容为互联网分享,若有侵权请联系站长删除。
THE END
暂无评论内容