子阵列的多处理填充通过合并每个进程的所有子阵列来构建全局阵列

2024-03-28 17:32:35 发布

您现在位置:Python中文网/ 问答频道 /正文

在Python2.7中,我必须构建一个2D数组(arrayFullCross_final[u][u]),它包含16个块,每个块的大小为100x100个元素。在开始时,我使用一个4D数组(arrayFullCross),然后我将其重塑为400x400 2D数组

我有一个第一个版本(sequential),其中我使用了经典的python函数“map”和一个类似这样的“generator”(buildCrossMatrix_loop是我想要应用generator的函数generatorCrossMatrix):

# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))


def buildCrossMatrix_loop(params_array):

  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]

  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal terms
      if (xb == yb):
        #arrayFullCross[u][u][w][t] = 2*P_bgs**2 * N_bgs**2
        if (xb == 0):
          N_bgs = (1+1/(n[params_array[2]]*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)))

          arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])), 10**P_m_NW(np.log10(arrayCross_k[ub])), bias2D_array*sig_8_fid, growth_f[params_array[2]]*sig_8_fid, H_orig(z), H_orig(z), D_A_orig(z), D_A_orig(z), params_array[2], 0, 0)**2 * N_bgs**2
...
...

##### MAIN LOOP to fill, at each index i, the array "arrayFullCross" #####
while i < len(zrange):

  ...
  ...

  def generatorCrossMatrix(index):
    for igen in range(dimBlocks):
      for lgen in range(dimBlocks):
        yield igen, lgen, index


  if __name__ == '__main__':
      map(buildCrossMatrix_loop, generatorCrossMatrix(i))
  
  ...      
  ...

i = i+1   

i只是主循环“while”的索引

使用这种顺序方法,一切正常,我得到了预期的大输出数组arrayFullCross[u][v][x][y](我检查了其中的值,在通过400x400重新整形后,效果很好)

现在,我试着做同样的事情,但是用multiprocessing import Pool。我做到了:

from multiprocessing import Pool

def buildCrossMatrix_loop(params_array):
...

while i < len(zrange):
...

if __name__ == '__main__':          
      pool = mp.Pool(16)
      pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
      pool.terminate()

      # Reshape 4D array to 2D global array
      arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

      print 'arrayFullCross2D_final = ', arrayFullCross2D_final

但是当我打印最终输出的2D数组arrayFullCross2D_final时,我系统地得到一个只填充零值的数组

arrayFullCross2D_final =  [[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
 ...
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]
[0. 0. 0. ... 0. 0. 0.]]

也许我必须在不同进程之间共享4D数组arrayFullCross?我该怎么做呢

每个进程如何能够同时修改4D阵列的不同部分

似乎循环的每个i索引都会覆盖这个4D全局数组

更新1

我忘了说我已经这样声明了完整数组(在main的开头,即在while循环之外):

# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))

我如何使用给出答案的解决方案和我的arrayFullCross声明?i、 e:

manager = Manager()
arrayFullCross = manager.list()

更新2

我认为通过使用ThreadPoolfrom multiprocessing.dummy import Pool as ThreadPool找到了一个很好的解决方案,方法如下:

pool = ThreadPool(16)
pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
pool.close()
pool.join()

但是性能似乎很差:事实上,我只看到一个进程使用tophtop命令运行,这正常吗

似乎大部分时间都花在锁定全局数组以写入它上:这种情况没有必要,因为我在独立的子数组上填充全局数组

我可以用ThreadPool来做这个吗


Tags: loopnp数组paramsarrayfinalpoolorig
1条回答
网友
1楼 · 发布于 2024-03-28 17:32:35

代码似乎确实是正确的。但是,当您在池模式下运行它时,每个工作进程都将拥有自己的阵列副本。然后,它们将写回您从未接触过的共享内存副本,因此表中填充了0

通过利用multiprocessing模块中的共享内存变量,您应该能够与主线程共享结果。您可以使用c类型数组,但这会大大增加代码的复杂性。multiprocessing模块通过Manager子模块提供类似python的列表。它应该足以使arrayFullCross成为Manager列表:

from multiprocessing import Manager, Pool
manager = Manager()
arrayFullCross = manager.list()

def buildCrossMatrix_loop(params_array):
...

while i < len(zrange):
...

if __name__ == '__main__':          
      pool = mp.Pool(16)
      pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
      pool.terminate()

      # Reshape 4D array to 2D global array
      arrayFullCross2D_final = arrayFullCross.swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

      print 'arrayFullCross2D_final = ', arrayFullCross2D_final

值得注意的是,利用manager对象会产生一定程度的开销。如果性能不令人满意,请尝试使用multiprocessing中的数组类型

有关这些资源的更多信息,请参见multiprocessing docs

相关问题 更多 >