用“进程”或“线程”策略将独立块填充到全局数组中?

2024-06-17 11:41:22 发布

您现在位置:Python中文网/ 问答频道 /正文

在python2中,我想通过填充并行进程(或线程)不同的子数组(总共有16个块)来填充全局数组。我必须明确指出,每个块不依赖于其他块,我的意思是当我执行当前块的每个单元格的赋值时。你知道吗

1)根据我的发现,通过使用不同的“processes”,我将从CPU多核中获得很大的好处,但是由所有其他进程共享全局数组似乎有点复杂。你知道吗

2)从另一个角度来看,我可以使用“threads”而不是“processes”,因为实现起来不那么困难。我发现来自“multiprocessing.dummy”的库“ThreadPool”允许所有其他并发线程共享这个全局数组。你知道吗

例如,对于python2.7,以下代码起作用:

from multiprocessing.dummy import Pool as ThreadPool

## discretization along x-axis and y-axis for each block
arrayCross_k = np.linspace(kMIN, kMAX, dimPoints)
arrayCross_mu = np.linspace(-1, 1, dimPoints)
# Build all big matrix with N total blocks = dimBlock*dimBlock = 16 here
arrayFullCross = np.zeros((dimBlocks, dimBlocks, arrayCross_k.size, arrayCross_mu.size))
dimBlocks = 4
# Size of dimension along k and mu axis
dimPoints = 100
# dimension along one dimension of global arrayFullCross
dimMatCovCross = dimBlocks*dimPoints

# Build cross-correlation matrix 
def buildCrossMatrix_loop(params_array):
  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]
  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal blocs 
      if (xb == yb):
      # Fill the (xb,yb) su-block of global array by 
        arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb] , z, 10**P_m(np.log10(arrayCross_k[ub])),

        ...
        ...

# End of function buildCrossMatrix_loop

# Main loop
while i < len(zrange):

  def generatorCrossMatrix(index):
    for igen in range(dimBlocks):
      for lgen in range(dimBlocks):
        yield igen, lgen, index

if __name__ == '__main__':

  # Use 20 threads
  pool = ThreadPool(20)
  pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))

  # Increment index "i"
  i = i+1

但不幸的是,即使使用了20个线程,我也意识到我的CPU核心没有完全运行(实际上,使用'top'或'htop'命令,我只看到一个进程处于100%状态)。你知道吗

3)如果我想充分利用CPU的16个核心,我必须选择什么策略(就像pool.map(function, generator)) but with also the sharing of global array的情况一样?你知道吗

4)有人告诉我要对每个子数组进行I/O(基本上,把每个块写在一个文件中,然后通过读取子数组来收集所有子数组,然后填充整个数组)。这个解决方案很方便,但我希望避免I/O(除非真的没有其他解决方案)。你知道吗

5)我用C language练习过MPI library,填充子数组,最后聚集成一个大数组的操作,不是很复杂。但是,我不想在Python语言中使用MPI(我不知道它是否存在)。你知道吗

6)我还尝试使用Process,目标与我的填充函数(buildCrossMatrix_loop)相等,就像这样进入上面的while主循环:

from multiprocessing import Process

# Main loop on z range
while i < len(zrange):

  params_p = []
  for ip in range(4):
    for jp in range(4):
      params_p.append(ip)
      params_p.append(jp)
      params_p.append(i)
      p = Process(target=buildCrossMatrix_loop, args=(params_p,))
      params_p = []
      p.start()

  # Finished : wait everybody
  p.join()

  ...
  ...

  i = i+1
  # End of main while loop

但是最后的2D全局数组只填充了零。所以我必须推断Process函数不共享要填充的数组?你知道吗

7)那么我必须寻找哪种策略?地址:

<强>1。使用“池进程”并找到共享全局阵列的方法,知道我的所有16核都将运行

<强>2。使用“线程”和共享全局数组,但乍一看,性能似乎不如使用“池进程”。也许有一种方法可以提高每个“线程”的能力,我是说像“池进程”?

我试着遵循https://docs.python.org/2/library/multiprocessing.html上的不同示例,但没有成功,也就是说,从加速的角度来看,没有相关的性能。你知道吗

我认为在我的例子中,主要的问题是所有子数组的聚集,或者全局数组arrayFullCross不被其他进程或线程共享

如果有人有一个在多线程上下文(这里是一个数组)中共享全局变量的简单示例,那么最好把它放在这里。你知道吗

更新1:我用Threading(而不是multiprocessing)进行了测试,但性能仍然很差。GIL显然没有解锁,即只有一个进程出现在htop命令中(可能线程库的版本不正确)。你知道吗

因此,我将尝试使用“return”方法来处理我的问题。你知道吗

我天真地尝试在应用map函数的函数末尾返回整个数组,如下所示:

# Build cross-correlation matrix 
def buildCrossMatrix_loop(params_array):

  # rows indices
  xb = params_array[0]
  # columns indices
  yb = params_array[1]
  # Current redshift
  z = zrange[params_array[2]]
  # Loop inside block
  for ub in range(dimPoints):
    for vb in range(dimPoints):
      # Diagonal blocs 
      if (xb == yb):         
        arrayFullCross[xb][xb][ub][vb] = 2*P_obs_cross(arrayCross_k[ub], arrayCross_mu[vb])

      ... 
      ... #others assignments on arrayFullCross elements

  # Return global array to main process
  return arrayFullCross

然后,我尝试从map接收这个全局数组,如下所示:

if __name__ == '__main__':

  pool = Pool(16)
  outputArray = pool.map(buildCrossMatrix_loop, generatorCrossMatrix(i))
  pool.terminate()
  ## Print outputArray
  print 'outputArray = ', outputArray

  ## Reshape 4D outputArray to 2D array
  arrayFullCross2D_swap = np.array(outputArray).swapaxes(1,2).reshape(dimMatCovCross,dimMatCovCross)

不幸的是,当我打印outputArray时,我得到:

outputArray =  [None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]

这不是预期的4D outputArray,只是一个16无的列表(我认为16的数量对应于generatorCrossMatrix(i)提供的进程数量)。你知道吗

一旦map启动并且完成后,我如何才能取回整个4D阵列?你知道吗


Tags: innoneloopfor进程range数组params
1条回答
网友
1楼 · 发布于 2024-06-17 11:41:22

首先,我认为multiprocessing.ThreadPool是一个私有API,所以应该避免使用它。现在multiprocessing.dummy是一个无用的模块。它不做任何多线程/处理,这就是为什么你看不到任何好处。您应该使用“plain”multiprocessing模块。你知道吗

第二个代码不起作用,因为它使用多个进程。进程不共享内存,因此在子进程中所做的更改不会反映在其他子进程或主进程中。您可以:

  • 返回值并在主进程中将它们组合在一起,例如使用multiprocessing.Pool.map
  • 使用threading而不是multiprocessing: just replace导入多处理with导入线程and多处理过程with穿线。穿线`代码应该有用。你知道吗

请注意,threading版本将只工作,因为numpy在计算期间释放GIL,否则它将被卡在1个CPU上。你知道吗

你可能想看看几分钟前的this similar question,它是I answered。你知道吗

相关问题 更多 >