如何在Python中并行化列表推导式计算?
列表推导和映射计算在理论上应该比较容易实现并行处理:在列表推导中的每一个计算都可以独立于其他元素的计算进行。例如,在这个表达式中
[ x*x for x in range(1000) ]
每个 x*x 的计算(至少在理论上)都可以并行进行。
我的问题是:有没有什么 Python 模块、实现或者编程技巧可以让列表推导的计算实现并行处理(这样就可以利用所有 16、32 个核心,或者把计算分布到计算机集群或云端)?
9 个回答
关于列表推导的自动并行化
在我看来,要有效地实现列表推导的自动并行化,必须有额外的信息支持(比如OpenMP中的指令),或者只能限制在使用内置类型和方法的表达式上。
如果不能保证对每个列表项的处理没有副作用,那么如果处理顺序被打乱,结果可能会不正确(或者至少会不同)。
# Artificial example
counter = 0
def g(x): # func with side-effect
global counter
counter = counter + 1
return x + counter
vals = [g(i) for i in range(100)] # diff result when not done in order
还有任务分配的问题。我们应该如何将问题空间拆分呢?
如果每个元素的处理形成一个任务(类似于任务农场),那么当有很多元素且每个元素的计算都很简单时,管理这些任务的开销可能会抵消并行化带来的性能提升。
另一种方法是数据拆分,将问题空间平均分配给可用的进程。
不过,列表推导也可以与生成器一起使用,这让事情变得有些复杂,但如果预先迭代的开销可以接受,这可能不会成为大问题。当然,也有生成器可能会有副作用,如果后续的项被过早迭代,结果可能会改变。这种情况不太可能,但还是有可能的。
更大的问题是进程之间的负载不均衡。没有保证每个元素处理所需的时间是相同的,因此静态划分的数据可能会导致一个进程承担大部分工作,而其他进程则闲着。
将列表分成更小的块,并在每个子进程可用时分配这些块是一个不错的折中方案,但选择合适的块大小依赖于具体应用,因此没有更多用户信息的话是无法做到的。
替代方案
正如其他答案中提到的,有很多方法和并行计算模块/框架可以根据需求选择。
我只使用过MPI(在C语言中),没有使用Python进行并行处理的经验,所以我无法为任何一个方案背书(不过,快速浏览一下,multiprocessing、jug、pp和pyro看起来都很不错)。
如果需求是尽可能接近列表推导,那么jug似乎是最接近的选择。从教程来看,在多个实例之间分配任务可以简单到:
from jug.task import Task
from yourmodule import process_data
tasks = [Task(process_data,infile) for infile in glob('*.dat')]
虽然这与multiprocessing.Pool.map()
做的事情类似,但jug
可以使用不同的后端来同步进程和存储中间结果(如redis、文件系统、内存),这意味着这些进程可以跨越集群中的节点。
对于共享内存的并行处理,我推荐使用 joblib:
from joblib import delayed, Parallel
def square(x): return x*x
values = Parallel(n_jobs=NUM_CPUS)(delayed(square)(x) for x in range(1000))
正如Ken所说,它是做不到的,不过在2.6版本的multiprocessing模块中,进行并行计算变得非常简单。
import multiprocessing
try:
cpus = multiprocessing.cpu_count()
except NotImplementedError:
cpus = 2 # arbitrary default
def square(n):
return n * n
pool = multiprocessing.Pool(processes=cpus)
print(pool.map(square, range(1000)))
文档中还有一些例子,展示了如何使用Managers来实现,这样也可以进行分布式计算。