Python 有简单的基于进程的并行映射吗?
我在寻找一个简单的基于进程的并行映射方法,用于Python,也就是说,我想要一个函数
parmap(function,[data])
这个函数可以在不同的进程中对[data]中的每个元素运行(其实是不同的核心,不过据我所知,在Python中,想要在不同核心上运行东西,唯一的方法就是启动多个解释器),然后返回一个结果列表。
有没有这样的东西呢?我希望它能简单一点,所以一个简单的模块就很好。当然,如果没有这样的东西,我也可以接受一个大一点的库 :-/
5 个回答
这可以通过Ray来优雅地实现,Ray是一个可以让你轻松地将Python代码进行并行处理和分布式执行的系统。
要让你的例子实现并行处理,你需要用@ray.remote
这个装饰器来定义你的映射函数,然后用.remote
来调用它。这样可以确保每个远程函数的实例都会在不同的进程中执行。
import time
import ray
ray.init()
# Define the function you want to apply map on, as remote function.
@ray.remote
def f(x):
# Do some work...
time.sleep(1)
return x*x
# Define a helper parmap(f, list) function.
# This function executes a copy of f() on each element in "list".
# Each copy of f() runs in a different process.
# Note f.remote(x) returns a future of its result (i.e.,
# an identifier of the result) rather than the result itself.
def parmap(f, list):
return [f.remote(x) for x in list]
# Call parmap() on a list consisting of first 5 integers.
result_ids = parmap(f, range(1, 6))
# Get the results
results = ray.get(result_ids)
print(results)
这将打印出:
[1, 4, 9, 16, 25]
它的完成时间大约是len(list)/p
(向上取整到最近的整数),其中p
是你机器上的核心数量。假设一台机器有2个核心,我们的例子将在5/2
向上取整,也就是大约3
秒内完成。
使用Ray相比于multiprocessing模块有很多好处。特别是,相同的代码可以在单台机器上运行,也可以在多台机器的集群上运行。想了解更多Ray的优点,可以查看这篇相关帖子。
Python3中的Pool类有一个叫map()的方法,这个方法就是用来让你同时处理多个任务的。
from multiprocessing import Pool
with Pool() as P:
xtransList = P.map(some_func, a_list)
使用 with Pool() as P
这种写法就像是在使用一个进程池,它会把列表中的每一项同时执行。你还可以指定要使用多少个核心:
with Pool(processes=4) as P:
看起来你需要的是 multiprocessing.Pool() 中的 map 方法:
map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks till the result is ready. This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integ
举个例子,如果你想把这个函数应用到范围为 10 的数字上:
def f(x):
return x**2
你可以使用内置的 map() 函数来实现:
map(f, range(10))
或者使用 multiprocessing.Pool() 对象的 map() 方法:
import multiprocessing
pool = multiprocessing.Pool()
print pool.map(f, range(10))