管理Python multiprocessing模块的进程
我想我该发个帖子了;怎样才能正确管理Process
工作者呢?我试着使用Pool
,但发现我无法获取每个完成进程的返回值。我也试过用回调函数,但效果也不如预期。我是不是应该自己用active_children ()
来管理它们呢?
这是我的Pool代码:
from multiprocessing import *
import time
import random
SOME_LIST = []
def myfunc():
a = random.randint(0,3)
time.sleep(a)
return a
def cb(retval):
SOME_LIST.append(retval)
print("Starting...")
p = Pool(processes=8)
p.apply_async(myfunc, callback=cb)
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
我期待得到一系列的值;但我得到的却只是最后一个完成的工作项:
$ python multi.py
Starting...
Stopping...
[3]
注意:答案中不应该使用threading
模块;原因如下:
在CPython中,由于全局解释器锁的限制,任何时候只能有一个线程在执行Python代码(尽管某些性能导向的库可能会绕过这个限制)。如果你想让你的应用程序更好地利用多核机器的计算资源,建议使用多进程。
1 个回答
6
你可能误解了 apply_async
的工作方式。它并不是在 Pool
中的每个进程里都调用你传给它的函数。它只会在其中一个工作进程里调用这个函数一次。所以你看到的结果是正常的。你有几种选择来实现你想要的效果:
from multiprocessing import Pool
import time
import random
SOME_LIST = []
def myfunc():
a = random.randint(0,3)
time.sleep(a)
return a
def cb(retval):
SOME_LIST.append(retval)
print("Starting...")
p = Pool(processes=8)
for _ in range(p._processes):
p.apply_async(myfunc, callback=cb)
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
或者
from multiprocessing import Pool
import time
import random
def myfunc():
a = random.randint(0,3)
time.sleep(a)
return a
print("Starting...")
p = Pool(processes=8)
SOME_LIST = p.map(myfunc, range(p._processes))
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
注意,你也可以调用 apply_async
或 map
的次数超过池中的进程数量。Pool
的意思是,它保证在整个生命周期内会有正好 num_processes
个进程在运行,无论你提交多少任务。所以如果你创建一个 Pool(8)
并调用 apply_async
一次,那么你的八个工作进程中会有一个接到任务,其他七个会闲着。如果你创建一个 Pool(8)
并调用 apply_async
80 次,这80个任务会分配给你的八个工作进程,但同时最多只有八个任务会被处理。