如何使用multiprocessing pool.map处理多个参数

895 投票
24 回答
1066876 浏览
提问于 2025-04-16 14:28

在Python的multiprocessing库中,有没有一种可以支持多个参数的pool.map的变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

24 个回答

191

我觉得下面这个会更好:

def multi_run_wrapper(args):
   return add(*args)

def add(x,y):
    return x+y

if __name__ == "__main__":
    from multiprocessing import Pool
    with Pool(4) as pool:
        results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

输出结果

[3, 5, 7]
824

有没有一种可以支持多个参数的 pool.map 的变体?

在 Python 3.3 中,有一个叫做 pool.starmap() 方法

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

对于旧版本的 Python:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

输出

1 1
2 1
3 1

注意这里使用了 itertools.izip()itertools.repeat()

由于 @unutbu 提到的这个 bug,在 Python 2.6 中你不能使用 functools.partial() 或类似的功能,所以需要明确地定义一个简单的包装函数 func_star()。另外可以参考 这个解决方法uptimebox 提出的建议

508

这个问题的答案会根据你使用的Python版本和具体情况而有所不同。对于最近的Python版本(从3.3开始),最通用的答案是由J.F. Sebastian首次描述的。这个方法使用了Pool.starmap,它可以接受一系列的参数元组。然后,它会自动将每个元组中的参数拆开,并传递给指定的函数:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

对于早期版本的Python,你需要写一个辅助函数来手动拆分参数。如果你想使用with,你还需要写一个包装器,把Pool变成一个上下文管理器。(感谢muon的提醒。)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

在一些简单的情况下,如果第二个参数是固定的,你也可以使用partial,但这只适用于Python 2.7及以上版本。

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. 这部分内容受到了他的回答的启发,可能应该被接受。不过因为这个回答一直在最上面,所以我觉得最好为未来的读者改进一下。

撰写回答