这里有一个例子。我有一个生产商和几个消费者。在
#!/usr/bin/env python2
from multiprocessing import Process, Queue
import time
def counter(low, high):
current = low
while current <= high:
yield current
current += 1
def put_tasks(q):
for c in counter(0, 9):
q.put(c)
time.sleep(.1)
print('put_tasks: no more tasks')
def work(id, q):
while True:
task = q.get()
print('process %d: %s' % (id, task))
time.sleep(.3)
print('process %d: done' % id)
if __name__ == '__main__':
q = Queue(2)
task_gen = Process(target=put_tasks, args=(q,))
processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
task_gen.start()
for p in processes:
p.start()
for p in processes:
p.join()
counter
只是put_tasks
的一个数字生成器。通常,我会有几千个任务,而不是像这个例子中的10个任务。此代码的目的是向队列递增地提供任务。在
问题是,消费者无法预先知道他们将要处理多少任务,但put_tasks
函数知道何时完成(然后打印no more tasks
)。在
样本输出:
^{pr2}$所有任务都会被处理,但程序随后会挂起(每个进程都会被困在q.get()
)上。我希望它终止时,所有的任务都已处理,而不牺牲速度或安全(没有丑陋的超时)。在
有什么想法吗?在
我最近研究了同一个问题,在Python文档中找到了上述问题的另一个答案
看起来“正确”的方法是使用
Queue.task_done()
方法,即:最简单的方法是在队列中添加一些内容,告诉用户所有的工作已经完成。在
我建议在队列的末尾放置一个sentinel值
我似乎不能使用
object()
作为哨兵,因为线程似乎访问了不同的实例,所以它们的比较并不相等。在如果您希望生成随机哨兵,可以使用
^{pr2}$uuid
模块生成随机ID:最后,zch使用
None
作为一个sentinel,只要队列中不能有None
就足够了。sentinel方法适用于大多数任意参数。在相关问题 更多 >
编程相关推荐