当我使用multiprocessing.pool.apply_async超出处理器数量时会发生什么?
我有以下的设置:
results = [f(args) for _ in range(10**3)]
但是,f(args)
的计算时间很长。所以我想用多进程来处理这个问题。我想这样做:
pool = mp.pool(mp.cpu_count() -1) # mp.cpu_count() -> 8
results = [pool.apply_async(f, args) for _ in range(10**3)]
显然,我的电脑没有1000个处理器,所以我担心的是:
上面的调用会导致1000个进程同时争抢CPU时间,还是说只有7个进程同时运行,每当一个进程完成后再计算下一个f(args)
?
我想我可以做类似pool.async_map(f, (args for _ in range(10**3)))
的事情来得到相同的结果,但我发这个帖子是想了解pool.apply_async
的行为。
2 个回答
6
工作进程的数量完全由 mp.pool()
这个参数来控制。所以如果 mp.cpu_count()
在你的电脑上返回的是8,那就会创建7个工作进程。
所有的 pool
方法(比如 apply_async()
)都不会使用超过这个数量的工作进程。在后台,参数会在主程序中被“打包”,然后通过进程间的管道发送到工作进程。这种隐藏的机制实际上创建了一个工作队列,固定数量的工作进程会从中提取需要完成的工作描述(包括函数名和参数)。
除此之外,其他的就都是魔法了 ;-)
11
你在运行的进程数量永远不会超过你池子里的工作者数量(在你的情况下是 mp.cpu_count() - 1
)。如果你调用 apply_async
,但所有的工作者都在忙,那么这个任务就会被放到队列里,等到有工作者空闲出来后再执行。你可以通过一个简单的测试程序来看到这一点:
#!/usr/bin/python
import time
import multiprocessing as mp
def worker(chunk):
print('working')
time.sleep(10)
return
def main():
pool = mp.Pool(2) # Only two workers
for n in range(0, 8):
pool.apply_async(worker, (n,))
print("called it")
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出结果是这样的:
called it
called it
called it
called it
called it
called it
called it
called it
working
working
<delay>
working
working
<delay>
working
working
<delay>
working
working