全部版块 我的主页
论坛 数据科学与人工智能 IT基础 JAVA语言开发
110 0
2025-12-01

长期以来,并发编程一直是 Java 开发中的一大挑战。虽然传统工具如

ExecutorService

Future

在一定程度上满足了需求,但它们允许子任务独立于父任务长期运行,容易导致线程泄漏、取消困难等问题。结构化并发通过将多个相关任务视为一个整体工作单元,从根本上解决了这些缺陷。它不仅简化了错误处理与任务取消流程,还显著增强了程序的可靠性与可观测性。

非结构化并发带来的挑战

在典型的非结构化并发模式中,一个线程创建执行器,另一个提交任务,而实际执行任务的线程则与前两者无直接关联。这意味着,某个线程提交任务后,完全不同的线程可以等待其结果——只要持有

Future

的引用,任何代码都可以连接并等待该任务完成,甚至跨线程进行。

这种松散的结构带来了诸多问题:当父任务未能正确终止子任务时,就会发生线程泄漏;由于缺乏统一协调机制,多个子任务的取消操作往往延迟或失效;同时,任务之间的层级关系在运行时未被记录,导致监控和调试极为困难。

// 非结构化:关系是隐式且脆弱的
ExecutorService executor = Executors.newCachedThreadPool();
Future<User> userFuture = executor.submit(() -> fetchUser(id));
Future<Orders> ordersFuture = executor.submit(() -> fetchOrders(id));

// 如果 fetchUser 失败会发生什么?
// 谁负责关闭执行器?
// 如果我们忘记清理,线程会泄漏吗?

StructuredTaskScope 简介

结构化并发的核心是

java.util.concurrent

包中的关键类

StructuredTaskScope

。该类允许开发者将一组并发执行的子任务作为一个整体来管理。使用

StructuredTaskScope

,可以在独立线程中分叉各个子任务,并在主任务继续之前统一汇合等待所有子任务完成。

其典型使用流程包括以下几个步骤:

  • 通过 try-with-resources 创建
  • StructuredTaskScope
  • 将每个子任务定义为
  • Callable
  • 在各自的线程中启动分叉
  • 调用汇合方法等待完成
  • 处理各子任务返回的结果

以下是一个获取天气信息的真实场景示例:

WeatherReport getWeatherReport(String location)
        throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<Temperature> temperature =
            scope.fork(() -> getTemperature(location));
        Supplier<Humidity> humidity =
            scope.fork(() -> getHumidity(location));
        Supplier<WindSpeed> windSpeed =
            scope.fork(() -> getWindSpeed(location));

        scope.join()           // 汇合所有子任务
             .throwIfFailed(); // 如果有任何失败,传播错误

        // 全部成功,组合结果
        return new WeatherReport(
            location,
            temperature.get(),
            humidity.get(),
            windSpeed.get()
        );
    }
}

其中,try-with-resources 结构至关重要——它确保作用域在退出时被正确关闭,自动取消仍在运行的子任务,从而避免资源泄漏。

利用关闭策略实现短路控制

短路机制使主任务能够在获得所需结果后立即中断其余不必要的子任务,提升效率。结构化并发提供了两种内置的关闭策略来应对常见情况:

ShutdownOnFailure:全量成功模式

当需要所有子任务都成功完成时,可使用

ShutdownOnFailure

。一旦任一子任务失败,整个作用域会立即取消其他仍在运行的任务:

Response handleRequest(String userId) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Subtask<User> user = scope.fork(() -> fetchUser(userId));
        Subtask<Profile> profile = scope.fork(() -> fetchProfile(userId));
        Subtask<Settings> settings = scope.fork(() -> fetchSettings(userId));

        scope.join().throwIfFailed();

        // 如果有任何失败,我们永远不会到达这里
        return new Response(user.get(), profile.get(), settings.get());
    }
}

例如,若

fetchUser()

