告诉使用者停止等待队列元素

2024-04-25 06:53:19 发布

您现在位置:Python中文网/ 问答频道 /正文

这里有一个例子。我有一个生产商和几个消费者。在

#!/usr/bin/env python2

from multiprocessing import Process, Queue
import time

def counter(low, high):
    current = low 
    while current <= high:
        yield current
        current += 1

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks') 

def work(id, q): 
    while True:
        task = q.get()
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id) 

if __name__ == '__main__':
    q = Queue(2)
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)] 

    task_gen.start()
    for p in processes:
        p.start()
    for p in processes:
        p.join()

counter只是put_tasks的一个数字生成器。通常,我会有几千个任务,而不是像这个例子中的10个任务。此代码的目的是向队列递增地提供任务。在

问题是,消费者无法预先知道他们将要处理多少任务,但put_tasks函数知道何时完成(然后打印no more tasks)。在

样本输出:

^{pr2}$

所有任务都会被处理,但程序随后会挂起(每个进程都会被困在q.get())上。我希望它终止时,所有的任务都已处理,而不牺牲速度或安全(没有丑陋的超时)。在

有什么想法吗?在


Tags: inidfortasktimeputdefcounter
3条回答

我最近研究了同一个问题,在Python文档中找到了上述问题的另一个答案

看起来“正确”的方法是使用Queue.task_done()方法,即:

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

最简单的方法是在队列中添加一些内容,告诉用户所有的工作已经完成。在

number_of_consumers = 3

def put_tasks(q):
    for c in counter(0, 9):
        q.put(c)
        time.sleep(.1)
    print('put_tasks: no more tasks')
    for i in range(number_of_consumers):
        q.put(None)

def work(id, q): 
    while True:
        task = q.get()
        if task is None:
            break
        print('process %d: %s' % (id, task))
        time.sleep(.3)
    print('process %d: done' % id) 

我建议在队列的末尾放置一个sentinel值

def put_tasks(q):
    ...

    print('put_tasks: no more tasks')
    q.put(end_of_queue)

def work(id, q):
    while True:
        task = q.get()

        if task == end_of_queue:
            q.put(task)
            print("DONE")
            return

        print('process %d: %s' % (id, task))
        time.sleep(.1)
    print('process %d: done' % id)

class Sentinel:
    def __init__(self, id):
        self.id = id

    def __eq__(self, other):
        if isinstance(other, Sentinel):
            return self.id == other.id

        return NotImplemented

if __name__ == '__main__':
    q = Queue(2)
    end_of_queue = Sentinel("end of queue")
    task_gen = Process(target=put_tasks, args=(q,))
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
    ...

我似乎不能使用object()作为哨兵,因为线程似乎访问了不同的实例,所以它们的比较并不相等。在

如果您希望生成随机哨兵,可以使用uuid模块生成随机ID:

^{pr2}$

最后,zch使用None作为一个sentinel,只要队列中不能有None就足够了。sentinel方法适用于大多数任意参数。在

相关问题 更多 >