Python 队列似乎失效了
我有一个主进程,它会创建一些子进程,然后这些子进程会把任务放到一个队列里,主进程再从这个队列中取任务来创建更多的子进程。这个过程运行得挺好的,大约20秒后,主进程就停止了创建新任务,尽管队列里还有很多任务。
这是任务进程的运行循环代码:
try:
page = self.fetch_page(self.url, self.timeout)
if page != None:
#do_stuff
pass
except Exception, e: #Log any errors
import traceback
self.log(str(traceback.format_exc(limit=10)), level="error")
self.log(str(e), level="error")
finally:
import os, signal
print "releasing Semaphore"
self.__sem.release()
#print "Joining pQueue" #these statements raise errors...
#self.__pqueue.join_thread()
#print "Joining lQueue"
#self.__log.join_thread()
print "exiting"
os._exit(1)
#os.kill(self.pid, signal.SIGTERM)
这是主进程用来创建任务的代码:
while True:
print "Waiting for url"
url = self.page_queue.get()
print "waiting for semaphore"
self.__sem.acquire()
print "semaphore recived"
process = self.process_handler(url, self.log_queue, self.__sem, self.page_queue)
process.start()
稍微解释一下,生成进程中的self.log_queue对应任务进程中的self.__log,self.page_queue对应任务进程中的self.__pqueue,而self.__sem在两个进程中是一样的。
生成进程通常会在以下代码处卡住:
url = self.page_queue.get()
我觉得这可能和任务进程在完成写入队列之前就结束了有关,这样会导致队列出问题,不过这只是我的猜测。而且,self.__pqueue.join_thread()会引发一个断言错误。
3 个回答
不确定这是否有用,但如果你的self.page_queue是一个Queue的实例(可以参考这个链接:http://docs.python.org/library/queue.html),那么get()默认是会阻塞的,也就是说它会一直等着,直到有东西可以取出来。你有没有确认过队列是不是空的?可能它只是卡在那儿等着取出一个项目。我记得我在使用队列的时候也遇到过这个问题。
另外,只有在你为每个任务调用了get()之后,才需要调用task_done(),这样才能完成任务的合并。
如果你是在获取网页内容的话,建议你使用eventlet这个库,而不是用多个进程。用多个进程确实在进行大量计算时很有用,但你大部分时间可能都是在等网络连接。所以,启动多个进程的额外开销其实是浪费的。
Eventlet采用了一种协作式线程模型,这样写这种应用就简单多了。
好的,我解决了这个问题。最开始我觉得是队列在写入时被杀掉了。不过在用Queue.qsize()检查队列里有没有值后,我开始怀疑是信号量出了问题。于是我研究了一下多进程的管理器对象,这些对象可以让进程通过“代理”来操作它们的数据。于是我调整了逻辑,让所有的队列和信号量都由管理器对象来控制,结果看起来效果很好。相关的Python文档链接在这里:http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes