将多进程池适配到mpi4py
我正在用Python的多进程池来运行一个并行模拟,这在多核电脑上效果很好。现在我想在一个集群上用多个节点来执行这个程序。我想多进程可能不适合分布式内存。不过,mpi4py看起来是个不错的选择。那么,最简单的mpi4py代码应该怎么写,才能和这些代码等效呢:
from multiprocessing import Pool
pool = Pool(processes=16)
pool.map(functionName,parameters_list)
3 个回答
我用下面的代码来实现类似于multiprocessing.Pool的功能。虽然还没有经过大量测试,但看起来运行得很好:
from functools import partial
function = partial(...) # Store all fixed parameters this way if needed
if use_MPI:
arguments = range(num_runs)
run_data = None
# mpi4py
comm = MPI.COMM_SELF.Spawn(sys.executable, args=['MPI_slave.py'], maxprocs=num_runs) # Init
comm.bcast(function, root=MPI.ROOT) # Equal for all processes
comm.scatter(arguments, root=MPI.ROOT) # Different for each process
comm.Barrier() # Wait for everything to finish...
run_data = comm.gather(run_data, root=MPI.ROOT) # And gather everything up
else:
# multiprocessing
p = Pool(multiprocessing.cpu_count())
run_data = p.map(function, range(num_runs))
接下来,它会使用一个单独的文件'MPI_slave.py':
from mpi4py import MPI
# import the function you actually pass to this file here!!!
comm = MPI.COMM_SELF.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()
def runSlaveRun():
function = None
options = None
# print("Process {}/{} reporting for duty!".format(rank, size))
function = comm.bcast(function, root=0)
arguments = comm.scatter(options, root=0)
results = function(arguments)
comm.Barrier()
comm.gather(results, root=0)
comm.Disconnect()
if __name__ == '__main__':
runSlaveRun()
我有一个老旧的程序包,叫做 mpi4py
,它可以让我们在 MPI
任务中实现并行处理。这个程序包并不是为了速度而设计的,而是为了让我们可以从解释器直接在计算集群上进行 MPI
的并行处理,也就是说,不需要通过命令行运行 mpiexec
。简单来说:
>>> from pyina.launchers import MpiPool, MpiScatter
>>> pool = MpiPool()
>>> jobs = MpiScatter()
>>> def squared(x):
... return x**2
...
>>> pool.map(squared, range(4))
[0, 1, 4, 9]
>>> jobs.map(sqaured, range(4))
[0, 1, 4, 9]
它展示了“工作池”策略和“分散-收集”策略,这些策略用于将任务分配给工作者。当然,我不会用它来处理像 squared
这样的小任务,因为启动 MPI
的开销实在太大了(比起设置一个 multiprocessing
的 Pool
要高得多)。但是,如果你有一个大任务需要运行,比如通常在集群上使用 MPI
运行的任务,那么 pyina
对你来说会非常有帮助。
不过,使用 pyina
的真正好处在于,它不仅可以启动 MPI
的任务,还可以将任务交给调度器。pyina
理解并简化了多种调度器的启动语法。
使用调度器调用 pyina
的映射通常是这样的:
>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]
有几种常见的配置可以作为预配置的映射。以下内容与上面的例子是一样的:
>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]
pyina
需要一些关心,因为它仍然是 python2.7
,而且已经好几年没有更新了……不过,它在 github 上保持了更新,并且在过去的 10 年里,能够“完成任务”,尤其是在与 pathos
配合使用时(pathos
提供了 ssh
隧道和统一的接口,用于 multiprocessing
和 ParallelPython
的映射)。pyina
还没有利用共享内存,但在处理一些简单的并行计算方面表现得相当不错。与调度器的互动总体上还不错,但在一些失败情况下可能会有点问题,而非阻塞的映射还需要很多改进。尽管如此,它提供了一个相当有用的接口,可以在集群上运行简单的并行任务,使用 MPI
。
你可以在这里获取 pyina
(和 pathos
):https://github.com/uqfoundation