Java中无限流的并行处理
为什么下面的代码不打印任何输出,而如果我们删除parallel,它会打印0,1
IntStream.iterate(0, i -> ( i + 1 ) % 2)
.parallel()
.distinct()
.limit(10)
.forEach(System.out::println);
虽然我知道理想情况下,限制应该放在distinct之前,但我的问题更多地与添加并行处理导致的差异有关
# 1 楼答案
Stream.iterate返回“无限顺序流”。因此,使序列流并行并不太有用
根据Stream package的描述:
在你的例子中似乎是这样的,使用unordered(),它会打印0,1
# 2 楼答案
真正的原因是,有序并行
.distinct()
是全势垒操作,如文档中的described:“全屏障操作”指的是,在下游开始之前,必须执行所有上游操作。流API中只有两个完整的屏障操作:
.sorted()
(每次)和.distinct()
(在有序并行情况下)。由于向.distinct()
提供了非短路无限流,最终得到了无限循环。根据约定.distinct()
不能以任何顺序向下游发送元素:它应该始终发送第一个重复元素。虽然理论上可以更好地实现并行有序.distinct()
,但实现起来要复杂得多至于解决方案,@user140547是正确的:在
.distinct()
之前添加.unordered()
这将distinct()
算法切换为无序算法(只使用共享ConcurrentHashMap
存储所有观察到的元素,并将每个新元素发送到下游)。请注意,在.distinct()
之后添加.unordered()
不会有帮助# 3 楼答案
我知道代码是不正确的,正如解决方案中所建议的,如果我们在distinct之前移动限制,我们就不会有无限循环
并行函数使用fork和join的概念来分配工作,它为工作分配所有可用的线程,而不是单个线程
我们正确地期待无限循环,因为多个线程无限地处理数据,并且没有任何东西阻止它们,因为10的限制在distinct之后永远不会达到
它可能一直在尝试分叉,而从未试图加入推动它前进的行列。但我仍然认为这是java的一个缺陷,它比其他任何东西都重要
# 4 楼答案
这段代码有一个主要问题,即使没有并行代码: 之后distinct(),流将只有2个元素——因此限制永远不会生效——它将打印这两个元素,然后继续无限期地浪费CPU时间。但这可能正是你想要的
有了平行和限制,我相信由于工作分工的方式,问题会进一步恶化。我还没有一路追踪并行流代码,但我猜:
并行代码在多个线程之间分配工作,这些线程都会无限期地运行,因为它们永远不会完成配额。系统可能会等待每个线程完成,这样就可以组合它们的结果,以确保顺序的清晰性——但在您提供的情况下,这种情况永远不会发生
在没有订单要求的情况下,每个辅助线程的结果可以在对照全局差异集进行检查后立即使用
毫无限制地,我怀疑不同的代码被用来处理无限的流:不是等待所需的10个流填满,而是报告发现的结果。这有点像制作一个迭代器,报告hasNext()=true,首先生成0,然后生成1,然后next()调用永远挂起,而不生成结果——在并行情况下,有些东西在等待多个报告,以便在输出之前正确地组合/排序它们,而在串行情况下,它会执行可以挂起的操作
我会尝试找出调用堆栈中有无distinct()或limit()的确切区别,但到目前为止,似乎很难找到相当复杂的流库调用序列