导入使用多进程的Python模块
我想用多进程模块来加快一些交通规划模型的运行时间。我已经通过“正常”的方法尽可能地优化了,但问题的核心是一个非常适合并行处理的问题。比如说,对四组不同的输入执行相同的一组矩阵运算,这些输入之间是独立的。
伪代码:
for mat1,mat2,mat3,mat4 in zip([a1,a2,a3,a4],[b1,b2,b3,b4],[c1,c2,c3,c4],[d1,d2,d3,d4]):
result1 = mat1*mat2^mat3
result2 = mat1/mat4
result3 = mat3.T*mat2.T+mat4
所以我真正想做的就是在一台四核电脑上并行处理这个循环的迭代。我在这里和其他地方都查阅了关于多进程模块的资料,感觉它非常适合我的需求,除了需要:
if __name__ == '__main__'
根据我的理解,这意味着你只能在脚本中运行多进程代码?也就是说,如果我做类似这样的事情:
import multiprocessing
from numpy.random import randn
a = randn(100,100)
b = randn(100,100)
c = randn(100,100)
d = randn(100,100)
def process_matrix(mat):
return mat^2
if __name__=='__main__':
print "Multiprocessing"
jobs=[]
for input_matrix in [a,b,c,d]:
p = multiprocessing.Process(target=process_matrix,args=(input_matrix,))
jobs.append(p)
p.start()
它运行得很好,但是假设我把上面的内容保存为'matrix_multiproc.py',并定义了一个新文件'importing_test.py',里面只写了:
import matrix_multiproc
多进程就不会发生,因为现在的名字是'matrix_multiproc'而不是main。
这是否意味着我永远无法在导入的模块上使用并行处理?我只是想让我的模型运行成:
def Model_Run():
import Part1, Part2, Part3, matrix_multiproc, Part4
Part1.Run()
Part2.Run()
Part3.Run()
matrix_multiproc.Run()
Part4.Run()
抱歉问了一个很长的问题,但可能答案很简单,谢谢!
2 个回答
你可以在代码的任何地方使用多进程,只要程序的主模块使用了
if __name__ == '__main__'
这个保护措施。
不过,这并不是最好的方法。这个 __main__
的检查太严格了,它在模块代码中不适用,并且限制了你能做的事情。而且其实也没必要。只要有一个检查能确保初始化代码在各个进程中只执行一次,就可以了。
更好的方法
一个更好的解决方案是检查一个环境变量。如果这个变量没有设置,就设置它并初始化你的多进程。新创建的进程会继承环境的变化,看到变量已经设置好,就不会再创建更多的进程了。
还有一种更复杂的方法是检查所有父级框架中是否有 __main__
的名字。这种方法比较难,而且容易出错。简单点好。
接下来这篇文章会解释为什么这些方法更好。
示例
比如说,你想在一个导入的模块中启动一个监听进程。这个监听进程会监控一个队列,处理数据(在这个例子中,就是记录数据)。假设我们有这样的代码:
# ----- in module.py
def listener (queue) :
while true :
... # do something with queue
def init () :
'setup shared queue and start listener process'
manager = mp.get_manager ()
queue = mp.Queue ()
child = manager.Process (
target = listener,
args = queue,
daemon = true # so parent process doesn't hang waiting for child
)
child.start ()
# start the listener
init ()
def log (msg) :
queue.put (msg)
# ----- in main.py
import module
log ('foo')
上面的代码在使用 fork 时没问题,但在使用 spawn 时就会出错。init() 会在子进程中再次运行,导致又创建一个子进程,接着又是一个,……
那怎么解决呢?当 module.py 被导入时,你无法测试 __name__ == '__main__'
。此时 __name__
的值是模块名。所以这样不行。
这里有一个明显(但不好的)解决方案:你可以把 init() 的调用移到 main.py 中,并用 __name__ == '__main__'
包裹起来。这是个坏主意。现在任何导入 module.py 的人都必须在使用之前调用 module.init()。想象一下,如果标准库也这样工作,你可能得在开始做任何事情之前调用 15 或 20 个初始化函数,容易漏掉一个,太容易出错了。
模块链
这个明显的解决方案在另一个模块包含你的模块时也会出问题。考虑这个场景:
# --- mod1.py
def init () : ... # start a child listener process
def log (msg) : ...
# --- mod2.py
import mod1.py
def somefunc (*args) :
do something...
mod1.log (f'result is : { result }')
if __name__ == '__main__' :
mod1.init ()
# --- main.py
import mod2
mod2.somefunc ()
现在 main.py 出错了。mod1.init() 从未被调用。main.py 甚至没有导入 mod1。初始化的问题比之前更糟糕。测试 __main__
是个糟糕的解决方案。
更好的方法:使用 os.environ
一个更好的解决方案是检查一个环境变量。如果这个变量没有设置,那就是第一次执行。设置这个变量并初始化你的进程。新创建的子进程会继承这个环境,看到变量已经设置好,就不会再初始化了。这样所有的初始化行为都保持在模块内部。
这个方法的挑战在于确保你选择一个独特的环境变量名,这个名字在系统的其他程序或其他 Python 模块中没有被使用。我建议使用 __file__
,因为这个值是不会变的,和 __name__
不同。为了更安全,可以使用完整路径。
在上面的第一个示例中,把对 init()
的调用替换为以下内容:
# --- module.py
envkey = 'PYSPAWN_' + os.path.basename (__file__)
# start the listener on first pass (parent process)
if not os.environ.get (envkey, false) :
os.environ [envkey] = str (os.getpid ())
init ()
如果你只是使用 __name__
而不是 __file__
,当 module.py 被导入时仍然会工作。然而,当 module.py 作为主脚本运行时,第一次运行时的名字会是 __main__
,然后在新创建的子进程中会变成 main
或 mpmain
或类似的名字(这取决于你使用的多进程库)。这样 envkey
会有两个不同的值,你会得到一个额外的子进程。
其他选项
你也可以通过查看所有父级框架来测试 __main__
的名字 如这里所述。不过你得查看整个调用栈,因为调用者可能不是主模块,就像上面的第二个例子那样。这种方法没有使用环境变量那么简单和高效。简单的解决方案更好。
这是不是意味着我不能在导入的模块上使用并行处理?
不是的,实际上你可以在代码的任何地方使用 multiprocessing
,前提是程序的主模块要加上 if __name__ == '__main__'
这个保护措施。
在Unix系统上,你甚至不需要这个保护,因为它有 fork()
这个系统调用,可以从主 python
进程创建子进程。
而在Windows上,fork()
是通过 multiprocessing
模拟的,它会重新启动一个新进程来运行主模块,这时会使用不同的 __name__
。如果没有这个保护,你的主程序会不断尝试再创建新进程,这样就会导致无限循环,快速占满你电脑的内存。