我需要用MPI重写一个简单的for循环,因为每个步骤都很耗时。假设我有一个包含几个np.array的列表,我想对每个数组应用一些计算。例如:
def myFun(x):
return x+2 # simple example, the real one would be complicated
dat = [np.random.rand(3,2), np.random.rand(3,2),np.random.rand(3,2),np.random.rand(3,2)] # real data would be much larger
result = []
for item in dat:
result.append(myFun(item))
我不想使用上面的简单for循环,而是想使用MPI与24个不同的节点并行运行上述代码的“for循环”部分。我还希望结果列表中的项目顺序与dat列表中的相同
注意数据是从其他文件读取的,这些文件可以被视为每个处理器的“修复”
我以前没有用过mpi,所以这让我有一段时间很难受
您可以选择@dreamcrash的答案中所示的低级MPI方式,也可以选择更具Python风格的解决方案,该解决方案使用与标准Python
multiprocessing
模块提供的执行器池非常类似的执行器池首先,您需要通过注意您实际上正在执行一个
map
操作,将myFun
应用于dat
的每个元素,从而将代码转换为一种功能性更强的样式:map
这里在一个Python解释器进程中顺序运行要与
multiprocessing
模块并行运行该映射,只需实例化一个Pool
对象,然后调用其map()
方法来代替Pythonmap()
函数:在这里,
Pool()
创建一个新的执行器池,其中包含的解释器进程数量与操作系统看到的逻辑CPU数量相同。调用池的map()
方法,通过将项目发送到池中的不同进程并等待完成,并行运行映射。由于辅助进程将Python脚本作为模块导入,因此将以前处于顶层的代码移动到if __name__ == '__main__':
条件下非常重要,这样它就不会在辅助进程中运行使用
multiprocessing.Pool()
非常方便,因为它只需要对原始代码稍作更改,模块就可以为您处理所有工作调度和从工作进程到工作进程的所需数据移动。multiprocessing
的问题是它只能在单个主机上工作。幸运的是,mpi4py
通过mpi4py.futures.MPIPoolExecutor
类提供了类似的接口:与来自
multiprocessing
模块的Pool
对象一样,MPI池执行器为您处理所有工作调度和数据移动有两种方法可以运行MPI程序。第一个以MPI单例启动脚本,然后使用MPI进程控制工具生成包含所有池工作线程的子MPI作业:
您还需要指定MPI范围大小(主作业和所有子作业中的MPI列组总数)。实现的具体方式不同,因此需要参考实现手册
第二个选项是直接启动所需数量的MPI列组,并让它们以脚本名称作为参数执行
mpi4py.futures
模块本身:请记住,无论以何种方式启动脚本,都会为控制器保留一个MPI列组,并且不会运行映射任务。您的目标是在24台主机上运行,因此您应该有足够的CPU核心,并且可能有能力保留一个。或者,您可以指示MPI向第一台主机多订购一个列组
关于
multiprocessing.Pool
和mpi4py.futures.MPIPoolExecutor
需要注意的一点是map()
方法保证了输出数组中项的顺序,但它不保证对不同项求值的顺序。在大多数情况下,这不应该是个问题一句忠告。如果您的数据实际上是从文件中读取的块,您可能会尝试执行以下操作:
不要那样做。相反,如果可能,例如,如果通过共享(希望是并行)文件系统的存在而启用,则将读取委托给工作程序:
为了简单起见,让我们假设主进程(带有
rank = 0
的进程)是将整个文件从磁盘读入内存的进程。只有了解以下MPI例程Get_size()
、Get_rank()
、scatter
和gather
,才能解决此问题会议Get_size():
该Get_rank()委员会:
在MPI中,为每个进程分配一个等级,等级从0到N-1不等,其中N是正在运行的进程总数
会议scatter:
以及gather:
显然,您应该首先阅读教程并阅读MPI文档,以了解其并行编程模型及其例程。否则,你会发现很难理解它是如何工作的。也就是说,您的代码可能如下所示:
这样,每个进程将只在特定的数据块中工作(并行)。完成工作后,每个进程将其数据块(即^{)发送回主进程。这只是一个玩具示例,现在取决于您根据测试环境和代码进行改进
相关问题 更多 >
编程相关推荐