Python 队列似乎失效了

1 投票
3 回答
544 浏览
提问于 2025-04-16 17:12

我有一个主进程,它会创建一些子进程,然后这些子进程会把任务放到一个队列里,主进程再从这个队列中取任务来创建更多的子进程。这个过程运行得挺好的,大约20秒后,主进程就停止了创建新任务,尽管队列里还有很多任务。

这是任务进程的运行循环代码:

try:
    page = self.fetch_page(self.url, self.timeout)
    if page != None:
        #do_stuff
        pass 
except Exception, e:                           #Log any errors
    import traceback
    self.log(str(traceback.format_exc(limit=10)), level="error")
    self.log(str(e), level="error")
finally:
    import os, signal
    print "releasing Semaphore"
    self.__sem.release()
    #print "Joining pQueue" #these statements raise errors...
    #self.__pqueue.join_thread()
    #print "Joining lQueue"
    #self.__log.join_thread()
    print "exiting"
    os._exit(1)
    #os.kill(self.pid, signal.SIGTERM)

这是主进程用来创建任务的代码:

while True:
    print "Waiting for url"
    url = self.page_queue.get()
    print "waiting for semaphore"
    self.__sem.acquire()
    print "semaphore recived"
    process = self.process_handler(url, self.log_queue, self.__sem, self.page_queue)
    process.start()

稍微解释一下,生成进程中的self.log_queue对应任务进程中的self.__log,self.page_queue对应任务进程中的self.__pqueue,而self.__sem在两个进程中是一样的。

生成进程通常会在以下代码处卡住:

url = self.page_queue.get()

我觉得这可能和任务进程在完成写入队列之前就结束了有关,这样会导致队列出问题,不过这只是我的猜测。而且,self.__pqueue.join_thread()会引发一个断言错误。

3 个回答

1

不确定这是否有用,但如果你的self.page_queue是一个Queue的实例(可以参考这个链接:http://docs.python.org/library/queue.html),那么get()默认是会阻塞的,也就是说它会一直等着,直到有东西可以取出来。你有没有确认过队列是不是空的?可能它只是卡在那儿等着取出一个项目。我记得我在使用队列的时候也遇到过这个问题。

另外,只有在你为每个任务调用了get()之后,才需要调用task_done(),这样才能完成任务的合并。

1

如果你是在获取网页内容的话,建议你使用eventlet这个库,而不是用多个进程。用多个进程确实在进行大量计算时很有用,但你大部分时间可能都是在等网络连接。所以,启动多个进程的额外开销其实是浪费的。

Eventlet采用了一种协作式线程模型,这样写这种应用就简单多了。

0

好的,我解决了这个问题。最开始我觉得是队列在写入时被杀掉了。不过在用Queue.qsize()检查队列里有没有值后,我开始怀疑是信号量出了问题。于是我研究了一下多进程的管理器对象,这些对象可以让进程通过“代理”来操作它们的数据。于是我调整了逻辑,让所有的队列和信号量都由管理器对象来控制,结果看起来效果很好。相关的Python文档链接在这里:http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

撰写回答