为Python的Queue.join()添加超时参数

19 投票
4 回答
9987 浏览
提问于 2025-04-15 15:03

我想要在使用 Queue 类的 join() 方法时,如果调用没有在一段时间内返回,就让它超时。有什么好的方法可以做到这一点吗?可以通过子类化队列或者使用 metaclass 来实现吗?

4 个回答

0

首先,你需要确保你队列中的所有工作线程都能通过 task_done() 来正常退出。

如果你想在使用 Queue 的时候实现超时功能,可以把队列的代码放在一个线程里,然后使用 Thread.join([timeout]) 为这个线程设置一个超时时间。

下面是一个未经测试的示例,来说明我的建议

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

def queuefunc():
    q = Queue()
    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.setDaemon(True)
        t.start()

    for item in source():
        q.put(item)

    q.join()       # block until all tasks are done

t = Thread(target=queuefunc)
t.start()
t.join(100) # timeout applies here
18

join() 方法的作用就是等所有的任务都完成。如果你不在乎这些任务是否真的完成,你可以定期检查还有多少任务没有完成:

stop = time() + timeout
while q.unfinished_tasks and time() < stop:
    sleep(1)

这个循环会在任务完成或者超时时间到了的时候结束。

Raymond

24

创建一个Queue的子类可能是最好的方法。像下面这样应该可以工作(虽然没测试过):

def join_with_timeout(self, timeout):
    self.all_tasks_done.acquire()
    try:
        endtime = time() + timeout
        while self.unfinished_tasks:
            remaining = endtime - time()
            if remaining <= 0.0:
                raise NotFinished
            self.all_tasks_done.wait(remaining)
    finally:
        self.all_tasks_done.release()

撰写回答