如何向concurrent.futures.Executor.map传递多个参数?

112 投票
10 回答
135859 浏览
提问于 2025-04-16 22:02

concurrent.futures.Executor.map 这个功能可以接收多个可迭代对象(比如列表、元组等),然后用你提供的函数去处理它们。 如果我有一个生成器,它会产生元组,而这些元组通常是在调用时直接拆开的,我该怎么用这个功能呢?

下面的写法是行不通的,因为生成器产生的每个元组都被当作不同的参数传给了 map:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass

如果没有生成器,传给 map 的参数可能看起来是这样的:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

10 个回答

28

假设你有一个函数,它需要3个参数,而这3个参数是动态的,每次调用时都会变化。比如说:

def multiply(a,b,c):
    print(a * b * c)

为了使用线程多次调用这个函数,我会先创建一个元组列表,每个元组代表一组不同的a、b、c:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

我们知道,concurrent.futures里的map函数,第一个参数是目标函数,第二个参数是每组参数的列表,这些参数会被用来执行函数。因此,你可以这样调用:

for _ in executor.map(multiply, arguments) # Error

但是这样会出现错误,提示函数期望接收3个参数,但只得到了1个。为了解决这个问题,我们需要创建一个辅助函数:

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

现在,我们可以用执行器来调用这个函数,如下所示:

with ThreadPoolExecutor() as executor:
     for _ in executor.map(helper, arguments):
         pass

这样就能得到你想要的结果了。

104

一个重复的参数,一个在 c 中的参数

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass

需要解包 c 的项目,并且可以解包 c

from itertools import izip
for result in executor.map(f, *izip(*c)):
    pass

需要解包 c 的项目,但不能解包 c

  1. f 改成只接受一个参数,并在函数内部解包这个参数。
  2. 如果 c 中每个项目的成员数量不固定,或者你只调用 f 几次:

    executor.map(lambda args, f=f: f(*args), c)
    

    这段代码定义了一个新函数,它会从 c 中解包每个项目并调用 f。在 lambda 中为 f 使用默认参数,可以让 flambda 内部使用,这样可以减少查找时间。

  3. 如果你有固定数量的参数,并且需要多次调用 f

    from collections import deque
    def itemtee(iterable, n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    extend(next(it))
                yield popleft()
        return [gen()] * n
    
    executor.map(f, *itemtee(c, n))
    

这里的 nf 的参数数量。这段内容是根据 itertools.tee 改编的。

94

你需要在 map 调用中去掉 * 符号:

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

这样会让 f 被调用 len(args) 次,其中 f 应该接受一个参数。

如果你想让 f 接受两个参数,可以使用像这样的 lambda 调用:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass

撰写回答