mpi4py使用大型numpy数组进行分散和聚集

2024-05-15 02:57:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用mpi4py在大型numpy数组上并行一些操作。我目前使用numpy.array_split将数组分成块,然后com.scatter将数组发送到不同的内核,然后comm.gather收集得到的数组。最小(非)工作示例如下:

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    test = np.random.rand(411,48,52,40)
    test_chunks = np.array_split(test,size,axis=0)

else:
    test_chunks = None

test_chunk = comm.scatter(test_chunks,root=0)
output_chunk = np.zeros([np.shape(test_chunk)[0],128,128,128])

for i in range(0,np.shape(test_chunk)[0],1):
    print(i)
    output_chunk[i,0:48,0:52,0:40] = test_chunk[i]

outputData = comm.gather(output_chunk,root=0)


if rank == 0:
    outputData = np.concatenate(outputData,axis = 0)

运行此命令会出错:

  File "test_4d.py", line 23, in <module>
    outputData = comm.gather(output_chunk,root=0)
  File "Comm.pyx", line 869, in mpi4py.MPI.Comm.gather (src/mpi4py.MPI.c:73266)
  File "pickled.pxi", line 614, in mpi4py.MPI.PyMPI_gather (src/mpi4py.MPI.c:33592)
  File "pickled.pxi", line 146, in mpi4py.MPI._p_Pickle.allocv (src/mpi4py.MPI.c:28517)
  File "pickled.pxi", line 95, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:27832)
SystemError: Negative size passed to PyString_FromStringAndSize

这个错误似乎是由于gather收集的numpy数组太大所致;因为scatter和gather将数组作为数组列表发送,所以很容易超过列表大小。我遇到的一个建议是使用comm.Scatter和comm.Gather。然而,我正在努力为这些功能找到清晰的文档,到目前为止还无法成功实现它们。例如:

更换

outputData = comm.gather(output_chunk,root=0)

带着那条线

outputData=comm.Gather(sendbuf[test_chunks,MPI.DOUBLE],recvbuf=output_chunk,MPI.DOUBLE],root=0)

给出错误:

  File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
  File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
  File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
  File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
  File "message.pxi", line 51, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19644)
  File "asbuffer.pxi", line 108, in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:6757)
  File "asbuffer.pxi", line 50, in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:6093)
TypeError: expected a readable buffer object

或者用这句话:

outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)

给出错误:

  File "test_4d_2.py", line 24, in <module>
    outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
  File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
  File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
  File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
  File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
  File "message.pxi", line 60, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19747)
TypeError: unhashable type: 'numpy.ndarray'

此外,输入矩阵test的大小也可能增加,这可能导致comm.scatter的类似问题。除了我在comm.Gather方面已经存在的问题之外,我不确定如何设置comm.Scatter,因为recvbuf是根据test_chunk的大小(即comm.scatter的输出)定义的,因此我不能在comm.Scatter内指定recvbuf


Tags: intestsrcmessageoutputlinempi4py数组
1条回答
网友
1楼 · 发布于 2024-05-15 02:57:56

解决方案是使用comm.Scattervcomm.Gatherv作为内存块(而不是numpy数组的列表)发送和接收数据,以解决数据大小问题。comm.Scattervcomm.Gatherv假设内存中有一个C顺序(row major)的数据块,需要指定两个向量,sendcountsdisplacementsSendcounts给出分割输入数据的位置的整数值(索引)(即发送到给定核的每个向量的起点),而displacements给出该向量的长度。因此,可以改变发送到每个核心的数据量。更多细节可以在这里找到:http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html

下面给出了一个对二维矩阵使用comm.Scattervcomm.Gatherv的示例: Along what axis does mpi4py Scatterv function split a numpy array?

相关问题 更多 >

    热门问题