如何使用multiprocessing pool.map处理多个参数
在Python的multiprocessing
库中,有没有一种可以支持多个参数的pool.map
的变体?
import multiprocessing
text = "test"
def harvester(text, case):
X = case[0]
text + str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text, case), case, 1)
pool.close()
pool.join()
24 个回答
191
我觉得下面这个会更好:
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
with Pool(4) as pool:
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
输出结果
[3, 5, 7]
824
有没有一种可以支持多个参数的 pool.map 的变体?
在 Python 3.3 中,有一个叫做 pool.starmap()
方法:
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
对于旧版本的 Python:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
输出
1 1
2 1
3 1
注意这里使用了 itertools.izip()
和 itertools.repeat()
。
由于 @unutbu 提到的这个 bug,在 Python 2.6 中你不能使用 functools.partial()
或类似的功能,所以需要明确地定义一个简单的包装函数 func_star()
。另外可以参考 这个解决方法 和 由 uptimebox
提出的建议。
508
这个问题的答案会根据你使用的Python版本和具体情况而有所不同。对于最近的Python版本(从3.3开始),最通用的答案是由J.F. Sebastian首次描述的。这个方法使用了Pool.starmap
,它可以接受一系列的参数元组。然后,它会自动将每个元组中的参数拆开,并传递给指定的函数:
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
对于早期版本的Python,你需要写一个辅助函数来手动拆分参数。如果你想使用with
,你还需要写一个包装器,把Pool
变成一个上下文管理器。(感谢muon的提醒。)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
在一些简单的情况下,如果第二个参数是固定的,你也可以使用partial
,但这只适用于Python 2.7及以上版本。
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. 这部分内容受到了他的回答的启发,可能应该被接受。不过因为这个回答一直在最上面,所以我觉得最好为未来的读者改进一下。