Python 有简单的基于进程的并行映射吗?

84 投票
5 回答
67255 浏览
提问于 2025-04-15 15:50

我在寻找一个简单的基于进程的并行映射方法,用于Python,也就是说,我想要一个函数

parmap(function,[data])

这个函数可以在不同的进程中对[data]中的每个元素运行(其实是不同的核心,不过据我所知,在Python中,想要在不同核心上运行东西,唯一的方法就是启动多个解释器),然后返回一个结果列表。

有没有这样的东西呢?我希望它能简单一点,所以一个简单的模块就很好。当然,如果没有这样的东西,我也可以接受一个大一点的库 :-/

5 个回答

14

这可以通过Ray来优雅地实现,Ray是一个可以让你轻松地将Python代码进行并行处理和分布式执行的系统。

要让你的例子实现并行处理,你需要用@ray.remote这个装饰器来定义你的映射函数,然后用.remote来调用它。这样可以确保每个远程函数的实例都会在不同的进程中执行。

import time
import ray

ray.init()

# Define the function you want to apply map on, as remote function. 
@ray.remote
def f(x):
    # Do some work...
    time.sleep(1)
    return x*x

# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e., 
# an identifier of the result) rather than the result itself.  
def parmap(f, list):
    return [f.remote(x) for x in list]

# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))

# Get the results
results = ray.get(result_ids)
print(results)

这将打印出:

[1, 4, 9, 16, 25]

它的完成时间大约是len(list)/p(向上取整到最近的整数),其中p是你机器上的核心数量。假设一台机器有2个核心,我们的例子将在5/2向上取整,也就是大约3秒内完成。

使用Ray相比于multiprocessing模块有很多好处。特别是,相同的代码可以在单台机器上运行,也可以在多台机器的集群上运行。想了解更多Ray的优点,可以查看这篇相关帖子

15

Python3中的Pool类有一个叫map()的方法,这个方法就是用来让你同时处理多个任务的。

from multiprocessing import Pool

with Pool() as P:
    xtransList = P.map(some_func, a_list)

使用 with Pool() as P 这种写法就像是在使用一个进程池,它会把列表中的每一项同时执行。你还可以指定要使用多少个核心:

with Pool(processes=4) as P:
147

看起来你需要的是 multiprocessing.Pool() 中的 map 方法

map(func, iterable[, chunksize])

A parallel equivalent of the map() built-in function (it supports only
one iterable argument though). It blocks till the result is ready.

This method chops the iterable into a number of chunks which it submits to the 
process pool as separate tasks. The (approximate) size of these chunks can be 
specified by setting chunksize to a positive integ

举个例子,如果你想把这个函数应用到范围为 10 的数字上:

def f(x):
    return x**2

你可以使用内置的 map() 函数来实现:

map(f, range(10))

或者使用 multiprocessing.Pool() 对象的 map() 方法:

import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))

撰写回答