管理Python multiprocessing模块的进程

0 投票
1 回答
1490 浏览
提问于 2025-04-28 19:04

我想我该发个帖子了;怎样才能正确管理Process工作者呢?我试着使用Pool,但发现我无法获取每个完成进程的返回值。我也试过用回调函数,但效果也不如预期。我是不是应该自己用active_children ()来管理它们呢?

这是我的Pool代码:

from multiprocessing import *                                                                                      
import time
import random

SOME_LIST = []

def myfunc():
    a = random.randint(0,3)
    time.sleep(a)
    return a

def cb(retval):
    SOME_LIST.append(retval)

print("Starting...")

p = Pool(processes=8)
p.apply_async(myfunc, callback=cb)
p.close()
p.join()

print("Stopping...")
print(SOME_LIST)

我期待得到一系列的值;但我得到的却只是最后一个完成的工作项:

$ python multi.py 
Starting...
Stopping...
[3]

注意:答案中不应该使用threading模块;原因如下:

在CPython中,由于全局解释器锁的限制,任何时候只能有一个线程在执行Python代码(尽管某些性能导向的库可能会绕过这个限制)。如果你想让你的应用程序更好地利用多核机器的计算资源,建议使用多进程。

暂无标签

1 个回答

6

你可能误解了 apply_async 的工作方式。它并不是在 Pool 中的每个进程里都调用你传给它的函数。它只会在其中一个工作进程里调用这个函数一次。所以你看到的结果是正常的。你有几种选择来实现你想要的效果:

from multiprocessing import Pool                                                                                   
import time
import random

SOME_LIST = []

def myfunc():
    a = random.randint(0,3)
    time.sleep(a)
    return a

def cb(retval):
    SOME_LIST.append(retval)

print("Starting...")

p = Pool(processes=8)
for _ in range(p._processes):
    p.apply_async(myfunc, callback=cb)
p.close()
p.join()

print("Stopping...")
print(SOME_LIST)

或者

from multiprocessing import Pool                                                                                      
import time
import random


def myfunc():
    a = random.randint(0,3)
    time.sleep(a)
    return a

print("Starting...")

p = Pool(processes=8)
SOME_LIST = p.map(myfunc, range(p._processes))
p.close()
p.join()

print("Stopping...")
print(SOME_LIST)

注意,你也可以调用 apply_asyncmap 的次数超过池中的进程数量。Pool 的意思是,它保证在整个生命周期内会有正好 num_processes 个进程在运行,无论你提交多少任务。所以如果你创建一个 Pool(8) 并调用 apply_async 一次,那么你的八个工作进程中会有一个接到任务,其他七个会闲着。如果你创建一个 Pool(8) 并调用 apply_async 80 次,这80个任务会分配给你的八个工作进程,但同时最多只有八个任务会被处理。

撰写回答