如何将迭代器管道化到多个消费者?
有没有办法让多个消费者“流水线”地消费一个生成器呢?
比如,代码中常常会出现这样的模式:
def consumer1(iterator):
for item in iterator:
foo(item)
def consumer2(iterator):
for item in iterator:
bar(item)
myiter = list(big_generator())
v1 = consumer1(myiter)
v2 = consumer2(myiter)
在这种情况下,多个函数会完全消耗同一个迭代器,这就需要把迭代器缓存到一个列表里。因为每个消费者都把迭代器用完了,所以 itertools.tee
就没用了。
我经常看到这样的代码,总希望能让消费者一次消费一个项目,而不是把整个迭代器都缓存起来。比如:
consumer1
消费myiter[0]
consumer2
消费myiter[0]
consumer1
消费myiter[1]
consumer2
消费myiter[1]
- 等等...
如果我能设计一种语法,它可能看起来像这样:
c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2))
你可以通过线程或多进程和 tee
迭代器来接近这个目标,但线程的消费速度不同,这意味着 tee
内部缓存的值队列可能会变得非常大。这里的重点不是利用并行处理或加速任务,而是避免缓存迭代器的大部分内容。
在我看来,如果不修改消费者,这可能是不可能的,因为控制流在消费者那里。然而,当消费者实际消费迭代器时,控制权会转到迭代器的 next()
方法,所以也许可以以某种方式反转控制流,让迭代器一次阻塞一个消费者,直到它能同时给他们提供数据?
如果这可能实现,我还没有聪明到能想到怎么做。有没有什么想法呢?
2 个回答
1
这个不行吗?还是说你需要整个迭代器,所以像这样复制每一个是不行的?如果是这样的话,我觉得你要么得创建一个副本,要么就得把列表生成两次?
for item in big_generator():
consumer1.handle_item(item)
consumer2.handle_item(item)
1
在不能修改消费者代码的情况下(也就是说,不能在它们里面加循环),你只有两个选择:
- 你在问题中提到的方法:把生成的项目保存在内存中,然后多次遍历这些项目。
- 把每个消费者放在一个线程里,并实现某种同步的
itertools.tee
,其中一个的缓冲区大小为1,这样在给所有消费者提供项目i+1
之前,必须先把项目i
提供给他们。
没有其他选择。你不能同时实现以下所有目标,因为它们是相互矛盾的:
- 有一个生成器
- 有一个循环来消费所有内容
- 然后,在前一个循环结束后,再有一个循环来再次消费所有内容
- 在消费它们时,只在内存(或磁盘等)中保留O(1)的项目
- 不重新生成(也就是说,不重新创建生成器)
如果你想重复使用生成的项目,它们必须存储在某个地方。
如果可以修改消费者的代码,显然@monkey的解决方案是最简单和直接的。