JEP 505(结构化并发第五次预览版)已被确定为 JDK 25 版本的正式特性(Targeted 状态)。该 API 历经五次预览迭代,旨在简化并行任务管理并为开发者提供更清晰、更安全的框架,特别是在使用虚拟线程相关场景。最新预览版优化了此前孵化版本(JEP 428、JEP 437)和预览版本(JEP 453、JEP 462、JEP 480、JEP 499)中引入的 API。最显著的变化是,StructuredTaskScope 不再通过公共构造函数实例化,开发者现在需通过静态工厂方法(如 StructuredTaskScope.open())开启作用域。这一改动明确了默认行为,并为更丰富的任务完成策略铺平了道路。
根据 JEP 规范,结构化并发主要解决并行编程中的三个核心问题:
1.严格限定子任务的生命周期仅在明确定义的父作用域内
2.实现可靠的取消机制,避免资源泄漏
3.通过结构化线程层级增强可观测性
该 API 的核心是java.util.concurrent.StructuredTaskScope类,它负责管理一组并发子任务。开发者在作用域内 fork 子任务,随后统一等待其完成(join),作用域将自动维护这些任务的执行边界。
以下面这段代码为例:
try (var scope = StructuredTaskScope.open()) { Subtask<String> user = scope.fork(() -> fetchUser(userId)); Subtask<List<Order>> orders = scope.fork(() -> fetchOrders(userId)); scope.join(); // Wait for all subtasks // Process results or handle exceptions String userName = user.get(); List<Order> userOrders = orders.get();}
这段代码展示了结构化并发的基本使用模式。它通过新的工厂方法创建作用域,并行派发两个任务(获取用户数据和订单数据),使用join()等待两者完成,最后获取结果。该作用域能确保在退出代码块时,所有子任务要么已完成,要么被自动取消。
作为对比,这里展示了相同功能在之前结构化并发 API 预览版中的实现方式:
try (var scope = new StructuredTaskScope<>()) { Subtask<String> user = scope.fork(() -> fetchUser(userId)); Subtask<List<Order>> orders = scope.fork(() -> fetchOrders(userId)); scope.join(); // Process results or handle exceptions String userName = user.get(); List<Order> userOrders = orders.get();}
虽然基本结构类似,但第五次预览版引入了诸如StructuredTaskScope.open()的工厂方法,取代了基于构造函数的实例化方式。这一改进提升了 API 的可读性,同时为库的维护者提供了更大灵活性,使得未来的演进不会破坏其兼容性。
无参的open()工厂方法会创建一个“快速失败”的作用域:如果任何子任务抛出异常,其余任务会被中断,且join()会重新抛出该异常。开发者也可以通过open(Joiner)提供自定义策略,例如:
// Return the first successful result, cancel the rest<T> T race(Collection<Callable<T>> tasks) throws InterruptedException { try (var scope = StructuredTaskScope.open( Joiner.<T>anySuccessfulResultOrThrow())) { tasks.forEach(scope::fork); return scope.join(); }}
每次 fork 都会会启动一个子任务(默认在虚拟线程上执行),并返回一个Subtask句柄,其get()方法仅在join()完成后调用才是安全的。该作用域强制结构化约束:若从非属主线程调用fork()或join(),或在未关闭作用域的情况下退出代码块,都将抛出StructureViolationException。
工厂方法allSuccessfulOrThrow()返回一个新的合并器(joiner):当所有子任务都成功完成时,它会生成一个包含所有子任务的流。
<T> List<T> runConcurrently(Collection<Callable<T>> tasks) throws InterruptedException { try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow())) { tasks.forEach(scope::fork); return scope.join().map(Subtask::get).toList(); }}若一个或多个子任务失败,则 join() 会抛出 FailedException,并以其中一个失败子任务的异常作为其根因。
Joiner 接口提供了三个额外的工厂方法:awaitAll() 会返回一个等待所有子任务完成(无论成功与否)的新合并器,;awaitAllSuccessfulOrThrow() 会返回一个要求所有子任务必须全部成功完成的新合并器;allUntil(Predicate 会返回一个新的合并器,在所有子任务成功完成,或某个已完成子任务满足谓词条件时,取消当前作用域并返回所有子任务的流。
在使用任何 Joiner 时,必须为每个 StructuredTaskScope 创建新的 Joiner 实例。Joiner 对象不可跨作用域复用,也不应在作用域关闭后重复使用。
开发者可直接实现 Joiner 接口以自定义完成策略。该接口包含两个泛型参数:表示作用域内子任务返回类型的 T;表示 join() 方法返回类型的 R。
其接口定义如下:
public interface Joiner<T, R> { public default boolean onFork(Subtask<? extends T> subtask); public default boolean onComplete(Subtask<? extends T> subtask); public R result() throws Throwable;}
当调用 fork() 创建子任务时,会触发 onFork() 方法;而当子任务完成时,则会调用 onComplete() 方法。
规范还明确了一点,作用域内的子任务会继承 ScopedValue 的绑定值。如果作用域的所有者线程从已绑定的 ScopedValue 读取了某个值,那么每个子任务读取到的也将是同一个值。
此外,该 JEP 还扩展了为虚拟线程引入的 JSON 线程转储格式,新增了对 StructuredTaskScope 的支持,能够以层级结构展示线程分组关系:
$jcmd <pid> Thread.dump_to_file -format=json <file>
每个作用域对应的 JSON 对象包含该作用域内派发的所有线程数组和各线程对应的调用栈信息(stack traces)。
作为预览特性,OpenJDK 团队鼓励开发者在 JDK 25 中试用这第五个迭代版本并积极反馈,这些反馈对 API 的最终定型至关重要。
查看英文原文:JEP 505 Delivers Fifth Preview of Java's Structured Concurrency with Key API Refinements





