使用multiprocessing模块的脚本无法结束

10 投票
1 回答
2348 浏览
提问于 2025-05-01 13:49

下面这段代码没有打印出 "here"。这是怎么回事呢?我在我的两台电脑上测试过(一个是Windows 7,一个是Ubuntu 12.10),还有在这个网站上 http://www.compileonline.com/execute_python_online.php,结果在所有情况下都没有打印出 "here"

from multiprocessing import Queue, Process


def runLang(que):
    print "start"
    myDict=dict()
    for i in xrange(10000):
        myDict[i]=i
    que.put(myDict)
    print "finish"


def run(fileToAnalyze):
    que=Queue()
    processList=[]
    dicList=[]
    langs= ["chi","eng"]
    for lang in langs:
        p=Process(target=runLang,args=(que,))
        processList.append(p)
        p.start()

    for p1 in processList:
        p1.join()

    print "here"

    for _ in xrange(len(langs)):
        item=que.get()
        print item
        dicList.append(item)

if __name__=="__main__":
    processList = []
    for fileToAnalyse in ["abc.txt","def.txt"]:
        p=Process(target=run,args=(fileToAnalyse,))
        processList.append(p)
        p.start()
    for p1 in processList:
        p1.join()
暂无标签

1 个回答

17

这是因为当你往一个 multiprocessing.Queue 里放很多东西时,这些东西会先在内存里被缓存,一旦底层的 Pipe 满了,就不会再继续接收数据。只有当有东西从 Queue 的另一端被读取时,这个缓存才会被清空,这样 Pipe 才能接受更多的数据。一个 Process 不能结束,直到它所有的 Queue 实例里的缓存都被完全清空到它的底层 Pipe 中。这意味着,如果你试图在没有其他进程或线程从它的 Queue 中调用 get 的情况下去 join 这个进程,你可能会遇到死锁的问题。这在文档中也有提到:

警告

如上所述,如果一个子进程往队列里放了东西(并且它没有使用 JoinableQueue.cancel_join_thread),那么这个进程在所有缓存的项目被清空到管道之前是不会结束的。

这意味着如果你试图去 join 这个进程,可能会出现死锁,除非你确定所有放进队列的东西都已经被消费掉了。同样,如果子进程不是守护进程,那么父进程在尝试 join 所有非守护子进程时,可能会在退出时卡住。

注意,使用管理器创建的队列没有这个问题。

你可以通过在父进程清空 Queue 后再调用 join 来解决这个问题:

for _ in xrange(len(langs)):
    item = que.get()
    print(item)
    dicList.append(item)

# join after emptying the queue.
for p in processList:
    p.join()

print("here")

撰写回答