如何在Python中并行化列表推导式计算?

64 投票
9 回答
43047 浏览
提问于 2025-04-16 13:16

列表推导和映射计算在理论上应该比较容易实现并行处理:在列表推导中的每一个计算都可以独立于其他元素的计算进行。例如,在这个表达式中

[ x*x for x in range(1000) ]

每个 x*x 的计算(至少在理论上)都可以并行进行。

我的问题是:有没有什么 Python 模块、实现或者编程技巧可以让列表推导的计算实现并行处理(这样就可以利用所有 16、32 个核心,或者把计算分布到计算机集群或云端)?

9 个回答

9

关于列表推导的自动并行化

在我看来,要有效地实现列表推导的自动并行化,必须有额外的信息支持(比如OpenMP中的指令),或者只能限制在使用内置类型和方法的表达式上。

如果不能保证对每个列表项的处理没有副作用,那么如果处理顺序被打乱,结果可能会不正确(或者至少会不同)。

# Artificial example
counter = 0

def g(x): # func with side-effect
    global counter
    counter = counter + 1
    return x + counter

vals = [g(i) for i in range(100)] # diff result when not done in order

还有任务分配的问题。我们应该如何将问题空间拆分呢?

如果每个元素的处理形成一个任务(类似于任务农场),那么当有很多元素且每个元素的计算都很简单时,管理这些任务的开销可能会抵消并行化带来的性能提升。

另一种方法是数据拆分,将问题空间平均分配给可用的进程。

不过,列表推导也可以与生成器一起使用,这让事情变得有些复杂,但如果预先迭代的开销可以接受,这可能不会成为大问题。当然,也有生成器可能会有副作用,如果后续的项被过早迭代,结果可能会改变。这种情况不太可能,但还是有可能的。

更大的问题是进程之间的负载不均衡。没有保证每个元素处理所需的时间是相同的,因此静态划分的数据可能会导致一个进程承担大部分工作,而其他进程则闲着。

将列表分成更小的块,并在每个子进程可用时分配这些块是一个不错的折中方案,但选择合适的块大小依赖于具体应用,因此没有更多用户信息的话是无法做到的。

替代方案

正如其他答案中提到的,有很多方法和并行计算模块/框架可以根据需求选择。

我只使用过MPI(在C语言中),没有使用Python进行并行处理的经验,所以我无法为任何一个方案背书(不过,快速浏览一下,multiprocessingjugpppyro看起来都很不错)。

如果需求是尽可能接近列表推导,那么jug似乎是最接近的选择。从教程来看,在多个实例之间分配任务可以简单到:

from jug.task import Task
from yourmodule import process_data
tasks = [Task(process_data,infile) for infile in glob('*.dat')]

虽然这与multiprocessing.Pool.map()做的事情类似,但jug可以使用不同的后端来同步进程和存储中间结果(如redis、文件系统、内存),这意味着这些进程可以跨越集群中的节点。

18

对于共享内存的并行处理,我推荐使用 joblib

from joblib import delayed, Parallel

def square(x): return x*x
values = Parallel(n_jobs=NUM_CPUS)(delayed(square)(x) for x in range(1000))
45

正如Ken所说,它是做不到的,不过在2.6版本的multiprocessing模块中,进行并行计算变得非常简单。

import multiprocessing

try:
    cpus = multiprocessing.cpu_count()
except NotImplementedError:
    cpus = 2   # arbitrary default


def square(n):
    return n * n

pool = multiprocessing.Pool(processes=cpus)
print(pool.map(square, range(1000)))

文档中还有一些例子,展示了如何使用Managers来实现,这样也可以进行分布式计算。

撰写回答