如何将具有多个参数的函数传递给Python的concurrent.futures.ProcessPoolExecutor.map()函数?

2024-04-24 14:50:41 发布

您现在位置:Python中文网/ 问答频道 /正文

我想concurrent.futures.ProcessPoolExecutor.map()调用一个包含2个或更多参数的函数。在下面的例子中,我使用了lambda函数并将ref定义为具有相同值的numberlist大小相等的数组。

第一个问题:有没有更好的方法可以做到这一点?在numberlist的大小可以是百万到十亿个元素的情况下,因此ref大小必须跟随numberlist,这种方法不必要地占用宝贵的内存,我想避免这种情况。我这样做是因为我读到了map函数将终止其映射,直到到达最短的数组结尾。

import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3


def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')

运行上面的代码,我发现map函数能够实现我想要的结果。但是,当我将相同的条件传输到concurrent.futures.ProcessPoolExecutor.map()时,python3.5失败,并出现以下错误:

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed

问题2:为什么会发生此错误,如何让concurrent.futures.ProcessPoolExecutor.map()调用一个参数大于1的函数?


Tags: lambda函数inrefmapifconcurrentprint
3条回答

(1)不需要列清单。您可以使用itertools.repeat创建一个迭代器,该迭代器只重复某个值。

(2)需要将命名函数传递给map,因为它将被传递给子流程执行。map使用pickle协议发送东西,lambda不能被pickle,因此它们不能成为映射的一部分。但这完全没有必要。你的lambda所做的就是用2个参数调用一个2个参数的函数。完全拆下。

工作代码是

import concurrent.futures as cf
import itertools

nmax = 10
numberlist = range(nmax)
workers = 3

def _findmatch(listnumber, ref):    
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x 

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
        print(type(n))
        print(n)
        #if str(ref[0]) in n:
        #    print('match')

要首先回答第二个问题,您将得到一个异常,因为像您正在使用的那种lambda函数是不可选择的。由于Python使用pickle协议序列化在主进程和ProcessPoolExecutor的工作进程之间传递的数据,这是一个问题。现在还不清楚为什么要使用lambda。你的lambda有两个参数,就像原来的函数一样。您可以直接使用_findmatch,而不是lambda,它应该可以工作。

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...

至于第一个问题,即在不创建大列表的情况下传递第二个常量参数,您可以用几种方法解决这个问题。一种方法可能是使用itertools.repeat创建一个iterable对象,该对象在迭代时永远重复相同的值。

但更好的方法可能是编写一个额外的函数,为您传递常量参数。(也许这就是为什么要使用lambda函数?)如果您使用的函数在模块的顶级命名空间中是可访问的,那么它应该可以工作:

def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...

关于您的第一个问题,我是否正确理解您希望传递一个参数,该参数的值仅在调用map时确定,但对于映射函数的所有实例都是常量?如果是这样的话,我会用一个派生自“模板函数”的函数来做map,该函数的第二个参数(在您的示例中是ref)是使用functools.partial烘焙的:

from functools import partial
refval = 5

def _findmatch(ref, listnumber):  # arguments swapped
    ...

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(partial(_findmatch, refval), numberlist):
        ...

关于。问题2,第一部分:我还没有找到尝试pickle(serialize)然后应该并行执行的函数的确切代码片段,但这听起来很自然--不仅是参数,而且函数还必须以某种方式传递给workers,而且它可能必须被序列化才能进行此传输。当不能在别处提到lambdas时,partial函数可以被pickle,例如这里:https://stackoverflow.com/a/19279016/6356764

关于。问题2,第二部分:如果您想调用一个在ProcessPoolExecutor.map中有多个参数的函数,您可以将该函数作为第一个参数传递给它,然后是该函数的第一个参数的iterable,然后是它的第二个参数的iterable,以此类推:

for n in executor.map(_findmatch, numberlist, ref):
    ...

相关问题 更多 >