为Python的Queue.join()添加超时参数
我想要在使用 Queue 类的 join() 方法时,如果调用没有在一段时间内返回,就让它超时。有什么好的方法可以做到这一点吗?可以通过子类化队列或者使用 metaclass 来实现吗?
4 个回答
0
首先,你需要确保你队列中的所有工作线程都能通过 task_done()
来正常退出。
如果你想在使用 Queue
的时候实现超时功能,可以把队列的代码放在一个线程里,然后使用 Thread.join([timeout])
为这个线程设置一个超时时间。
下面是一个未经测试的示例,来说明我的建议
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
def queuefunc():
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.setDaemon(True)
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
t = Thread(target=queuefunc)
t.start()
t.join(100) # timeout applies here
18
join() 方法的作用就是等所有的任务都完成。如果你不在乎这些任务是否真的完成,你可以定期检查还有多少任务没有完成:
stop = time() + timeout
while q.unfinished_tasks and time() < stop:
sleep(1)
这个循环会在任务完成或者超时时间到了的时候结束。
Raymond
24
创建一个Queue
的子类可能是最好的方法。像下面这样应该可以工作(虽然没测试过):
def join_with_timeout(self, timeout):
self.all_tasks_done.acquire()
try:
endtime = time() + timeout
while self.unfinished_tasks:
remaining = endtime - time()
if remaining <= 0.0:
raise NotFinished
self.all_tasks_done.wait(remaining)
finally:
self.all_tasks_done.release()