抛出异常,则获取用户配置文件和系统设置的操作将被即时终止,避免无效计算和线程堆积。

ShutdownOnSuccess:首个成功即止模式

在某些场景下,只需任一子任务成功即可,比如从多个数据中心查询数据或尝试备用服务:

String fetchFromMultipleSources(String key) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
        scope.fork(() -> fetchFromPrimaryDB(key));
        scope.fork(() -> fetchFromCache(key));
        scope.fork(() -> fetchFromBackup(key));

        scope.join();

        // 返回第一个成功的结果
        return scope.result();
    }
}

一旦有任意子任务成功完成,作用域便会自动取消其余任务。这种“竞速”模式特别适用于对响应延迟敏感的应用场景。

自定义关闭策略的实现

实践中,大多数对

StructuredTaskScope

的使用并非直接实例化基类,而是继承其两个预置子类之一,或自行扩展以实现特定逻辑。例如,以下是一种收集所有成功结果并忽略失败任务的自定义策略:

class AllSuccessesScope<T> extends StructuredTaskScope<T> {
    private final List<T> results =
        Collections.synchronizedList(new ArrayList<>());

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get());
        }
    }

    public List<T> getResults() {
        return List.copyOf(results);
    }
}

// 用法
List<Data> collectAll() throws InterruptedException {
    try (var scope = new AllSuccessesScope<Data>()) {
        for (String source : dataSources) {
            scope.fork(() -> fetchData(source));
        }
        scope.join();
        return scope.getResults();
    }
}

虚拟线程:理想的协作伙伴

虚拟线程使得创建海量线程成为可能,而结构化并发则提供了安全协调这些线程的能力。更重要的是,它让可观测性工具能够以符合开发人员直觉的方式展示线程间的父子关系。

这一组合极具优势:虚拟线程极大降低了线程创建的成本,而结构化并发确保即使面对百万级并发任务,也能做到安全管控、有序释放。

// 现在启动 10,000 个并发任务是可行的
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    for (int i = 0; i < 10_000; i++) {
        final int taskId = i;
        scope.fork(() -> processTask(taskId));
    }
    scope.join().throwIfFailed();
}

若采用平台线程实现类似行为,系统很可能会因资源耗尽而崩溃。但结合虚拟线程与结构化并发,此类高并发场景变得既简单又可靠。

模块系统的注意事项

在构建模块化应用时,理解 Java 模块系统对于正确使用结构化并发至关重要。反射在模块环境下不再具备“特权”,其访问能力受限于编译期可见性规则——只能访问导出包中的公共类及其公共成员。

默认情况下,仅

module-info.java

中明确导出的包对外可见。若应用程序依赖反射框架(如 Spring 或 Hibernate),则需额外声明开放权限:

module com.example.app {
    // 用于编译时访问的常规导出
    exports com.example.api;

    // 为运行时反射访问开放
    opens com.example.entities to org.hibernate.orm.core;

    requires java.base;
    requires org.hibernate.orm.core;
}

在编译阶段,被 open 的包仍保持封装状态,如同该指令不存在;但在运行时,这些包内的类型可通过反射自由访问所有成员,无论是否公开。

若需对模块内所有包开放完整反射权限,可声明为开放模块:

open module com.example.app {
    exports com.example.api;
    requires java.base;
}

这相当于将模块中的每一个包都单独使用

opens

指令开放,虽便捷但牺牲了一定程度的封装性。

可观测性与调试支持

结构化并发大幅提升了系统的可观测性。线程转储现在能清晰呈现任务间的父子层次关系:

jcmd <pid> Thread.dump_to_file -format=json output.json

生成的 JSON 输出中包含了

StructuredTaskScope

实例及其分叉出的子任务数组,使开发者能直观理解当前运行的任务及其上下文。相比以往扁平、隐式关联的线程视图,这是一种根本性的改进。

API 演进历程

