如何将迭代器管道化到多个消费者?

9 投票
2 回答
648 浏览
提问于 2025-04-17 20:06

有没有办法让多个消费者“流水线”地消费一个生成器呢?

比如,代码中常常会出现这样的模式:

def consumer1(iterator):
    for item in iterator:
        foo(item)

def consumer2(iterator):
    for item in iterator:
        bar(item)

myiter = list(big_generator())
v1 = consumer1(myiter)
v2 = consumer2(myiter)

在这种情况下,多个函数会完全消耗同一个迭代器,这就需要把迭代器缓存到一个列表里。因为每个消费者都把迭代器用完了,所以 itertools.tee 就没用了。

我经常看到这样的代码,总希望能让消费者一次消费一个项目,而不是把整个迭代器都缓存起来。比如:

  1. consumer1 消费 myiter[0]
  2. consumer2 消费 myiter[0]
  3. consumer1 消费 myiter[1]
  4. consumer2 消费 myiter[1]
  5. 等等...

如果我能设计一种语法,它可能看起来像这样:

c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2))

你可以通过线程或多进程和 tee 迭代器来接近这个目标,但线程的消费速度不同,这意味着 tee 内部缓存的值队列可能会变得非常大。这里的重点不是利用并行处理或加速任务,而是避免缓存迭代器的大部分内容。

在我看来,如果不修改消费者,这可能是不可能的,因为控制流在消费者那里。然而,当消费者实际消费迭代器时,控制权会转到迭代器的 next() 方法,所以也许可以以某种方式反转控制流,让迭代器一次阻塞一个消费者,直到它能同时给他们提供数据?

如果这可能实现,我还没有聪明到能想到怎么做。有没有什么想法呢?

2 个回答

1

这个不行吗?还是说你需要整个迭代器,所以像这样复制每一个是不行的?如果是这样的话,我觉得你要么得创建一个副本,要么就得把列表生成两次?

for item in big_generator():
    consumer1.handle_item(item)
    consumer2.handle_item(item)
1

在不能修改消费者代码的情况下(也就是说,不能在它们里面加循环),你只有两个选择:

  1. 你在问题中提到的方法:把生成的项目保存在内存中,然后多次遍历这些项目。
  2. 把每个消费者放在一个线程里,并实现某种同步的itertools.tee,其中一个的缓冲区大小为1,这样在给所有消费者提供项目i+1之前,必须先把项目i提供给他们。

没有其他选择。你不能同时实现以下所有目标,因为它们是相互矛盾的:

  1. 有一个生成器
  2. 有一个循环来消费所有内容
  3. 然后,在前一个循环结束后,再有一个循环来再次消费所有内容
  4. 在消费它们时,只在内存(或磁盘等)中保留O(1)的项目
  5. 不重新生成(也就是说,不重新创建生成器)

如果你想重复使用生成的项目,它们必须存储在某个地方。

如果可以修改消费者的代码,显然@monkey的解决方案是最简单和直接的。

撰写回答