多进程池挂起,无法退出应用

4 投票
3 回答
9235 浏览
提问于 2025-04-17 13:12

我知道这可能是个新手错误,但我就是搞不清楚我在使用多进程时哪里出了问题。我有这段代码(其实就是闲着没事做)

if __name__ == '__main__':
    pool = Pool(processes=4)  
    for i, x in enumerate(data): 
        pool.apply_async(new_awesome_function, (i, x))
    pool.close()
    pool.join()

数据是一个列表([1,2,3,4,5]),我想把这个列表里的每个项目分发到多个CPU上去处理。但是当我把我的工作命令放进一个函数里,然后运行这段代码时,它什么都不做(如果我直接调用这个函数,它就能正常工作)。所以我觉得我可能用错了多进程(虽然我参考了网上的例子),有没有什么建议?

更新:我注意到当程序卡住时,我甚至无法用控制+c退出……通常这个方法可以让我退出那些有问题的程序。我查看了python2.5的多进程池,试着按照建议在我的if语句里添加导入,但还是没用。

更新2:抱歉,刚刚意识到,感谢下面的回答,命令其实是有效的,但似乎没有终止程序,也让我无法强制退出。

3 个回答

2

你的代码在我这边好像是能正常运行的:

from multiprocessing import Pool
import time

def new_awesome_function(a,b):
    print(a,b, 'start')
    time.sleep(1)
    print(a,b, 'end')

if __name__ == '__main__':
    data = [1,2,3,4,5]
    pool = Pool(processes=4)
    for i, x in enumerate(data): 
        pool.apply_async(new_awesome_function, (i, x))
    pool.close()
    pool.join()

给我的结果是:

0 1 start
1 2 start
2 3 start
3 4 start
1 2 end
0 1 end
4 5 start
2 3 end
3 4 end
4 5 end

你为什么觉得它不工作呢?


补充:试着运行这个代码,看看输出结果:

from multiprocessing import Pool
import time

def new_awesome_function(a,b):
    print(a,b, 'start')
    time.sleep(1)
    print(a,b, 'end')
    return a + b

if __name__ == '__main__':
    data = [1,2,3,4,5]
    pool = Pool(processes=4)
    results = []
        for i, x in enumerate(data): 
        r = pool.apply_async(new_awesome_function, (i, x))
        results.append((i,r))
    pool.close()
    already = []
    while len(already) < len(data):
        for i,r in results:
            if r.ready() and i not in already:
                already.append(i)
                print(i, 'is ready!')
    pool.join()

我得到的结果是:

0 1 start
1 2 start
2 3 start
3 4 start
0 1 end
4 5 start
1 2 end
2 3 end
0 is ready!
3 4 end
1 is ready!
2 is ready!
3 is ready!
4 5 end
4 is ready!
3

多进程和多线程是两回事。

你可能在做的事情大概是这样的:

data = {}

def new_awesome_function(a, b):
    data[a] = b

当你运行这个脚本后,数据并没有改变。这是因为多进程会使用你程序的副本。你的函数确实在运行,但它们是在程序的副本中运行,因此对你原来的程序没有任何影响。

要想利用多进程,你需要明确地在不同的进程之间进行沟通。使用多线程时,所有东西都是共享的,但在多进程中,除非你特别指定共享,否则什么都不会共享。

最简单的方法是使用返回值:

def new_awesome_function(a, b):
    return a + b

result = pool.apply_async(new_awesome_function, (1, 2))
# later...
value = result.get()

你可以查看Python的文档:http://docs.python.org/library/multiprocessing.html,了解其他方法,比如队列、管道和管理器。你不能改变程序的状态然后指望它能正常工作。

2

我不知道你用的是什么数据库,但很可能你不能像那样在不同的进程之间共享数据库连接。

在Linux系统中,fork()这个命令会在你启动子进程时,把内存中的所有东西都复制一份。不过像网络连接、打开的文件和数据库连接这些东西,除非特别设计过,否则是不能正常工作的。

在Windows系统中,fork()是不可用的,所以它会重新运行你的脚本。在你的情况下,这样做会很糟糕,因为它会把之前的所有东西都丢掉。你可以通过把代码放在if __name__ == '__main__':这个部分来避免这个问题。

你应该能够在my_awesome_function里面重新打开数据库连接,这样就能成功和数据库进行交互了。

说实话,这样做并不会让你的程序运行得更快。实际上,我觉得这样会更慢。因为数据库本身就很慢。你的进程大部分时间都在等数据库。现在你有多个进程在等数据库,这样并不会改善情况。

不过数据库是用来存储东西的。在你进行处理的时候,最好是在代码里完成这些处理,然后再去访问数据库。你基本上是把数据库当成一个集合来用,而用Python的集合会更好。如果你真的需要把这些东西放进数据库,最好是在程序的最后再去做。

撰写回答