结构化并发最初由 JEP 428 提出,作为孵化 API 首次出现在 JDK 19 中,在 JDK 20 中重新孵化,并通过 JEP 453 在 JDK 21 中进入预览阶段,随后在 JDK 22 和 JDK 23 中持续迭代预览。到 JDK 25 时,该 API 已进一步演进,采用静态工厂方法替代原有的公共构造函数,提升了封装性与使用一致性。

要在当前 JDK 版本中使用结构化并发,必须启用预览功能:

# 编译
javac --release 21 --enable-preview MyApp.java

# 运行
java --enable-preview MyApp

根据实际应用中的反馈,该 API 正在逐步成熟并趋于稳定。结构化并发已被验证为一种安全、表达性强且易于理解的并发编程方式。这一理念最早由 Python 的相关库提出,随后被 Kotlin 等语言采纳和发展。

最佳实践

优先采用 Try-With-Resources 机制

为防止线程泄漏,必须确保作用域被正确关闭。因此,不应手动控制并发资源的生命周期。

StructuredTaskScope

合理选择执行策略

当需要所有任务结果时,应使用

ShutdownOnFailure

;在竞速场景下(即取最快完成的结果),推荐使用

ShutdownOnSuccess

;对于特殊需求,也可以实现自定义策略。

结合虚拟线程使用效果更佳

结构化并发与虚拟线程搭配使用时优势最为明显,能够以简洁的代码实现极高规模的并发处理能力。

避免共享可变状态

尽管结构化并发能有效管理任务间的协调关系,但开发者仍需自行保证对共享数据访问的线程安全性。

考虑使用作用域值传递上下文

为了在嵌套的任务层级中传递上下文信息,作用域值(JEP 481)相比传统方法提供了更优的解决方案。

ThreadLocal

实战示例:用户数据聚合

下面是一个从多个服务源获取并整合用户信息的完整案例:

public class UserAggregator {
    record UserData(User user, List<Order> orders,
                    Stats stats, Recommendations recs) {}

    public UserData aggregate(String userId)
            throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<User> user =
                scope.fork(() -> userService.fetch(userId));
            Supplier<List<Order>> orders =
                scope.fork(() -> orderService.fetch(userId));
            Supplier<Stats> stats =
                scope.fork(() -> statsService.compute(userId));
            Supplier<Recommendations> recs =
                scope.fork(() -> mlService.recommend(userId));

            scope.join().throwIfFailed();

            return new UserData(
                user.get(),
                orders.get(),
                stats.get(),
                recs.get()
            );
        }
    }
}

这种实现方式具备良好的简洁性、安全性和效率。一旦某个服务调用失败,其余并行任务将立即被取消,避免资源浪费。同时,作用域机制保障了资源的及时释放。配合虚拟线程,该模式可轻松支持成千上万的并发请求。

开发者视角

Java 架构团队刻意未让

fork

方法返回

Future

实例,目的是避免与非结构化的异步计算混淆,并与旧有并发模型明确区分。这一设计体现了结构化并发并非简单的功能增强,而是一种全新的编程范式。

据 Rock the JVM 教程分析,结构化并发终于使 Java 原生具备了其他 JVM 语言早已通过 Kotlin 协程、Scala Cats Effects Fibers 等工具实现的能力,并且获得了官方平台级的支持。

未来展望

结构化并发标志着我们在并发编程思维上的根本转变——不再专注于单个线程或 Future 的管理,而是以层次化的方式组织并发任务,正如我们使用函数和循环来组织顺序执行的代码一样。

其带来的优势十分显著:杜绝线程泄漏、实现正确的异常传播、支持协同取消机制,并提升系统的可观测性。结合虚拟线程的应用,Java 如今提供了一种既强大又易于使用的并发模型。

随着该 API 逐渐定型,预计将在各类框架和库中得到广泛集成。Spring、Hibernate 等主流生态项目已在探索如何利用结构化并发编写出更加清晰、可靠的并发代码。

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群