How to use Python multiprocessing Pool.map inside a loop

2 votes
1 answer
14771 views
Asked 2025-04-17 23:27

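I am running a simulation with the Runge-Kutta method. In every time step I need two fast Fourier transforms (FFTs) of two independent variables, which can be computed in parallel. I wrote the code like this: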

from multiprocessing import Pool
import numpy as np

pool = Pool(processes=2)    # Only 2 FFTs are calculated in parallel
                            # in every time step, therefore 2 processes

def Splitter(args):
    '''I have to pass 2 arguments'''
    return makeSomething(*args)

def makeSomething(a,b):
    '''dummy function instead of the one with the FFT'''
    return a*b

def RungeK():
    # ...
    # a lot of code which creates the vectors A and B and calculates
    # one Runge-Kutta step for them
    # ...

    n = 20                         # Just something for the example
    A = np.arange(50000)
    B = np.ones_like(A)

    for i in xrange(n):                  # loop over the time steps
        A *= np.mean(B)*B - A
        B *= np.sqrt(A)
        results = pool.map(Splitter,[(A,3),(B,2)])
        A = results[0]
        B = results[1]

    print np.mean(A)                                 # Some output
    print np.max(B)

if __name__== '__main__':
    RungeK()

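Unfortunately, Python spawns an unbounded number of processes once it enters the loop. Before that, only two processes appear to be running, and my memory fills up as well. Adding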

pool.close()
pool.join()

after the loop did not solve the problem, and putting it inside the loop makes no sense to me. I hope you can help.

1 Answer

2

Move the code that creates the pool into the RungeK function:

def RungeK():
    # ...
    # a lot of code which creates the vectors A and B and calculates
    # one Runge-Kutta step for them
    # ...

    pool = Pool(processes=2)
    n = 20                         # Just something for the example
    A = np.arange(50000)
    B = np.ones_like(A)

    for i in xrange(n):  # loop over the time steps
        A *= np.mean(B)*B - A
        B *= np.sqrt(A)
        results = pool.map(Splitter, [(A, 3), (B, 2)])
        A = results[0]
        B = results[1]
    pool.close()
    print np.mean(A)  # Some output
    print np.max(B)

Alternatively, create it in the main block (under if __name__ == '__main__':).
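A minimal sketch of the main-block variant, written for Python 3 rather than the Python 2 used in the question. The names make_something, splitter, runge_k and map_fn are illustrative renames and additions, not part of the original code. The in-place updates of A and B from the question are left out, and A is created as a float array, so this only illustrates where the pool is created, not the simulation itself.

```python
from multiprocessing import Pool

import numpy as np


def make_something(a, b):
    """Dummy stand-in for the FFT-based function from the question."""
    return a * b


def splitter(args):
    """Unpack one (a, b) tuple, because Pool.map passes a single argument."""
    return make_something(*args)


def runge_k(map_fn, n=20):
    """Run n dummy time steps.

    map_fn is assumed to behave like pool.map: it takes a function and a
    list of argument tuples and returns the results as a list.
    """
    A = np.linspace(0.0, 1.0, 50000)  # float data, unlike np.arange in the question
    B = np.ones_like(A)
    for _ in range(n):  # loop over the time steps
        A, B = map_fn(splitter, [(A, 3), (B, 2)])
    return A, B


if __name__ == "__main__":
    # The pool is created only when this file is run as a script,
    # so importing the module never starts worker processes.
    with Pool(processes=2) as pool:
        A, B = runge_k(pool.map)
    print(np.mean(A))
    print(np.max(B))
```

Passing pool.map into runge_k keeps the function importable without side effects. Any compatible callable can be substituted, which also makes it easy to run the loop without worker processes when debugging.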

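This is probably caused by the way multiprocessing works. On Microsoft Windows, for example, the main module must be importable without side effects (such as starting new processes at import time), because each worker process imports the main module when it starts, so a pool created at module level would be created again in every worker.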
