python中的队列对于多线程是实用的,但是它不支持在队列无限期为空时停止工作线程。在
例如,考虑如下:
queue = Queue()
def process(payload):
time.sleep(random())
def work(item):
while(True):
payload = queue.get()
try:
process(payload)
except:
print("TERROR ERROR!")
finally:
queue.task_done()
threads = dict()
for thread_id in range(10):
threads[thread_id] = Thread(target=work)
threads[thread_id].deamon = True
threads[thread_id].start()
for payload in range(100):
queue.put(payload)
queue.join();
所以这很有效,但不是真的。这个队列.join()等待所有项目报告完成,然后主线程完成,但工作线程将无限期等待。如果这将是(unix)进程的结束,当然,我们可以把它留给操作系统,但如果它继续下去,就会有这些等待线程溢出资源。在
然后我们实现一个sentinel、EOQ或bottom或任何你想称之为的东西:
^{pr2}$这是一个更好的解决方案,因为线程现在停止了。然而,注入哨兵的代码是笨拙的,而且容易出错。考虑一下我不小心放的太少,或者一个工作线程意外地处理了两个,这样其他工作线程就不会得到它们的线程了。在
或者:
class FiniteQueue(Queue):
def __init__(self, ....)
super() .__init__(....)
self.finished = False
def put(self, item, ...):
if self.finished:
raise AlreadyFinished()
super().put(item, ...)
def set_finished(self):
self.finished=True
def get(self, ...):
if self.finished:
raise AlreadyFinished()
return super().get(....)
显然,我很懒,没有使put()方法线程安全,不过这是很有可能做到的。这样,工作人员可以简单地捕获AlreadyFinished对象,然后停止。在
当所有有效负载都已输入时,主队列可以简单地应用set_finished()。然后,队列可以检测到何时它将无法获得更多的有效负载,并将此情况报告给工作人员(如果您愿意,也可以报告给使用者)。在
为什么python队列不提供set_finished()功能?它不干扰无止境的_queue用例,但支持有限的处理管道。在
我在这个设计中遗漏了一个明显的错误吗?这是一个人不应该想要的吗?有没有比提供的FiniteQueue更简单的替代方案?在
为了解决sentinel问题,我加入了与sentinel相同的数量,因为有工作线程。如果工作线程检测到线程,它将退出,因此不可能有重复线程。作为sentinel,我使用了一个函数的引用,这个函数从来没有被调用过。在
编辑:感谢@DarrenRinger建议不要使用。我以前也试过,但失败了(我怀疑是因为另一个问题)
我建议使用由所有worker共享的^{} 对象,而不是sentinel,以避免意外地混合数据和sentinel。还要确保在
queue.get()
上实现阻塞和超时,以免浪费资源。在你对哨兵对象的处理方法是正确和良好的。对于
worker
threadaccidentally process two
Sentinel对象是不可能的,因为当它发现其中一个对象时,它的处理周期就会中断。在FiniteQueue
方法将不起作用,因为设置finished
标志将不会唤醒worker,在语句super().get(....)
被阻塞。在这是大多数支持线程的编程语言的常见问题:同时阻塞两个条件的等待。在您的例子中,
get()
方法应该等待:1)队列变为非空
2)或finish flag设置为true
为了正确,wait方法必须知道这两个条件。这使得使用支持
wait
的现成对象更加困难。有些语言支持某种类型的线程中断,这会唤醒阻塞的线程。Python似乎缺少这种机制。在相关问题 更多 >
编程相关推荐