如何限制多进程的作用域?

19 投票
2 回答
7375 浏览
提问于 2025-04-18 18:24

使用Python的 multiprocessing 模块,下面这个例子在内存使用上非常简单:

import multiprocessing 
# completely_unrelated_array = range(2**25)

def foo(x):
    for x in xrange(2**28):pass
    print x**2

P = multiprocessing.Pool()

for x in range(8):
    multiprocessing.Process(target=foo, args=(x,)).start()

如果你取消注释创建 completely_unrelated_array 的那一行,你会发现每个新启动的进程都会为 completely_unrelated_array 分配一份内存!这是一个很小的例子,实际上它是一个更大项目的一部分,我现在还不知道怎么解决这个问题;使用多进程时,似乎会把所有全局变量都复制一遍。我需要共享内存对象,我只是想传入 x,并处理它,而不想让整个程序的内存开销那么大。

顺便提一下:有趣的是,在 foo 函数里执行 print id(completely_unrelated_array) 会得到相同的值,这说明可能并不是在复制...

2 个回答

5

这里重要的是你要针对哪个平台。Unix系统的进程是通过一种叫做“写时复制”(Copy-On-Write,简称cow)的内存方式来创建的。虽然每个进程都会得到父进程完整内存的一个副本,但只有在这个内存被修改时,才会实际分配内存,通常是按页面(4KiB)来分配。所以如果你只针对这些平台,就不需要做任何改变。

如果你要针对没有写时复制的分叉机制的平台,建议使用Python 3.4及其新的分叉上下文,比如spawnforkserver,可以查看文档。这些方法会创建新的进程,这些进程与父进程没有共享的状态,或者共享的状态非常有限,所有的内存传递都是明确的。

需要注意的是,生成的进程会导入你的模块,因此所有全局数据都会被明确复制,而不支持写时复制。为了防止这种情况,你需要减少数据的作用域。

import multiprocessing  as mp
import numpy as np

def foo(x):
    import time
    time.sleep(60)

if __name__ == "__main__":
    mp.set_start_method('spawn')
    # not global so forks will not have this allocated due to the spawn method
    # if the method would be fork the children would still have this memory allocated
    # but it could be copy-on-write
    completely_unrelated_array = np.ones((5000, 10000))
    P = mp.Pool()
    for x in range(3):
        mp.Process(target=foo, args=(x,)).start()

例如,使用spawn时的输出:

%MEM     TIME+ COMMAND
29.2   0:00.52 python3                                                
0.5   0:00.00 python3    
0.5   0:00.00 python3    
0.5   0:00.00 python3    

而使用fork时的输出:

%MEM     TIME+ COMMAND
29.2   0:00.52 python3                                                
29.1   0:00.00 python3    
29.1   0:00.00 python3                                                
29.1   0:00.00 python3

注意它的值超过了100%,这是因为写时复制的原因。

13

因为os.fork()的特性,你在__main__模块中的全局变量会被子进程继承(假设你是在Posix平台上),所以一旦子进程被创建,你会看到它们的内存使用情况反映出这些变量。我不太确定这些内存是否真的被分配了,按照我所知道的,这些内存是共享的,直到你在子进程中尝试去修改它,这时才会创建一个新的副本。另一方面,Windows不使用os.fork(),它会在每个子进程中重新导入主模块,并将你想要发送给子进程的任何局部变量进行序列化。因此,在Windows上,你可以通过仅在if __name__ == "__main__":保护下定义全局变量,来避免在子进程中复制大量全局变量,因为这个保护下的内容只会在父进程中运行:

import time
import multiprocessing 


def foo(x):
    for x in range(2**28):pass
    print(x**2)

if __name__ == "__main__":
    completely_unrelated_array = list(range(2**25)) # This will only be defined in the parent on Windows
    P = multiprocessing.Pool()

    for x in range(8):
        multiprocessing.Process(target=foo, args=(x,)).start()

现在,在Python 2.x中,如果你使用的是Posix平台,你只能通过分叉来创建新的multiprocessing.Process对象。但在Python 3.4中,你可以通过使用上下文来指定新进程的创建方式。所以,我们可以指定"spawn"上下文,这是Windows使用的,来创建我们的新进程,并使用同样的技巧:

# Note that this is Python 3.4+ only
import time
import multiprocessing 

def foo(x):
    for x in range(2**28):pass
    print(x**2)


if __name__ == "__main__":
    completely_unrelated_array = list(range(2**23))  # Again, this only exists in the parent
    ctx = multiprocessing.get_context("spawn") # Use process spawning instead of fork
    P = ctx.Pool()

    for x in range(8):
        ctx.Process(target=foo, args=(x,)).start()

如果你需要支持2.x版本,或者想继续使用os.fork()来创建新的Process对象,我认为你能做的最好的事情就是在子进程中立即删除那个占用内存的对象:

import time
import multiprocessing 
import gc

def foo(x):
    init()
    for x in range(2**28):pass
    print(x**2)

def init():
    global completely_unrelated_array
    completely_unrelated_array = None
    del completely_unrelated_array
    gc.collect()

if __name__ == "__main__":
    completely_unrelated_array = list(range(2**23))
    P = multiprocessing.Pool(initializer=init)

    for x in range(8):
        multiprocessing.Process(target=foo, args=(x,)).start()
    time.sleep(100)

撰写回答