Python:如何使用MPI并行化简单循环

2024-04-26 00:04:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要用MPI重写一个简单的for循环,因为每个步骤都很耗时。假设我有一个包含几个np.array的列表,我想对每个数组应用一些计算。例如:

def myFun(x):
    return x+2 # simple example, the real one would be complicated

dat = [np.random.rand(3,2), np.random.rand(3,2),np.random.rand(3,2),np.random.rand(3,2)] # real data would be much larger
result = []
for item in dat:
    result.append(myFun(item))

我不想使用上面的简单for循环,而是想使用MPI与24个不同的节点并行运行上述代码的“for循环”部分。我还希望结果列表中的项目顺序与dat列表中的相同

注意数据是从其他文件读取的,这些文件可以被视为每个处理器的“修复”

我以前没有用过mpi,所以这让我有一段时间很难受


Tags: 文件列表fornp步骤randomberesult
2条回答

您可以选择@dreamcrash的答案中所示的低级MPI方式,也可以选择更具Python风格的解决方案,该解决方案使用与标准Python multiprocessing模块提供的执行器池非常类似的执行器池

首先,您需要通过注意您实际上正在执行一个map操作,将myFun应用于dat的每个元素,从而将代码转换为一种功能性更强的样式:

def myFun(x):
    return x+2 # simple example, the real one would be complicated

dat = [
    np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger

result = map(myFun, dat)

map这里在一个Python解释器进程中顺序运行

要与multiprocessing模块并行运行该映射,只需实例化一个Pool对象,然后调用其map()方法来代替Python map()函数:

from multiprocessing import Pool

def myFun(x):
    return x+2 # simple example, the real one would be complicated

if __name__ == '__main__':
    dat = [
        np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
    ] # real data would be much larger

    with Pool() as pool:
        result = pool.map(myFun, dat)

在这里,Pool()创建一个新的执行器池,其中包含的解释器进程数量与操作系统看到的逻辑CPU数量相同。调用池的map()方法,通过将项目发送到池中的不同进程并等待完成,并行运行映射。由于辅助进程将Python脚本作为模块导入,因此将以前处于顶层的代码移动到if __name__ == '__main__':条件下非常重要,这样它就不会在辅助进程中运行

使用multiprocessing.Pool()非常方便,因为它只需要对原始代码稍作更改,模块就可以为您处理所有工作调度和从工作进程到工作进程的所需数据移动。multiprocessing的问题是它只能在单个主机上工作。幸运的是,mpi4py通过mpi4py.futures.MPIPoolExecutor类提供了类似的接口:

from mpi4py.futures import MPIPoolExecutor

def myFun(x):
    return x+2 # simple example, the real one would be complicated

if __name__ == '__main__':
    dat = [
        np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
    ] # real data would be much larger

    with MPIPoolExecutor() as pool:
        result = pool.map(myFun, dat)

与来自multiprocessing模块的Pool对象一样,MPI池执行器为您处理所有工作调度和数据移动

有两种方法可以运行MPI程序。第一个以MPI单例启动脚本,然后使用MPI进程控制工具生成包含所有池工作线程的子MPI作业:

mpiexec -n 1 python program.py

您还需要指定MPI范围大小(主作业和所有子作业中的MPI列组总数)。实现的具体方式不同,因此需要参考实现手册

第二个选项是直接启动所需数量的MPI列组,并让它们以脚本名称作为参数执行mpi4py.futures模块本身:

mpiexec -n 24 python -m mpi4py.futures program.py

请记住,无论以何种方式启动脚本,都会为控制器保留一个MPI列组,并且不会运行映射任务。您的目标是在24台主机上运行,因此您应该有足够的CPU核心,并且可能有能力保留一个。或者,您可以指示MPI向第一台主机多订购一个列组

关于multiprocessing.Poolmpi4py.futures.MPIPoolExecutor需要注意的一点是map()方法保证了输出数组中项的顺序,但它不保证对不同项求值的顺序。在大多数情况下,这不应该是个问题


一句忠告。如果您的数据实际上是从文件中读取的块,您可能会尝试执行以下操作:

if __name__ == '__main__':
   data = read_chunks()
   with MPIPoolExecutor() as p:
       result = p.map(myFun, data)

不要那样做。相反,如果可能,例如,如果通过共享(希望是并行)文件系统的存在而启用,则将读取委托给工作程序:

NUM_CHUNKS = 100

def myFun(chunk_num):
    # You may need to pass the value of NUM_CHUNKS to read_chunk()
    # for it to be able to seek to the right position in the file
    data = read_chunk(NUM_CHUNKS, chunk_num)
    return ...

if __name__ == '__main__':
    chunk_nums = range(NUM_CHUNKS)  # 100 chunks
    with MPIPoolExecutor() as p:
        result = p.map(myFun, chunk_nums)

为了简单起见,让我们假设进程(带有rank = 0的进程)是将整个文件从磁盘读入内存的进程。只有了解以下MPI例程Get_size()Get_rank()scattergather,才能解决此问题

会议Get_size()

Returns the number of processes in the communicator. It will return the same number to every process.

Get_rank()委员会:

Determines the rank of the calling process in the communicator.

在MPI中,为每个进程分配一个等级,等级从0到N-1不等,其中N是正在运行的进程总数

会议scatter

MPI_Scatter involves a designated root process sending data to all processes in a communicator. The primary difference between MPI_Bcast and MPI_Scatter is small but important. MPI_Bcast sends the same piece of data to all processes while MPI_Scatter sends chunks of an array to different processes.

以及gather

MPI_Gather is the inverse of MPI_Scatter. Instead of spreading elements from one process to many processes, MPI_Gather takes elements from many processes and gathers them to one single process.

显然,您应该首先阅读教程并阅读MPI文档,以了解其并行编程模型及其例程。否则,你会发现很难理解它是如何工作的。也就是说,您的代码可能如下所示:

from mpi4py import MPI

def myFun(x):
    return x+2 # simple example, the real one would be complicated

comm = MPI.COMM_WORLD
rank = comm.Get_rank() # get your process ID
data = # init the data    

if rank == 0: # The master is the only process that reads the file
    data = # something read from file

# Divide the data among processes
data = comm.scatter(data, root=0)

result = []
for item in data:
    result.append(myFun(item))

# Send the results back to the master processes
newData = comm.gather(result,root=0)

 

这样,每个进程将只在特定的数据块中工作(并行)。完成工作后,每个进程将其数据块(即^{)发送回主进程。这只是一个玩具示例,现在取决于您根据测试环境和代码进行改进

相关问题 更多 >