有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java中无限流的并行处理

为什么下面的代码不打印任何输出,而如果我们删除parallel,它会打印0,1

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

虽然我知道理想情况下,限制应该放在distinct之前,但我的问题更多地与添加并行处理导致的差异有关


共 (4) 个答案

  1. # 1 楼答案

    Stream.iterate返回“无限顺序流”。因此,使序列流并行并不太有用

    根据Stream package的描述:

    For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.

    在你的例子中似乎是这样的,使用unordered(),它会打印0,1

        IntStream.iterate(0, i -> (i + 1) % 2)
                .parallel()
                .unordered()
                .distinct()
                .limit(10)
                .forEach(System.out::println);
    
  2. # 2 楼答案

    真正的原因是,有序并行.distinct()是全势垒操作,如文档中的described

    Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead), and stability is often not needed.

    “全屏障操作”指的是,在下游开始之前,必须执行所有上游操作。流API中只有两个完整的屏障操作:.sorted()(每次)和.distinct()(在有序并行情况下)。由于向.distinct()提供了非短路无限流,最终得到了无限循环。根据约定.distinct()不能以任何顺序向下游发送元素:它应该始终发送第一个重复元素。虽然理论上可以更好地实现并行有序.distinct(),但实现起来要复杂得多

    至于解决方案,@user140547是正确的:在.distinct()之前添加.unordered()这将distinct()算法切换为无序算法(只使用共享ConcurrentHashMap存储所有观察到的元素,并将每个新元素发送到下游)。请注意,在.distinct()之后添加.unordered()不会有帮助

  3. # 3 楼答案

    我知道代码是不正确的,正如解决方案中所建议的,如果我们在distinct之前移动限制,我们就不会有无限循环

    并行函数使用fork和join的概念来分配工作,它为工作分配所有可用的线程,而不是单个线程

    我们正确地期待无限循环,因为多个线程无限地处理数据,并且没有任何东西阻止它们,因为10的限制在distinct之后永远不会达到

    它可能一直在尝试分叉,而从未试图加入推动它前进的行列。但我仍然认为这是java的一个缺陷,它比其他任何东西都重要

  4. # 4 楼答案

    这段代码有一个主要问题,即使没有并行代码: 之后distinct(),流将只有2个元素——因此限制永远不会生效——它将打印这两个元素,然后继续无限期地浪费CPU时间。但这可能正是你想要的

    有了平行和限制,我相信由于工作分工的方式,问题会进一步恶化。我还没有一路追踪并行流代码,但我猜:

    并行代码在多个线程之间分配工作,这些线程都会无限期地运行,因为它们永远不会完成配额。系统可能会等待每个线程完成,这样就可以组合它们的结果,以确保顺序的清晰性——但在您提供的情况下,这种情况永远不会发生

    在没有订单要求的情况下,每个辅助线程的结果可以在对照全局差异集进行检查后立即使用

    毫无限制地,我怀疑不同的代码被用来处理无限的流:不是等待所需的10个流填满,而是报告发现的结果。这有点像制作一个迭代器,报告hasNext()=true,首先生成0,然后生成1,然后next()调用永远挂起,而不生成结果——在并行情况下,有些东西在等待多个报告,以便在输出之前正确地组合/排序它们,而在串行情况下,它会执行可以挂起的操作

    我会尝试找出调用堆栈中有无distinct()或limit()的确切区别,但到目前为止,似乎很难找到相当复杂的流库调用序列