在Python中向pool.map()函数传递多个参数
我想找到一种方法,在使用pool.map()的时候,能够让一个函数接受多个参数。根据我的理解,pool.map()的目标函数只能有一个可迭代对象作为参数,但有没有办法让我也能传入其他参数呢?在我的情况下,我需要传入一些配置变量,比如我的Lock()和日志信息。
我尝试过做一些研究,我觉得我可能可以用部分函数来实现这个功能?不过我对这些部分函数的工作原理还不是很明白。任何帮助都会非常感谢!下面是我想做的一个简单例子:
def target(items, lock):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
pool.map(target(PASS PARAMS HERE), iterable)
pool.close()
pool.join()
3 个回答
如果你没有办法使用 functools.partial
,你也可以用一个包装函数来解决这个问题。
def target(lock):
def wrapped_func(items):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
return wrapped_func
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
lck = multiprocessing.Lock()
pool.map(target(lck), iterable)
pool.close()
pool.join()
这样一来,target()
就变成了一个可以接受锁(或者你想传入的其他参数)的函数,它会返回一个只需要一个可迭代对象作为输入的函数,但仍然可以使用你传入的所有其他参数。最终,这个函数会被传递给 pool.map()
,然后应该可以顺利执行。
你可以使用一个可以接受多个参数的映射函数,就像在 pathos
中找到的 multiprocessing
的一个分支一样。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def add_and_subtract(x,y):
... return x+y, x-y
...
>>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))
>>> res
[(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)]
>>> Pool().map(add_and_subtract, *zip(*res))
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]
pathos
让你可以轻松地嵌套层级并行映射,支持多个输入,所以我们可以扩展我们的例子来演示这一点。
>>> from pathos.multiprocessing import ThreadingPool as TPool
>>>
>>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))))
>>> res.get()
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]
更有趣的是,我们可以构建一个嵌套函数,然后把它传递给池(Pool)。这是可能的,因为 pathos
使用了 dill
,它几乎可以序列化 Python 中的任何东西。
>>> def build_fun_things(f, g):
... def do_fun_things(x, y):
... return f(x,y), g(x,y)
... return do_fun_things
...
>>> def add(x,y):
... return x+y
...
>>> def sub(x,y):
... return x-y
...
>>> neato = build_fun_things(add, sub)
>>>
>>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1))))
>>> list(res)
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]
不过,如果你不能使用标准库以外的东西,那就得用其他方法了。在这种情况下,最好的选择是使用 multiprocessing.starmap
,可以在这里看到:Python multiprocessing pool.map for multiple arguments(这是 @Roberto 在原帖评论中提到的)
你可以在这里获取 pathos
:https://github.com/uqfoundation
你可以使用 functools.partial
来实现这个功能(正如你所猜测的那样):
from functools import partial
def target(lock, iterable_item):
for item in iterable_item:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
l = multiprocessing.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()
举个例子:
def f(a, b, c):
print("{} {} {}".format(a, b, c))
def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
a = "hi"
b = "there"
func = partial(f, a, b)
pool.map(func, iterable)
pool.close()
pool.join()
if __name__ == "__main__":
main()
输出结果:
hi there 1
hi there 2
hi there 3
hi there 4
hi there 5