当我使用multiprocessing.pool.apply_async超出处理器数量时会发生什么?

6 投票
2 回答
3492 浏览
提问于 2025-04-18 05:28

我有以下的设置:

results = [f(args) for _ in range(10**3)]

但是,f(args)的计算时间很长。所以我想用多进程来处理这个问题。我想这样做:

pool = mp.pool(mp.cpu_count() -1) # mp.cpu_count() -> 8
results = [pool.apply_async(f, args) for _ in range(10**3)]

显然,我的电脑没有1000个处理器,所以我担心的是:
上面的调用会导致1000个进程同时争抢CPU时间,还是说只有7个进程同时运行,每当一个进程完成后再计算下一个f(args)

我想我可以做类似pool.async_map(f, (args for _ in range(10**3)))的事情来得到相同的结果,但我发这个帖子是想了解pool.apply_async的行为。

2 个回答

6

工作进程的数量完全由 mp.pool() 这个参数来控制。所以如果 mp.cpu_count() 在你的电脑上返回的是8,那就会创建7个工作进程。

所有的 pool 方法(比如 apply_async())都不会使用超过这个数量的工作进程。在后台,参数会在主程序中被“打包”,然后通过进程间的管道发送到工作进程。这种隐藏的机制实际上创建了一个工作队列,固定数量的工作进程会从中提取需要完成的工作描述(包括函数名和参数)。

除此之外,其他的就都是魔法了 ;-)

11

你在运行的进程数量永远不会超过你池子里的工作者数量(在你的情况下是 mp.cpu_count() - 1)。如果你调用 apply_async,但所有的工作者都在忙,那么这个任务就会被放到队列里,等到有工作者空闲出来后再执行。你可以通过一个简单的测试程序来看到这一点:

#!/usr/bin/python

import time
import multiprocessing as mp

def worker(chunk):
    print('working')
    time.sleep(10)
    return

def main():
    pool = mp.Pool(2)  # Only two workers
    for n in range(0, 8):
        pool.apply_async(worker, (n,))
        print("called it")
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

输出结果是这样的:

called it
called it
called it
called it
called it
called it
called it
called it
working
working
<delay>
working
working
<delay>
working 
working
<delay>
working
working

撰写回答