将多进程池适配到mpi4py

9 投票
3 回答
4931 浏览
提问于 2025-04-18 12:45

我正在用Python的多进程池来运行一个并行模拟,这在多核电脑上效果很好。现在我想在一个集群上用多个节点来执行这个程序。我想多进程可能不适合分布式内存。不过,mpi4py看起来是个不错的选择。那么,最简单的mpi4py代码应该怎么写,才能和这些代码等效呢:

from multiprocessing import Pool

pool = Pool(processes=16)

pool.map(functionName,parameters_list)

3 个回答

0

我用下面的代码来实现类似于multiprocessing.Pool的功能。虽然还没有经过大量测试,但看起来运行得很好:

from functools import partial
function = partial(...)  # Store all fixed parameters this way if needed

if use_MPI:
    arguments = range(num_runs)
    run_data = None

    # mpi4py
    comm = MPI.COMM_SELF.Spawn(sys.executable, args=['MPI_slave.py'], maxprocs=num_runs)  # Init
    comm.bcast(function, root=MPI.ROOT)     # Equal for all processes
    comm.scatter(arguments, root=MPI.ROOT)  # Different for each process
    comm.Barrier()                          # Wait for everything to finish...
    run_data = comm.gather(run_data, root=MPI.ROOT)  # And gather everything up
else:        
    # multiprocessing
    p = Pool(multiprocessing.cpu_count())
    run_data = p.map(function, range(num_runs))

接下来,它会使用一个单独的文件'MPI_slave.py':

from mpi4py import MPI
# import the function you actually pass to this file here!!!
comm = MPI.COMM_SELF.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()

def runSlaveRun():
    function = None
    options = None
    # print("Process {}/{} reporting for duty!".format(rank, size))

    function = comm.bcast(function, root=0)
    arguments = comm.scatter(options, root=0)
    results = function(arguments)
    comm.Barrier()
    comm.gather(results, root=0)
    comm.Disconnect()

if __name__ == '__main__':
    runSlaveRun()
4

这里有一个叫做 MPIPool 的类,你可以在 这里 找到它的实现。

如果你想看看我是怎么使用这个类的,可以查看我在GitHub上的这个 代码片段

5

我有一个老旧的程序包,叫做 mpi4py,它可以让我们在 MPI 任务中实现并行处理。这个程序包并不是为了速度而设计的,而是为了让我们可以从解释器直接在计算集群上进行 MPI 的并行处理,也就是说,不需要通过命令行运行 mpiexec。简单来说:

>>> from pyina.launchers import MpiPool, MpiScatter
>>> pool = MpiPool()
>>> jobs = MpiScatter()
>>> def squared(x):
...   return x**2
... 
>>> pool.map(squared, range(4))
[0, 1, 4, 9]
>>> jobs.map(sqaured, range(4))
[0, 1, 4, 9]

它展示了“工作池”策略和“分散-收集”策略,这些策略用于将任务分配给工作者。当然,我不会用它来处理像 squared 这样的小任务,因为启动 MPI 的开销实在太大了(比起设置一个 multiprocessingPool 要高得多)。但是,如果你有一个大任务需要运行,比如通常在集群上使用 MPI 运行的任务,那么 pyina 对你来说会非常有帮助。

不过,使用 pyina 的真正好处在于,它不仅可以启动 MPI 的任务,还可以将任务交给调度器。pyina 理解并简化了多种调度器的启动语法。

使用调度器调用 pyina 的映射通常是这样的:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>> 
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

有几种常见的配置可以作为预配置的映射。以下内容与上面的例子是一样的:

>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes'='32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

pyina 需要一些关心,因为它仍然是 python2.7,而且已经好几年没有更新了……不过,它在 github 上保持了更新,并且在过去的 10 年里,能够“完成任务”,尤其是在与 pathos 配合使用时(pathos 提供了 ssh 隧道和统一的接口,用于 multiprocessingParallelPython 的映射)。pyina 还没有利用共享内存,但在处理一些简单的并行计算方面表现得相当不错。与调度器的互动总体上还不错,但在一些失败情况下可能会有点问题,而非阻塞的映射还需要很多改进。尽管如此,它提供了一个相当有用的接口,可以在集群上运行简单的并行任务,使用 MPI

你可以在这里获取 pyina(和 pathos):https://github.com/uqfoundation

撰写回答