Multiprocessing over the grid points of a 2D function

2 votes
1 answer
1134 views
Asked 2025-04-18 16:41

I have a 2D function and want to evaluate it on grid points. However, looping over the rows and columns with two nested loops is very slow, so I want to use multiprocessing to speed the code up. I wrote the following code to implement the two loops:

from multiprocessing import Pool
import numpy as np

#Grid points
ra = np.linspace(25.1446, 25.7329, 1000)
dec = np.linspace(-10.477, -9.889, 1000)
#The 2D function
def like2d(x,y): 
    stuff=[RaDec, beta, rho_c_over_sigma_c, zhalo, rho_crit]
    m=3e14
    c=7.455
    param=[x, y, m, c]
    return reduced_shear( param, stuff, observed_g, g_err)

pool = Pool(processes=12)

def data_stream(a, b):
    for i, av in enumerate(a):
        for j, bv in enumerate(b):
            yield (i, j), (av, bv)

def myfunc(args):
    return args[0], like2d(*args[1])

counter,likelihood = pool.map(myfunc, data_stream(ra, dec))

But I get the following error message:

Process PoolWorker-1:

Traceback (most recent call last):
  File "/user/anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/user/anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/user/anaconda/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/user/anaconda/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'myfunc'
(the same traceback repeats for PoolWorker-2 through PoolWorker-4)

Everything is already defined, so I don't understand why I get this error message!! Can anyone point out what might be going wrong?

There is also this way of using multiprocessing to do the loops and save the results in a 2D array:

import ctypes
import multiprocessing
import time

import numpy as np

#Grid points
ra = np.linspace(25.1446, 25.7329, 1000)
dec = np.linspace(-10.477, -9.889, 1000)

#The 2D function
def like2d(x,y):
    stuff=[RaDec, beta, rho_c_over_sigma_c, zhalo, rho_crit]
    m=3e14
    c=7.455
    param=[x, y, m, c]
    return reduced_shear( param, stuff, observed_g, g_err)


shared_array_base = multiprocessing.Array(ctypes.c_double, ra.shape[0]*dec.shape[0])
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array = shared_array.reshape( ra.shape[0],dec.shape[0])

# Parallel processing
def my_func(i, def_param=shared_array):
    shared_array[i,:] = np.array([float(like2d(ra[j],dec[i])) for j in range(ra.shape[0])])

print("processing to estimate likelihood in 2D grids......!!!")
start = time.time()
pool = multiprocessing.Pool(processes=12)
pool.map(my_func, range(dec.shape[0]))
print(shared_array)
end = time.time()
print(end - start)

1 Answer

4

You need to create the Pool after defining the worker function (myfunc). When you create the Pool, Python forks your worker processes at that moment, and only functions defined before the Pool was created exist in those child processes. Also note that map returns a list of tuples (one per object yielded by data_stream), not a single tuple, so it cannot be unpacked directly into counter, likelihood. So you need to do this:

from multiprocessing import Pool
import numpy as np

#Grid points
ra = np.linspace(25.1446, 25.7329, 1000)
dec = np.linspace(-10.477, -9.889, 1000)
#The 2D function
def like2d(x,y): 
    stuff=[RaDec, beta, rho_c_over_sigma_c, zhalo, rho_crit]
    m=3e14
    c=7.455
    param=[x, y, m, c]
    return reduced_shear( param, stuff, observed_g, g_err)


def data_stream(a, b):
    for i, av in enumerate(a):
        for j, bv in enumerate(b):
            yield (i, j), (av, bv)

def myfunc(args):
    return args[0], like2d(*args[1])

if __name__ == "__main__":    
    pool = Pool(processes=12)
    results = pool.map(myfunc, data_stream(ra, dec))  # results is a list of tuples.
    for counter,likelihood in results:
        print("counter: {}, likelihood: {}".format(counter, likelihood))

I added the if __name__ == "__main__": guard. It is not strictly necessary on POSIX platforms, but it is required on Windows, which does not support os.fork().
